Introduction
This example demonstrates the usage of the Joint Trajectory Action Server to command raw joint position commands. The main()
creates and instance of the Trajectory
class and calls the parse_file()
method. This method is responsible for parsing the input file and packing them as Request message for the Joint Trajectory Action server
. The start()
method is then called, which sends the Request messages to the action server. Finally, wait()
is then called, which verifies if the trajectory execution was successful.< /br>
Action Servers -robot/limb/left/follow_joint_trajectory, robot/limb/right/follow_joint_trajectory.
Code Walkthrough
Now, let's break down the code.
33 import argparse
34 import operator
35 import sys
36
37 from bisect import bisect
38 from copy import copy
39 from os import path
40
41 import rospy
42
43 import actionlib
44
45 from control_msgs.msg import (
46 FollowJointTrajectoryAction,
47 FollowJointTrajectoryGoal,
48 )
49 from trajectory_msgs.msg import (
50 JointTrajectoryPoint,
51 )
52
53 import baxter_interface
54
55 from baxter_interface import CHECK_VERSION
This imports the baxter interface for accessing the limb and the gripper class. The actionlib
is imported to use its Action Server
class.The CHECK_VERSION
is imported to check if the software running on the robot would be compatible with this local version. It is not necessary to check the version in custom programs.
58 class Trajectory(object):
59 def __init__(self):
60 #create our action server clients
61 self._left_client = actionlib.SimpleActionClient(
62 'robot/limb/left/follow_joint_trajectory',
63 FollowJointTrajectoryAction,
64 )
65 self._right_client = actionlib.SimpleActionClient(
66 'robot/limb/right/follow_joint_trajectory',
67 FollowJointTrajectoryAction,
68 )
69
70 #verify joint trajectory action servers are available
71 l_server_up = self._left_client.wait_for_server(rospy.Duration(10.0))
72 r_server_up = self._right_client.wait_for_server(rospy.Duration(10.0))
73 if not l_server_up or not r_server_up:
74 msg = ("Action server not available."
75 " Verify action server availability.")
76 rospy.logerr(msg)
77 rospy.signal_shutdown(msg)
78 sys.exit(1)
Action server clients for the left and right limbs are created. The existence of action servers for both the limbs are checked with a timeout of 10 seconds. If they are not available, a shutdown signal is sent and the program exits.
79 #create our goal request
80 self._l_goal = FollowJointTrajectoryGoal()
81 self._r_goal = FollowJointTrajectoryGoal()
82
83 #limb interface - current angles needed for start move
84 self._l_arm = baxter_interface.Limb('left')
85 self._r_arm = baxter_interface.Limb('right')
86
87 #gripper interface - for gripper command playback
88 self._l_gripper = baxter_interface.Gripper('left', CHECK_VERSION)
89 self._r_gripper = baxter_interface.Gripper('right', CHECK_VERSION)
The request message that will hold the goal positions for both the limbs' action servers are created. The current joint and gripper positions are also captured.
92 if self._l_gripper.error():
93 self._l_gripper.reset()
94 if self._r_gripper.error():
95 self._r_gripper.reset()
96 if (not self._l_gripper.calibrated() and
97 self._l_gripper.type() != 'custom'):
98 self._l_gripper.calibrate()
99 if (not self._r_gripper.calibrated() and
100 self._r_gripper.type() != 'custom'):
101 self._r_gripper.calibrate()
The error()
method returns whether or not the gripper is in an error state. The possible cause of errors might be over/under-voltage, over/under-current, motor faults, etc. The calibrated() method returns a boolean, true if the gripper is already calibrated, while calibrate()
performs the actual calibration of the grippers. type()
returns the type of the gripper (custom, electric, etc). The calibration check is skipped if the gripper is a custom gripper.
103 #gripper goal trajectories
104 self._l_grip = FollowJointTrajectoryGoal()
105 self._r_grip = FollowJointTrajectoryGoal()
106
107 #param namespace
108 self._param_ns = '/rsdk_joint_trajectory_action_server/'
109
110 #gripper control rate
111 self._gripper_rate = 20.0 # Hz
The request message that will hold the goal positions for both the grippers are created. The namespace is modified and the grippers' control rates are modified.
113 def _execute_gripper_commands(self):
114 start_time = rospy.get_time()
115 r_cmd = self._r_grip.trajectory.points
116 l_cmd = self._l_grip.trajectory.points
117 pnt_times = [pnt.time_from_start.to_sec() for pnt in r_cmd]
118 end_time = pnt_times[-1]
119 rate = rospy.Rate(self._gripper_rate)
120 now_from_start = rospy.get_time() - start_time
The r_cmd
and l_cmd
variables holds the grippers' positions that were parsed from the file. The corresponding time stamps are read into pnt_times
variable.
121 while(now_from_start < end_time + (1.0 / self._gripper_rate) and
122 not rospy.is_shutdown()):
123 idx = bisect(pnt_times, now_from_start) - 1
124 if self._r_gripper.type() != 'custom':
125 self._r_gripper.command_position(r_cmd[idx].positions[0])
126 if self._l_gripper.type() != 'custom':
127 self._l_gripper.command_position(l_cmd[idx].positions[0])
128 rate.sleep()
129 now_from_start = rospy.get_time() - start_time
idx
holds the index of the timestamp from the file, relative to the current one. It is important to note that the velocities at which the Joint Positions were traversed are preserved during the playback. The corresponding grippers' positions are then commanded to the action server.
131 def _clean_line(self, line, joint_names):
132 """
133 Cleans a single line of recorded joint positions
134
135 @param line: the line described in a list to process
136 @param joint_names: joint name keys
137
138 @return command: returns dictionary {joint: value} of valid commands
139 @return line: returns list of current line values stripped of commas
140 """
141 def try_float(x):
142 try:
143 return float(x)
144 except ValueError:
145 return None
146 #convert the line of strings to a float or None
147 line = [try_float(x) for x in line.rstrip().split(',')]
148 #zip the values with the joint names
149 combined = zip(joint_names[1:], line[1:])
150 #take out any tuples that have a none value
151 cleaned = [x for x in combined if x[1] is not None]
152 #convert it to a dictionary with only valid commands
153 command = dict(cleaned)
154 return (command, line,)
This try_float()
function replaces the contents of the variable passed with float and none
values. Then, a tuple is constructed by zipping the names passed with the values from the variable line
. This may contain valid values as well as none
values . After removing the none
values, a dictionary is constructed. This dictionary is then parsed to create valid Joint commands.
156 def _add_point(self, positions, side, time):
157 """
158 Appends trajectory with new point
159
160 @param positions: joint positions
161 @param side: limb to command point
162 @param time: time from start for point in seconds
163 """
164 #creates a point in trajectory with time_from_start and positions
165 point = JointTrajectoryPoint()
166 point.positions = copy(positions)
167 point.time_from_start = rospy.Duration(time)
168 if side == 'left':
169 self._l_goal.trajectory.points.append(point)
170 elif side == 'right':
171 self._r_goal.trajectory.points.append(point)
172 elif side == 'left_gripper':
173 self._l_grip.trajectory.points.append(point)
174 elif side == 'right_gripper':
175 self._r_grip.trajectory.points.append(point)
This method creates a request message of the JointTrajectoryPoint
type and appends the goal position based on the side of the limb/gripper that is requesting. It compiles a list of Joint Positions as the trajectory.
177 def parse_file(self, filename):
178 """
179 Parses input file into FollowJointTrajectoryGoal format
180
181 @param filename: input filename
182 """
183 #open recorded file
184 with open(filename, 'r') as f:
185 lines = f.readlines()
186 #read joint names specified in file
187 joint_names = lines[0].rstrip().split(',')
188 #parse joint names for the left and right limbs
189 for name in joint_names:
190 if 'left' == name[:-3]:
191 self._l_goal.trajectory.joint_names.append(name)
192 elif 'right' == name[:-3]:
193 self._r_goal.trajectory.joint_names.append(name)
The lines are split into a list with ',' as the delimiter to extract the joint names. The arm of the current joint, found by stripping the "_<joint>" (last three characters) from the joint_name, is checked and stored.
195 def find_start_offset(pos):
196 #create empty lists
197 cur = []
198 cmd = []
199 dflt_vel = []
200 vel_param = self._param_ns + "%s_default_velocity"
201 #for all joints find our current and first commanded position
202 #reading default velocities from the parameter server if specified
203 for name in joint_names:
204 if 'left' == name[:-3]:
205 cmd.append(pos[name])
206 cur.append(self._l_arm.joint_angle(name))
207 prm = rospy.get_param(vel_param % name, 0.25)
208 dflt_vel.append(prm)
209 elif 'right' == name[:-3]:
210 cmd.append(pos[name])
211 cur.append(self._r_arm.joint_angle(name))
212 prm = rospy.get_param(vel_param % name, 0.25)
213 dflt_vel.append(prm)
214 diffs = map(operator.sub, cmd, cur)
215 diffs = map(operator.abs, diffs)
216 #determine the largest time offset necessary across all joints
217 offset = max(map(operator.div, diffs, dflt_vel))
218 return offset
The first commanded position and the current position for all the joints are captured. The default values that were loaded into the param server are read. The largest time offset necessary across all the joints is calculated.
220 for idx, values in enumerate(lines[1:]):
221 #clean each line of file
222 cmd, values = self._clean_line(values, joint_names)
223 #find allowable time offset for move to start position
224 if idx == 0:
225 start_offset = find_start_offset(cmd)
226 #add a point for this set of commands with recorded time
227 cur_cmd = [cmd[jnt] for jnt in self._l_goal.trajectory.joint_names]
228 self._add_point(cur_cmd, 'left', values[0] + start_offset)
229 cur_cmd = [cmd[jnt] for jnt in self._r_goal.trajectory.joint_names]
230 self._add_point(cur_cmd, 'right', values[0] + start_offset)
231 cur_cmd = [cmd['left_gripper']]
232 self._add_point(cur_cmd, 'left_gripper', values[0] + start_offset)
233 cur_cmd = [cmd['right_gripper']]
234 self._add_point(cur_cmd, 'right_gripper', values[0] + start_offset)
The non-float values are cleaned and a dictionary of valid Joint Commands is returned in the _clean_line()
function as explained above. The time offset for move to start position is also calculated. The parsed set of recorded commands along with their recorded time is added to the goal list.
236 def start(self):
237 """
238 Sends FollowJointTrajectoryAction request
239 """
240 self._left_client.send_goal(self._l_goal)
241 self._right_client.send_goal(self._r_goal)
242 self._execute_gripper_commands()
This function sends the FollowJointTrajectoryAction
request message which includes the recorded positions and their corresponding time, to the Joint Trajectory Action Server.
244 def stop(self):
245 """
246 Preempts trajectory execution by sending cancel goals
247 """
248 if (self._left_client.gh is not None and
249 self._left_client.get_state() == actionlib.GoalStatus.ACTIVE):
250 self._left_client.cancel_goal()
251
252 if (self._right_client.gh is not None and
253 self._right_client.get_state() == actionlib.GoalStatus.ACTIVE):
254 self._right_client.cancel_goal()
255
256 #delay to allow for terminating handshake
257 rospy.sleep(0.1)
The Trajectory execution is stopped by sending cancel goals to the Joint trajectory action server
.
259 def wait(self):
260 """
261 Waits for and verifies trajectory execution result
262 """
263 #create a timeout for our trajectory execution
264 #total time trajectory expected for trajectory execution plus a buffer
265 last_time = self._r_goal.trajectory.points[-1].time_from_start.to_sec()
266 time_buffer = rospy.get_param(self._param_ns + 'goal_time', 0.0) + 1.5
267 timeout = rospy.Duration(last_time + time_buffer)
268
269 l_finish = self._left_client.wait_for_result(timeout)
270 r_finish = self._right_client.wait_for_result(timeout)
271 l_result = (self._left_client.get_result().error_code == 0)
272 r_result = (self._right_client.get_result().error_code == 0)
273
274 #verify result
275 if all([l_finish, r_finish, l_result, r_result]):
276 return True
277 else:
278 msg = ("Trajectory action failed or did not finish before "
279 "timeout/interrupt.")
280 rospy.logwarn(msg)
281 return False
This method waits for the completion of the trajectory execution by Joint trajectory action server
.
284 def main():
285 """RSDK Joint Trajectory Example: File Playback
286
287 Plays back joint positions honoring timestamps recorded
288 via the joint_recorder example.
289
290 Run the joint_recorder.py example first to create a recording
291 file for use with this example. Then make sure to start the
292 joint_trajectory_action_server before running this example.
293
294 This example will use the joint trajectory action server
295 with velocity control to follow the positions and times of
296 the recorded motion, accurately replicating movement speed
297 necessary to hit each trajectory point on time.
298 """
299 epilog = """
300 Related examples:
301 joint_recorder.py; joint_position_file_playback.py.
302 """
303 arg_fmt = argparse.RawDescriptionHelpFormatter
304 parser = argparse.ArgumentParser(formatter_class=arg_fmt,
305 description=main.__doc__,
306 epilog=epilog)
307 parser.add_argument(
308 '-f', '--file', metavar='PATH', required=True,
309 help='path to input file'
310 )
311 parser.add_argument(
312 '-l', '--loops', type=int, default=1,
313 help='number of playback loops. 0=infinite.'
314 )
315 # remove ROS args and filename (sys.arv[0]) for argparse
316 args = parser.parse_args(rospy.myargv()[1:])
The name source file to read the Joint Position is captured along with the number of times the trajectory has to be looped from the command line.
318 print("Initializing node... ")
319 rospy.init_node("rsdk_joint_trajectory_file_playback")
320 print("Getting robot state... ")
321 rs = baxter_interface.RobotEnable(CHECK_VERSION)
322 print("Enabling robot... ")
323 rs.enable()
324 print("Running. Ctrl-c to quit")
325
326 traj = Trajectory()
327 traj.parse_file(path.expanduser(args.file))
328 #for safe interrupt handling
329 rospy.on_shutdown(traj.stop)
330 result = True
331 loop_cnt = 1
332 loopstr = str(args.loops)
333 if args.loops == 0:
334 args.loops = float('inf')
335 loopstr = "forever"
336 while (result == True and loop_cnt <= args.loops
337 and not rospy.is_shutdown()):
338 print("Playback loop %d of %s" % (loop_cnt, loopstr,))
339 traj.start()
340 result = traj.wait()
341 loop_cnt = loop_cnt + 1
342 print("Exiting - File Playback Complete")
343
344 if __name__ == "__main__":
345 main()
The node is initialized. An instance of the Trajectory
class is created. The parse_file()
method is called to extract the recorded Joint Positions and their corresponding timestamps as explained above. The start()
method is called to send the FollowJointTrajectory
request message to the Joint Trajectory Action server. This loops for the user specified number of times.