ND - 2. ROS essentials - run

Simple arm - Basic

init workspace

$ mkdir -p ~/catkin_ws/src
$ cd ~/catkin_ws/src
$ catkin_init_workspace
$ cd ~/catkin_ws
$ catkin_make

clone package

$ cd ~/catkin_ws/src
$ git clone https://github.com/udacity/simple_arm_01.git simple_arm

$ cd ~/catkin_ws
$ catkin_make

first launch

$ apt-get install ros-kinetic-controller-manager
$ catkin_make
$ source devel/setup.bash

$ roslaunch simple_arm robot_spawn.launch

Check dependence

$ rosdep check simple_arm
$ rosdep install -i simple_arm

Create Package

$ cd ~/catkin_ws/src

# catkin_create_pkg <your_package_name> [dependency1 dependency2 …]
$ catkin_create_pkg first_package

Simple arm - Architecture

node[simple_mover] -> publish -> simple_arm

node[arm_mover] -> service[move_arm] <- request[movement_commands] <- other node

node[arm_mover] -> service[safe_move] (minimum and maximum joint angles, by using parameters.)

node[arm_mover] -> service[safe_move] <- node[look_away] <- subscribe <- topic <- camera image

node[simple_mover] - Publisher

$ cd ~/catkin_ws/src/simple_arm/
$ mkdir scripts
$ cd scripts

$ touch simple_mover
$ chmod u+x simple_mover

$ vim simple_mover

node內定義publisher, 送message到topic

import math
import rospy
from std_msgs.msg import Float64

def mover():
    pub_j1 = rospy.Publisher('/simple_arm/joint_1_position_controller/command', Float64, queue_size=10)
    pub_j2 = rospy.Publisher('/simple_arm/joint_2_position_controller/command', Float64, queue_size=10)
    rospy.init_node('arm_mover') #  must be called before any rospy package functions are called.
    rate = rospy.Rate(10)
    start_time = 0

    while not start_time:
        start_time = rospy.Time.now().to_sec() # rospy.Time.now() will initially return 0, until the 1st message has been received on the /clock topic. 

    while not rospy.is_shutdown():
        elapsed = rospy.Time.now().to_sec() - start_time
        pub_j1.publish(math.sin(2*math.pi*0.1*elapsed)*(math.pi/2))
        pub_j2.publish(math.sin(2*math.pi*0.1*elapsed)*(math.pi/2))
        rate.sleep()

if __name__ == '__main__':
    try:
        mover()
    except rospy.ROSInterruptException:
        pass

Pattern

r = rospy.Rate(10) # 10hz
while not rospy.is_shutdown():
    pub.publish("hello")
    r.sleep()

# New in ROS 1.5+
def my_callback(event):
    print 'Timer called at ' + str(event.current_real)

rospy.Timer(rospy.Duration(2), my_callback)

Service definition

$ cd ~/catkin_ws/src/simple_arm/
$ mkdir srv
$ cd srv
$ touch GoToPosition.srv

GoToPosition.srv

float64 joint_1
float64 joint_2
---
duration time_elapsed

CMakeLists.txt

In order for catkin to generate the python / C++ which allow you to utilize messages in your code

~/catkin_ws/src/simple_arm/CMakeLists.txt

find_package(catkin REQUIRED COMPONENTS
        std_msgs
        message_generation
)

## Generate services in the 'srv' folder
add_service_files(
   FILES
   GoToPosition.srv
)

generate_messages(
   DEPENDENCIES
   std_msgs  # Or other packages containing msgs
)

package.xml

  1. defining package’s properties

  2. rosdep parse package.xml

package.xml

  <buildtool_depend>catkin</buildtool_depend>
  <build_depend>message_generation</build_depend>

  <run_depend>controller_manager</run_depend>
  <run_depend>effort_controllers</run_depend>
  <run_depend>gazebo_plugins</run_depend>
  <run_depend>gazebo_ros</run_depend>
  <run_depend>gazebo_ros_control</run_depend>
  <run_depend>joint_state_controller</run_depend>
  <run_depend>joint_state_publisher</run_depend>
  <run_depend>robot_state_publisher</run_depend>
  <run_depend>message_runtime</run_depend>
  <run_depend>xacro</run_depend>
