$ cd ~/catkin_ws/src/simple_arm/
$ mkdir scripts
$ cd scripts
$ touch simple_mover
$ chmod u+x simple_mover
$ vim simple_mover
import math
import rospy
from std_msgs.msg import Float64
def mover():
pub_j1 = rospy.Publisher('/simple_arm/joint_1_position_controller/command', Float64, queue_size=10)
pub_j2 = rospy.Publisher('/simple_arm/joint_2_position_controller/command', Float64, queue_size=10)
rospy.init_node('arm_mover') # must be called before any rospy package functions are called.
rate = rospy.Rate(10)
start_time = 0
while not start_time:
start_time = rospy.Time.now().to_sec() # rospy.Time.now() will initially return 0, until the 1st message has been received on the /clock topic.
while not rospy.is_shutdown():
elapsed = rospy.Time.now().to_sec() - start_time
pub_j1.publish(math.sin(2*math.pi*0.1*elapsed)*(math.pi/2))
pub_j2.publish(math.sin(2*math.pi*0.1*elapsed)*(math.pi/2))
rate.sleep()
if __name__ == '__main__':
try:
mover()
except rospy.ROSInterruptException:
pass
r = rospy.Rate(10) # 10hz
while not rospy.is_shutdown():
pub.publish("hello")
r.sleep()
# New in ROS 1.5+
def my_callback(event):
print 'Timer called at ' + str(event.current_real)
rospy.Timer(rospy.Duration(2), my_callback)
$ cd ~/catkin_ws/src/simple_arm/
$ mkdir srv
$ cd srv
$ touch GoToPosition.srv
$ cd ~/catkin_ws
$ catkin_make
$ source devel/setup.bash
$ cd devel/lib/python2.7/dist-packages
$ ls // a python package containing a module for the new service GoToPosition has been created deep down in the devel directory.
// /simple_arm
$ env | grep PYTHONPATH # new simple_arm package has now become part of your PYTHONPATH
// PYTHONPATH=/home/robond/workspace/catkin_ws/devel/lib/python2.7/dist-package:/opt/ros/kinetic/lib/python2.7/dis-packages
$ cd ~/catkin_ws
$ cd src/simple_arm/scripts
$ touch arm_mover
$ chmod u+x arm_mover
#!/usr/bin/env python
import math
import rospy
from std_msgs.msg import Float64
from sensor_msgs.msg import JointState
from simple_arm.srv import *
def at_goal(pos_j1, goal_j1, pos_j2, goal_j2):
tolerance = .05
result = abs(pos_j1 - goal_j1) <= abs(tolerance)
result = result and abs(pos_j2 - goal_j2) <= abs(tolerance)
return result
def clamp_at_boundaries(requested_j1, requested_j2):
clamped_j1 = requested_j1
clamped_j2 = requested_j2
min_j1 = rospy.get_param('~min_joint_1_angle', 0) # ~min_joint_1_angle = /arm_mover/min_joint_1_angle
max_j1 = rospy.get_param('~max_joint_1_angle', 2*math.pi) # 2nd parameter = default value
min_j2 = rospy.get_param('~min_joint_2_angle', 0)
max_j2 = rospy.get_param('~max_joint_2_angle', 2*math.pi)
if not min_j1 <= requested_j1 <= max_j1:
clamped_j1 = min(max(requested_j1, min_j1), max_j1)
rospy.logwarn('j1 is out of bounds, valid range (%s,%s), clamping to: %s',
min_j1, max_j1, clamped_j1)
if not min_j2 <= requested_j2 <= max_j2:
clamped_j2 = min(max(requested_j2, min_j2), max_j2)
rospy.logwarn('j2 is out of bounds, valid range (%s,%s), clamping to: %s',
min_j2, max_j2, clamped_j2)
return clamped_j1, clamped_j2
def move_arm(pos_j1, pos_j2):
time_elapsed = rospy.Time.now()
j1_publisher.publish(pos_j1)
j2_publisher.publish(pos_j2)
while True:
# wait_for_message is not a good practice.
joint_state = rospy.wait_for_message('/simple_arm/joint_states', JointState) # wait for message from topic
if at_goal(joint_state.position[0], pos_j1, joint_state.position[1], pos_j2):
time_elapsed = joint_state.header.stamp - time_elapsed
break
return time_elapsed
def handle_safe_move_request(req):
rospy.loginfo('GoToPositionRequest Received - j1:%s, j2:%s', req.joint_1, req.joint_2)
clamp_j1, clamp_j2 = clamp_at_boundaries(req.joint_1, req.joint_2)
time_elapsed = move_arm(clamp_j1, clamp_j2)
return GoToPositionResponse(time_elapsed)
def mover_service():
rospy.init_node('arm_mover')
service = rospy.Service('~safe_move', GoToPosition, handle_safe_move_request) # /arm_mover/safe_move
rospy.spin() # keep process running, not complete execution.
if __name__ == '__main__':
j1_publisher = rospy.Publisher('/simple_arm/joint_1_position_controller/command', Float64, queue_size=10)
j2_publisher = rospy.Publisher('/simple_arm/joint_2_position_controller/command', Float64, queue_size=10)
try:
mover_service()
except rospy.ROSInterruptException:
pass