ND - 2. ROS essentials - run
Simple arm - Basic
init workspace
$ mkdir -p ~/catkin_ws/src
$ cd ~/catkin_ws/src
$ catkin_init_workspace
$ cd ~/catkin_ws
$ catkin_make
clone package
$ cd ~/catkin_ws/src
$ git clone https://github.com/udacity/simple_arm_01.git simple_arm
$ cd ~/catkin_ws
$ catkin_make
first launch
$ apt-get install ros-kinetic-controller-manager
$ catkin_make
$ source devel/setup.bash
$ roslaunch simple_arm robot_spawn.launch
Check dependence
$ rosdep check simple_arm
$ rosdep install -i simple_arm
Create Package
$ cd ~/catkin_ws/src
# catkin_create_pkg <your_package_name> [dependency1 dependency2 …]
$ catkin_create_pkg first_package
Simple arm - Architecture
node[simple_mover] -> publish -> simple_arm
node[arm_mover] -> service[move_arm] <- request[movement_commands] <- other node
node[arm_mover] -> service[safe_move] (minimum and maximum joint angles, by using parameters.)
node[arm_mover] -> service[safe_move] <- node[look_away] <- subscribe <- topic <- camera image
node[simple_mover] - Publisher
$ cd ~/catkin_ws/src/simple_arm/
$ mkdir scripts
$ cd scripts
$ touch simple_mover
$ chmod u+x simple_mover
$ vim simple_mover
node內定義publisher, 送message到topic
import math
import rospy
from std_msgs.msg import Float64
def mover():
pub_j1 = rospy.Publisher('/simple_arm/joint_1_position_controller/command', Float64, queue_size=10)
pub_j2 = rospy.Publisher('/simple_arm/joint_2_position_controller/command', Float64, queue_size=10)
rospy.init_node('arm_mover') # must be called before any rospy package functions are called.
rate = rospy.Rate(10)
start_time = 0
while not start_time:
start_time = rospy.Time.now().to_sec() # rospy.Time.now() will initially return 0, until the 1st message has been received on the /clock topic.
while not rospy.is_shutdown():
elapsed = rospy.Time.now().to_sec() - start_time
pub_j1.publish(math.sin(2*math.pi*0.1*elapsed)*(math.pi/2))
pub_j2.publish(math.sin(2*math.pi*0.1*elapsed)*(math.pi/2))
rate.sleep()
if __name__ == '__main__':
try:
mover()
except rospy.ROSInterruptException:
pass
Pattern
r = rospy.Rate(10) # 10hz
while not rospy.is_shutdown():
pub.publish("hello")
r.sleep()
# New in ROS 1.5+
def my_callback(event):
print 'Timer called at ' + str(event.current_real)
rospy.Timer(rospy.Duration(2), my_callback)
Service definition
$ cd ~/catkin_ws/src/simple_arm/
$ mkdir srv
$ cd srv
$ touch GoToPosition.srv
GoToPosition.srv
float64 joint_1
float64 joint_2
---
duration time_elapsed
CMakeLists.txt
In order for catkin to generate the python / C++ which allow you to utilize messages in your code
~/catkin_ws/src/simple_arm/CMakeLists.txt
find_package(catkin REQUIRED COMPONENTS
std_msgs
message_generation
)
## Generate services in the 'srv' folder
add_service_files(
FILES
GoToPosition.srv
)
generate_messages(
DEPENDENCIES
std_msgs # Or other packages containing msgs
)
package.xml
defining package’s properties
rosdep
parsepackage.xml
package.xml
<buildtool_depend>catkin</buildtool_depend>
<build_depend>message_generation</build_depend>
<run_depend>controller_manager</run_depend>
<run_depend>effort_controllers</run_depend>
<run_depend>gazebo_plugins</run_depend>
<run_depend>gazebo_ros</run_depend>
<run_depend>gazebo_ros_control</run_depend>
<run_depend>joint_state_controller</run_depend>
<run_depend>joint_state_publisher</run_depend>
<run_depend>robot_state_publisher</run_depend>
<run_depend>message_runtime</run_depend>
<run_depend>xacro</run_depend>
$ cd ~/catkin_ws
$ catkin_make
$ source devel/setup.bash
$ cd devel/lib/python2.7/dist-packages
$ ls // a python package containing a module for the new service GoToPosition has been created deep down in the devel directory.
// /simple_arm
$ env | grep PYTHONPATH # new simple_arm package has now become part of your PYTHONPATH
// PYTHONPATH=/home/robond/workspace/catkin_ws/devel/lib/python2.7/dist-package:/opt/ros/kinetic/lib/python2.7/dis-packages
node[arm_mover] - Service
$ cd ~/catkin_ws
$ cd src/simple_arm/scripts
$ touch arm_mover
$ chmod u+x arm_mover
#!/usr/bin/env python
import math
import rospy
from std_msgs.msg import Float64
from sensor_msgs.msg import JointState
from simple_arm.srv import *
def at_goal(pos_j1, goal_j1, pos_j2, goal_j2):
tolerance = .05
result = abs(pos_j1 - goal_j1) <= abs(tolerance)
result = result and abs(pos_j2 - goal_j2) <= abs(tolerance)
return result
def clamp_at_boundaries(requested_j1, requested_j2):
clamped_j1 = requested_j1
clamped_j2 = requested_j2
min_j1 = rospy.get_param('~min_joint_1_angle', 0) # ~min_joint_1_angle = /arm_mover/min_joint_1_angle
max_j1 = rospy.get_param('~max_joint_1_angle', 2*math.pi) # 2nd parameter = default value
min_j2 = rospy.get_param('~min_joint_2_angle', 0)
max_j2 = rospy.get_param('~max_joint_2_angle', 2*math.pi)
if not min_j1 <= requested_j1 <= max_j1:
clamped_j1 = min(max(requested_j1, min_j1), max_j1)
rospy.logwarn('j1 is out of bounds, valid range (%s,%s), clamping to: %s',
min_j1, max_j1, clamped_j1)
if not min_j2 <= requested_j2 <= max_j2:
clamped_j2 = min(max(requested_j2, min_j2), max_j2)
rospy.logwarn('j2 is out of bounds, valid range (%s,%s), clamping to: %s',
min_j2, max_j2, clamped_j2)
return clamped_j1, clamped_j2
def move_arm(pos_j1, pos_j2):
time_elapsed = rospy.Time.now()
j1_publisher.publish(pos_j1)
j2_publisher.publish(pos_j2)
while True:
# wait_for_message is not a good practice.
joint_state = rospy.wait_for_message('/simple_arm/joint_states', JointState) # wait for message from topic
if at_goal(joint_state.position[0], pos_j1, joint_state.position[1], pos_j2):
time_elapsed = joint_state.header.stamp - time_elapsed
break
return time_elapsed
def handle_safe_move_request(req):
rospy.loginfo('GoToPositionRequest Received - j1:%s, j2:%s', req.joint_1, req.joint_2)
clamp_j1, clamp_j2 = clamp_at_boundaries(req.joint_1, req.joint_2)
time_elapsed = move_arm(clamp_j1, clamp_j2)
return GoToPositionResponse(time_elapsed)
def mover_service():
rospy.init_node('arm_mover')
service = rospy.Service('~safe_move', GoToPosition, handle_safe_move_request) # /arm_mover/safe_move
rospy.spin() # keep process running, not complete execution.
if __name__ == '__main__':
j1_publisher = rospy.Publisher('/simple_arm/joint_1_position_controller/command', Float64, queue_size=10)
j2_publisher = rospy.Publisher('/simple_arm/joint_2_position_controller/command', Float64, queue_size=10)
try:
mover_service()
except rospy.ROSInterruptException:
pass
Launch File
~/catkin_ws/src/simple_arm/launch/robot_spawn.launch
<!-- The arm mover node -->
<node name="arm_mover" type="arm_mover" pkg="simple_arm">
<rosparam>
min_joint_1_angle: 0
max_joint_1_angle: 1.57
min_joint_2_angle: 0
max_joint_2_angle: 1.0
</rosparam>
</node>
RUN
$ cd ~/catkin_ws
$ catkin_make
$ source devel/setup.bash
$ roslaunch simple_arm robot_spawn.launch
$ rosnode list
$ rosservice list
service (/arm_mover/safe_move
) and the node (/arm_mover
) show up as expected.
$ rqt_image_view /rgb_camera/image_raw
$ rosservice call /arm_mover/safe_move "joint_1: 1.57 joint_2: 1.57"
$ rosparam set /arm_mover/max_joint_2_angle 1.57
$ rosservice call /arm_mover/safe_move "joint_1: 1.57 joint_2: 1.57"
Change Params
# ~/catkin_ws/src/simple_arm/launch/robot_spawn.launch
<!-- The arm mover node -->
<node name="arm_mover" type="arm_mover" pkg="simple_arm">
<rosparam>
min_joint_1_angle: 0
max_joint_1_angle: 1.57
min_joint_2_angle: 0
max_joint_2_angle: 1.57
</rosparam>
</node>
node[look_away] - Subscriber
# ~/catkin_ws/src/simple_arm/launch/robot_spawn.launch
<!-- The look away node -->
<node name="look_away" type="look_away" pkg="simple_arm"/>
$ cd ~/catkin_ws
$ cd src/simple_arm/scripts
$ touch look_away
$ chmod u+x look_away
$ vim look_away
#!/usr/bin/env python
import math
import rospy
from sensor_msgs.msg import Image, JointState
from simple_arm.srv import *
class LookAway(object):
def __init__(self):
rospy.init_node('look_away')
self.sub1 = rospy.Subscriber('/simple_arm/joint_states', JointState, self.joint_states_callback)
self.sub2 = rospy.Subscriber("rgb_camera/image_raw", Image, self.look_away_callback)
self.safe_move = rospy.ServiceProxy('/arm_mover/safe_move', GoToPosition) # enables calling a service from a node.
self.last_position = None
self.arm_moving = False
rospy.spin()
def uniform_image(self, image):
return all(value == image[0] for value in image)
def coord_equal(self, coord_1, coord_2):
if coord_1 is None or coord_2 is None:
return False
tolerance = .0005
result = abs(coord_1[0] - coord_2[0]) <= abs(tolerance)
result = result and abs(coord_1[1] - coord_2[1]) <= abs(tolerance)
return result
def joint_states_callback(self, data):
if self.coord_equal(data.position, self.last_position):
self.arm_moving = False
else:
self.last_position = data.position
self.arm_moving = True
def look_away_callback(self, data):
if not self.arm_moving and self.uniform_image(data.data):
try:
rospy.wait_for_service('/arm_mover/safe_move')
# define request msg
msg = GoToPositionRequest()
msg.joint_1 = 1.57
msg.joint_2 = 1.57
# send request and get response
response = self.safe_move(msg)
rospy.logwarn("Camera detecting uniform image. \
Elapsed time to look at something nicer:\n%s",
response)
except rospy.ServiceException, e:
rospy.logwarn("Service call failed: %s", e)
if __name__ == '__main__':
try:
LookAway()
except rospy.ROSInterruptException:
pass
RUN
$ cd ~/catkin_ws
$ catkin_make
$ source devel/setup.bash
$ roslaunch simple_arm robot_spawn.launch
$ rqt_image_view /rgb_camera/image_raw // view camera image
// to verify look_away
$ rosservice call /arm_mover/safe_move "joint_1: 0
joint_2: 0"
Last updated
Was this helpful?