$ cd ~/catkin_ws
$ catkin_make

$ source devel/setup.bash
$ cd devel/lib/python2.7/dist-packages
$ ls // a python package containing a module for the new service GoToPosition has been created deep down in the devel directory.
// /simple_arm

$ env | grep PYTHONPATH # new simple_arm package has now become part of your PYTHONPATH
// PYTHONPATH=/home/robond/workspace/catkin_ws/devel/lib/python2.7/dist-package:/opt/ros/kinetic/lib/python2.7/dis-packages

node[arm_mover] - Service

$ cd ~/catkin_ws
$ cd src/simple_arm/scripts
$ touch arm_mover
$ chmod u+x arm_mover
#!/usr/bin/env python

import math
import rospy
from std_msgs.msg import Float64
from sensor_msgs.msg import JointState
from simple_arm.srv import *

def at_goal(pos_j1, goal_j1, pos_j2, goal_j2):
    tolerance = .05
    result = abs(pos_j1 - goal_j1) <= abs(tolerance)
    result = result and abs(pos_j2 - goal_j2) <= abs(tolerance)
    return result

def clamp_at_boundaries(requested_j1, requested_j2):
    clamped_j1 = requested_j1
    clamped_j2 = requested_j2

    min_j1 = rospy.get_param('~min_joint_1_angle', 0) # ~min_joint_1_angle = /arm_mover/min_joint_1_angle
    max_j1 = rospy.get_param('~max_joint_1_angle', 2*math.pi) # 2nd parameter = default value
    min_j2 = rospy.get_param('~min_joint_2_angle', 0)
    max_j2 = rospy.get_param('~max_joint_2_angle', 2*math.pi)

    if not min_j1 <= requested_j1 <= max_j1:
        clamped_j1 = min(max(requested_j1, min_j1), max_j1)
        rospy.logwarn('j1 is out of bounds, valid range (%s,%s), clamping to: %s',
                      min_j1, max_j1, clamped_j1)

    if not min_j2 <= requested_j2 <= max_j2:
        clamped_j2 = min(max(requested_j2, min_j2), max_j2)
        rospy.logwarn('j2 is out of bounds, valid range (%s,%s), clamping to: %s',
                      min_j2, max_j2, clamped_j2)

    return clamped_j1, clamped_j2

def move_arm(pos_j1, pos_j2):
    time_elapsed = rospy.Time.now()
    j1_publisher.publish(pos_j1)
    j2_publisher.publish(pos_j2)

    while True:
        # wait_for_message is not a good practice.
        joint_state = rospy.wait_for_message('/simple_arm/joint_states', JointState) # wait for message from topic
        if at_goal(joint_state.position[0], pos_j1, joint_state.position[1], pos_j2):
            time_elapsed = joint_state.header.stamp - time_elapsed
            break

    return time_elapsed

def handle_safe_move_request(req):
    rospy.loginfo('GoToPositionRequest Received - j1:%s, j2:%s', req.joint_1, req.joint_2)
    clamp_j1, clamp_j2 = clamp_at_boundaries(req.joint_1, req.joint_2)
    time_elapsed = move_arm(clamp_j1, clamp_j2)

    return GoToPositionResponse(time_elapsed)

def mover_service():
    rospy.init_node('arm_mover')
    service = rospy.Service('~safe_move', GoToPosition, handle_safe_move_request) # /arm_mover/safe_move 
    rospy.spin() # keep process running, not complete execution.

if __name__ == '__main__':
    j1_publisher = rospy.Publisher('/simple_arm/joint_1_position_controller/command', Float64, queue_size=10)
    j2_publisher = rospy.Publisher('/simple_arm/joint_2_position_controller/command', Float64, queue_size=10)

    try:
        mover_service()
    except rospy.ROSInterruptException:
        pass

Launch File

