Enable the robot joint trajectory interface, parse a file created using the joint position recorder example, and send the resulting joint trajectory to the action server.
Overview
A commonly used ROS method for robot arm motion control is the joint trajectory action interface. The trajectory_controller and it's corresponding joint trajectory action server is the intera_interface implementation to support this action interface. This example shows usage for launching the joint trajectory action server, and parsing of a text file describing timestamped joint position goals into a joint trajectory action call to be commanded to the joint trajectory action server.
Introduction
This example demonstrates the usage of the Joint Trajectory Action server
to command raw joint position commands. The main()
creates and instance of the Trajectory class and calls the parse_file()
method. This method is responsible for parsing the input file and packing them as Request message for the Joint Trajectory Action server
. The start()
method is then called, which sends the Request messages to the action server. Finally, wait() is then called, which verifies if the trajectory execution was successful. Action Servers - robot/limb/right/follow_joint_trajectory
.
If you would like to follow along with the actual source code for the example on GitHub, it can be found through this link for joint trajectory file playback example.
Usage
Verify that the robot is enabled from an SDK terminal session, if the robot is not enabled run:
$ rosrun intera_interface robot_enable.py
Record a joint position file using the recorder.py example, ex:
$ rosrun intera_examples recorder.py -f <position_file_name>
The recorder is now recording joint positions with corresponding timestamps for robot arm. Move the arms while holding the cuffs. If the position file name exists, the function will overwrite existing file.
NOTE: You can open and close the gripper while recording by using the robot's cuff buttons: Oval(lower) = Close, Circle(Upper) = Open
Start the joint trajectory controller, ex:
$ rosrun intera_interface joint_trajectory_action_server.py --mode velocity
In another RSDK terminal session, Run the joint trajectory playback example program, ex:
$ rosrun intera_examples joint_trajectory_file_playback.py -f <position_file_name>
The robot arm will then be commanded to repeat the trajectory recorded during the joint position recording. The difference between this example and the joint_position playback example is that the trajectory controller has the ability to honor the velocities (due to the timestamps) to more accurately repeating the recorded trajectory.
Arguments
Important Arguments: Arguments for joint_trajectory_action_server.py
See the trajectory controller's usage on the command line by passing trajectory_controller.py the -h, help argument:
$ rosrun intera_interface joint_trajectory_action_server.py -h
Intera SDK Joint Trajectory Controller
Unlike other robots running ROS, this is not a Motor Controller plugin,
but a regular node using the SDK interface.
optional arguments:
-h, --help show this help message and exit
-l {right}, --limb {right}
joint trajectory action server limb (default: right)
-r RATE, --rate RATE trajectory control rate (Hz) (default: 100.0)
-m {position_w_id,position,velocity}, --mode {position_w_id,position,velocity}
control mode for trajectory execution (default:
position_w_id)
Important Arguments: Arguments for joint_trajectory_file_playback.py
See the joint_trajectory_file_playback's usage on the command line by passing joint_trajectory_file_playback.py the -h, help argument:
$ rosrun intera_interface joint_trajectory_file_playback.py -h
RSDK Joint Trajectory Example: File Playback
Plays back joint positions honoring timestamps recorded
via the joint_recorder example.
Run the joint_recorder.py example first to create a recording
file for use with this example. Then make sure to start the
joint_trajectory_action_server before running this example.
This example will use the joint trajectory action server
with velocity control to follow the positions and times of
the recorded motion, accurately replicating movement speed
necessary to hit each trajectory point on time.
required arguments:
-f FILE, --file FILE path to input file
optional arguments:
-h, --help show this help message and exit
-l LOOPS, --loops LOOPS
number of playback loops. 0=infinite. (default: 1)
Interfaces
ROS APIs:
See the API Reference page for details: ROS APIs
Joint Trajectory Action Server - /robot/limb/right/follow_joint_trajectory [control_msgs/FollowJointTrajectoryAction]
Intera_interface APIs:
JointTrajectoryActionServer class: joint_trajectory_action_server.py
Code Walkthrough
Now, let's break down the code.
33 import argparse
34 import operator
35 import sys
36 import threading
37
38 from bisect import bisect
39 from copy import copy
40 from os import path
41
42 import rospy
43
44 import actionlib
45
46 from control_msgs.msg import (
47 FollowJointTrajectoryAction,
48 FollowJointTrajectoryGoal,
49 )
50 from trajectory_msgs.msg import (
51 JointTrajectoryPoint,
52 )
53
54 import intera_interface
55
56 from intera_interface import CHECK_VERSION
This imports the intera_interface for accessing the limb and the gripper class. The actionlib
is imported to use its Action Server
class.The CHECK_VERSION
is imported to check if the software running on the robot would be compatible with this local version. It is not necessary to check the version in custom programs.
59 class Trajectory(object):
60 def __init__(self, limb="right"):
61 #create our action server clients
62 self._limb_client = actionlib.SimpleActionClient(
63 'robot/limb/right/follow_joint_trajectory',
64 FollowJointTrajectoryAction,
65 )
66
67 #verify joint trajectory action servers are available
68 is_server_up = self._limb_client.wait_for_server(rospy.Duration(10.0))
69 if not is_server_up:
70 msg = ("Action server not available."
71 " Verify action server availability.")
72 rospy.logerr(msg)
73 rospy.signal_shutdown(msg)
74 sys.exit(1)
Action server clients for the right limb is created. The existence of action server for right limb is checked with a timeout of 10 seconds. If it is not available, a shutdown signal is sent and the program exits.
76 #create our goal request
77 self.goal = FollowJointTrajectoryGoal()
78
79 #limb interface - current angles needed for start move
80 self.arm = intera_interface.Limb(limb)
81
82 self.limb = limb
83 self.gripper_name = '_'.join([limb, 'gripper'])
84 #gripper interface - for gripper command playback
85 try:
86 self.gripper = intera_interface.Gripper(limb)
87 self.has_gripper = True
88 except:
89 self.has_gripper = False
90 rospy.loginfo("Did not detect a connected electric gripper.")
91
92 #flag to signify the arm trajectories have begun executing
93 self._arm_trajectory_started = False
94 #reentrant lock to prevent same-thread lockout
95 self._lock = threading.RLock()
The request message that will hold the goal position for right limbs' action server is created. The current joint is also captured. If there is a electric gripper connected, the gripper positions will be captured as well.
97 # Verify Grippers Have No Errors and are Calibrated
98 if self.has_gripper:
99 if self.gripper.has_error():
100 self.gripper.reboot()
101 if not self.gripper.is_calibrated():
102 self.gripper.calibrate()
103
104 #gripper goal trajectories
105 self.grip = FollowJointTrajectoryGoal()
106
107 #gripper control rate
108 self._gripper_rate = 20.0 # Hz
109
110 # Timing offset to prevent gripper playback before trajectory has started
111 self._slow_move_offset = 0.0
112 self._trajectory_start_offset = rospy.Duration(0.0)
113 self._trajectory_actual_offset = rospy.Duration(0.0)
114
115 #param namespace
116 self._param_ns = '/rsdk_joint_trajectory_action_server/'
The error()
method returns whether or not the gripper is in an error state. The possible cause of errors might be over/under-voltage, over/under-current, motor faults, etc. The calibrated()
method returns a boolean, true if the gripper is already calibrated. The request message that will hold the goal position for the gripper is created. The namespace is modified and the gripper's control rates is modified.
118 def _execute_gripper_commands(self):
119 start_time = rospy.get_time() - self._trajectory_actual_offset.to_sec()
120 grip_cmd = self.grip.trajectory.points
121 pnt_times = [pnt.time_from_start.to_sec() for pnt in grip_cmd]
122 end_time = pnt_times[-1]
123 rate = rospy.Rate(self._gripper_rate)
124 now_from_start = rospy.get_time() - start_time
125 while(now_from_start < end_time + (1.0 / self._gripper_rate) and
126 not rospy.is_shutdown()):
127 idx = bisect(pnt_times, now_from_start) - 1
128 if self.has_gripper:
129 self.gripper.set_position(grip_cmd[idx].positions[0])
130 rate.sleep()
131 now_from_start = rospy.get_time() - start_time
The grip_cmd variable holds the gripper' position that was parsed from the file. The corresponding time stamps are read into pnt_times variable. idx holds the index of the timestamp from the file, relative to the current one. It is important to note that the velocity at which the Joint Positions was traversed is preserved during the playback. The corresponding gripper's position is then commanded to the action server.
133 def _clean_line(self, line, joint_names):
134 """
135 Cleans a single line of recorded joint positions
136
137 @param line: the line described in a list to process
138 @param joint_names: joint name keys
139
140 @return command: returns dictionary {joint: value} of valid commands
141 @return line: returns list of current line values stripped of commas
142 """
143 def try_float(x):
144 try:
145 return float(x)
146 except ValueError:
147 return None
148 #convert the line of strings to a float or None
149 line = [try_float(x) for x in line.rstrip().split(',')]
150 #zip the values with the joint names
151 combined = zip(joint_names[1:], line[1:])
152 #take out any tuples that have a none value
153 cleaned = [x for x in combined if x[1] is not None]
154 #convert it to a dictionary with only valid commands
155 command = dict(cleaned)
156 return (command, line,)
This try_float()
function replaces the contents of the variable passed with float and none values. Then, a tuple is constructed by zipping the names passed with the values from the variable line. This may contain valid values as well as none values . After removing the none values, a dictionary is constructed. This dictionary is then parsed to create valid Joint commands.
158 def _add_point(self, positions, side, time):
159 """
160 Appends trajectory with new point
161
162 @param positions: joint positions
163 @param side: limb to command point
164 @param time: time from start for point in seconds
165 """
166 #creates a point in trajectory with time_from_start and positions
167 point = JointTrajectoryPoint()
168 point.positions = copy(positions)
169 point.time_from_start = rospy.Duration(time)
170 if side == self.limb:
171 self.goal.trajectory.points.append(point)
172 elif self.has_gripper and side == self.gripper_name:
173 self.grip.trajectory.points.append(point)
This method creates a request message of the JointTrajectoryPoint
type and appends the goal position based on the side of the limb/gripper that is requesting. It compiles a list of Joint Positions as the trajectory.
175 def parse_file(self, filename):
176 """
177 Parses input file into FollowJointTrajectoryGoal format
178
179 @param filename: input filename
180 """
181 #open recorded file
182 with open(filename, 'r') as f:
183 lines = f.readlines()
184 #read joint names specified in file
185 joint_names = lines[0].rstrip().split(',')
186 #parse joint names for right limb
187 for name in joint_names:
188 if self.limb == name[:-3]:
189 self.goal.trajectory.joint_names.append(name)
The lines are split into a list with ',' as the delimiter to extract the joint names. The arm of the current joint, found by stripping the "_<joint>" (last three characters) from the joint_name, is checked and stored.
191 def find_start_offset(pos):
192 #create empty lists
193 cur = []
194 cmd = []
195 dflt_vel = []
196 vel_param = self._param_ns + "%s_default_velocity"
197 #for all joints find our current and first commanded position
198 #reading default velocities from the parameter server if specified
199 for name in joint_names:
200 if self.limb == name[:-3]:
201 cmd.append(pos[name])
202 cur.append(self.arm.joint_angle(name))
203 prm = rospy.get_param(vel_param % name, 0.25)
204 dflt_vel.append(prm)
205 diffs = map(operator.sub, cmd, cur)
206 diffs = map(operator.abs, diffs)
207 #determine the largest time offset necessary across all joints
208 offset = max(map(operator.div, diffs, dflt_vel))
209 return offset
The first commanded position and the current position for all the joints are captured. The default values that were loaded into the param server are read. The largest time offset necessary across all the joints is calculated.
211 for idx, values in enumerate(lines[1:]):
212 #clean each line of file
213 cmd, values = self._clean_line(values, joint_names)
214 #find allowable time offset for move to start position
215 if idx == 0:
216 # Set the initial position to be the current pose.
217 # This ensures we move slowly to the starting point of the
218 # trajectory from the current pose - The user may have moved
219 # arm since recording
220 cur_cmd = [self.arm.joint_angle(jnt) for jnt in self.goal.trajectory.joint_names]
221 self._add_point(cur_cmd, self.limb, 0.0)
222 start_offset = find_start_offset(cmd)
223 # Gripper playback won't start until the starting movement's
224 # duration has passed, and the actual trajectory playback begins
225 self._slow_move_offset = start_offset
226 self._trajectory_start_offset = rospy.Duration(start_offset + values[0])
227 #add a point for this set of commands with recorded time
228 cur_cmd = [cmd[jnt] for jnt in self.goal.trajectory.joint_names]
229 self._add_point(cur_cmd, self.limb, values[0] + start_offset)
230 if self.has_gripper:
231 cur_cmd = [cmd[self.gripper_name]]
232 self._add_point(cur_cmd, self.gripper_name, values[0] + start_offset)
The non-float values are cleaned and a dictionary of valid Joint Commands is returned in the _clean_line() function as explained above. The time offset for move to start position is also calculated. The parsed set of recorded commands along with their recorded time is added to the goal list as well as gripper's command and time.
234 def _feedback(self, data):
235 # Test to see if the actual playback time has exceeded
236 # the move-to-start-pose timing offset
237 if (not self._get_trajectory_flag() and
238 data.actual.time_from_start >= self._trajectory_start_offset):
239 self._set_trajectory_flag(value=True)
240 self._trajectory_actual_offset = data.actual.time_from_start
Called in start function, test to see if the actual playback time has exceeded the move-to-start-pose timing offset.
242 def _set_trajectory_flag(self, value=False):
243 with self._lock:
244 # Assign a value to the flag
245 self._arm_trajectory_started = value
246
247 def _get_trajectory_flag(self):
248 temp_flag = False
249 with self._lock:
250 # Copy to external variable
251 temp_flag = self._arm_trajectory_started
252 return temp_flag
Assign the value to the trajectory flag function and get the trajectory flag value function.
254 def start(self):
255 """
256 Sends FollowJointTrajectoryAction request
257 """
258 self._limb_client.send_goal(self.goal, feedback_cb=self._feedback)
259 # Syncronize playback by waiting for the trajectories to start
260 while not rospy.is_shutdown() and not self._get_trajectory_flag():
261 rospy.sleep(0.05)
262 if self.has_gripper:
263 self._execute_gripper_commands()
This function sends the FollowJointTrajectoryAction request message which includes the recorded positions and their corresponding time, to the Joint Trajectory Action Server.
265 def stop(self):
266 """
267 Preempts trajectory execution by sending cancel goals
268 """
269 if (self._limb_client.gh is not None and
270 self._limb_client.get_state() == actionlib.GoalStatus.ACTIVE):
271 self._limb_client.cancel_goal()
272
273 #delay to allow for terminating handshake
274 rospy.sleep(0.1)
The Trajectory execution is stopped by sending cancel goals to the Joint trajectory action server.
276 def wait(self):
277 """
278 Waits for and verifies trajectory execution result
279 """
280 #create a timeout for our trajectory execution
281 #total time trajectory expected for trajectory execution plus a buffer
282 last_time = self.goal.trajectory.points[-1].time_from_start.to_sec()
283 time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5
284 timeout = rospy.Duration(self._slow_move_offset +
285 last_time +
286 time_buffer)
287
288 finish = self._limb_client.wait_for_result(timeout)
289 result = (self._limb_client.get_result().error_code == 0)
290
291 #verify result
292 if all([finish, result]):
293 return True
294 else:
295 msg = ("Trajectory action failed or did not finish before "
296 "timeout/interrupt.")
297 rospy.logwarn(msg)
298 return False
This method waits for the completion of the trajectory execution by Joint trajectory action server.
301 def main():
302 """RSDK Joint Trajectory Example: File Playback
303
304 Plays back joint positions honoring timestamps recorded
305 via the joint_recorder example.
306
307 Run the joint_recorder.py example first to create a recording
308 file for use with this example. Then make sure to start the
309 joint_trajectory_action_server before running this example.
310
311 This example will use the joint trajectory action server
312 with velocity control to follow the positions and times of
313 the recorded motion, accurately replicating movement speed
314 necessary to hit each trajectory point on time.
315 """
316 epilog = """
317 Related examples:
318 joint_recorder.py; joint_position_file_playback.py.
319 """
320 arg_fmt = argparse.RawDescriptionHelpFormatter
321 parser = argparse.ArgumentParser(formatter_class=arg_fmt,
322 description=main.__doc__,
323 epilog=epilog)
324 parser.add_argument(
325 '-f', '--file', metavar='PATH', required=True,
326 help='path to input file'
327 )
328 parser.add_argument(
329 '-l', '--loops', type=int, default=1,
330 help='number of playback loops. 0=infinite.'
331 )
332 # remove ROS args and filename (sys.arv[0]) for argparse
333 args = parser.parse_args(rospy.myargv()[1:])
The name source file to read the Joint Position is captured along with the number of times the trajectory has to be looped from the command line.
335 print("Initializing node... ")
336 rospy.init_node("sdk_joint_trajectory_file_playback")
337 print("Getting robot state... ")
338 rs = intera_interface.RobotEnable(CHECK_VERSION)
339 print("Enabling robot... ")
340 rs.enable()
341 print("Running. Ctrl-c to quit")
342
343 traj = Trajectory()
344 traj.parse_file(path.expanduser(args.file))
345 #for safe interrupt handling
346 rospy.on_shutdown(traj.stop)
347 result = True
348 loop_cnt = 1
349 loopstr = str(args.loops)
350 if args.loops == 0:
351 args.loops = float('inf')
352 loopstr = "forever"
353 while (result == True and loop_cnt <= args.loops
354 and not rospy.is_shutdown()):
355 print("Playback loop %d of %s" % (loop_cnt, loopstr,))
356 traj.start()
357 result = traj.wait()
358 loop_cnt = loop_cnt + 1
359 print("Exiting - File Playback Complete")
360
361 if __name__ == "__main__":
362 main()
The node is initialized. An instance of the Trajectory
class is created. The parse_file()
method is called to extract the recorded Joint Positions and their corresponding timestamps as explained above. The start()
method is called to send the FollowJointTrajectory
request message to the Joint Trajectory Action server
. This loops for the user specified number of times.