~/catkin_ws/src/simple_arm/launch/robot_spawn.launch

  <!-- The arm mover node -->
  <node name="arm_mover" type="arm_mover" pkg="simple_arm">
    <rosparam>
      min_joint_1_angle: 0
      max_joint_1_angle: 1.57
      min_joint_2_angle: 0
      max_joint_2_angle: 1.0
    </rosparam>
  </node>

RUN

$ cd ~/catkin_ws
$ catkin_make
$ source devel/setup.bash
$ roslaunch simple_arm robot_spawn.launch
$ rosnode list
$ rosservice list

service (/arm_mover/safe_move) and the node (/arm_mover) show up as expected.

$ rqt_image_view /rgb_camera/image_raw
$ rosservice call /arm_mover/safe_move "joint_1: 1.57 joint_2: 1.57"
$ rosparam set /arm_mover/max_joint_2_angle 1.57
$ rosservice call /arm_mover/safe_move "joint_1: 1.57 joint_2: 1.57"

Change Params

# ~/catkin_ws/src/simple_arm/launch/robot_spawn.launch

  <!-- The arm mover node -->
  <node name="arm_mover" type="arm_mover" pkg="simple_arm">
    <rosparam>
      min_joint_1_angle: 0
      max_joint_1_angle: 1.57
      min_joint_2_angle: 0
      max_joint_2_angle: 1.57
    </rosparam>
  </node>

node[look_away] - Subscriber

# ~/catkin_ws/src/simple_arm/launch/robot_spawn.launch

  <!-- The look away node -->
  <node name="look_away" type="look_away" pkg="simple_arm"/>
$ cd ~/catkin_ws
$ cd src/simple_arm/scripts
$ touch look_away
$ chmod u+x look_away

$ vim look_away
#!/usr/bin/env python

import math
import rospy
from sensor_msgs.msg import Image, JointState
from simple_arm.srv import *

class LookAway(object):
    def __init__(self):
        rospy.init_node('look_away')

        self.sub1 = rospy.Subscriber('/simple_arm/joint_states', JointState, self.joint_states_callback)
        self.sub2 = rospy.Subscriber("rgb_camera/image_raw", Image, self.look_away_callback)
        self.safe_move = rospy.ServiceProxy('/arm_mover/safe_move', GoToPosition) # enables calling a service from a node. 

        self.last_position = None
        self.arm_moving = False

        rospy.spin()

    def uniform_image(self, image):
        return all(value == image[0] for value in image)

    def coord_equal(self, coord_1, coord_2):
        if coord_1 is None or coord_2 is None:
            return False
        tolerance = .0005
        result = abs(coord_1[0] - coord_2[0]) <= abs(tolerance)
        result = result and abs(coord_1[1] - coord_2[1]) <= abs(tolerance)
        return result

    def joint_states_callback(self, data):
        if self.coord_equal(data.position, self.last_position):
            self.arm_moving = False
        else:
            self.last_position = data.position
            self.arm_moving = True

    def look_away_callback(self, data):
        if not self.arm_moving and self.uniform_image(data.data):
            try:
                rospy.wait_for_service('/arm_mover/safe_move')
                # define request msg
                msg = GoToPositionRequest()
                msg.joint_1 = 1.57
                msg.joint_2 = 1.57
                # send request and get response
                response = self.safe_move(msg)

                rospy.logwarn("Camera detecting uniform image. \
                               Elapsed time to look at something nicer:\n%s", 
                               response)

            except rospy.ServiceException, e:
                rospy.logwarn("Service call failed: %s", e)



if __name__ == '__main__':
    try: 
        LookAway()
    except rospy.ROSInterruptException:
        pass

RUN

$ cd ~/catkin_ws
$ catkin_make
$ source devel/setup.bash
$ roslaunch simple_arm robot_spawn.launch
$ rqt_image_view /rgb_camera/image_raw // view camera image
// to verify look_away
$ rosservice call /arm_mover/safe_move "joint_1: 0 
joint_2: 0"

Last updated

Was this helpful?