Demo Mode - Code Walkthrough
The Demo Mode provides a simple way of generating and displaying an interactive UI on the Baxter Research Robot.
The core of this program is an image-processing library that converts images between PNG, PIL (Python Imaging Library), OpenCV, and ROS message formats. On top of that sits a simple UI structure, with Window and Button classes that use the aforementioned library, in coordination with ROS messages from the robot, to generate and display the user interface.
Image Processing - Displaying the pretty pictures
Let's start with the imports:
import cv
import cv_bridge
import PIL
PIL is used for loading and compositing PNG asset images; cv and cv_bridge are used for image manipulation and conversion to ROS messages.
XDisplay - Put it on the Big Screen
As with other functions interacting with Baxter's face, we will need to publish an Image message to the /robot/xdisplay topic to update the screen image.
To that end, our UI Controller, the highest element in the UI system, has a ui.xdisp property containing a publisher to that topic:
self.xdisp = rospy.Publisher('/robot/xdisplay', ImageMsg, latch=True)
Short section, but it had to be done before we go on to the rest of this...
PIL to ROS - From humble beginnings
Let's start by covering how to take a .png image asset and convert it into a ROS message.
The places we do this in the code are pretty spread out, so, for a simple case, let's load a single .png file and run it through both conversion steps:
from PIL import Image
from baxter_demo_ui import img_proc

image_path = "path/to/your/image.png"
img = Image.open(image_path)
cv_img = img_proc.gen_cv(img)
msg = img_proc.cv_to_msg(cv_img)
Now, here is the code for gen_cv (in img_proc.py), which converts a single image from PIL format to an OpenCV image:
def gen_cv(img):
    return PIL_to_cv(rgb_to_bgr(img))

def rgb_to_bgr(img):
    r, g, b = img.split()
    return PIL.Image.merge('RGB', (b, g, r))

def PIL_to_cv(img):
    ci = cv.CreateImage((1024, 600), cv.IPL_DEPTH_8U, 3)
    cv.SetData(ci, img.tostring(), 3072)
    return ci
To start, gen_cv takes a PIL image and converts it from RGB to BGR, the accepted encoding format for publishing to xdisplay. We then create an empty OpenCV image and copy the data from the BGR image into it (3072 is the row stride in bytes: 1024 pixels wide x 3 bytes per pixel).
To convert this from an OpenCV image into a ROS message, we then use the cv_bridge method cv_to_imgmsg:
def cv_to_msg(img):
    return cv_bridge.CvBridge().cv_to_imgmsg(img, encoding='bgr8')
With this code, you can take any number of canned images and push them to the screen in your scripts, but this by itself is not terribly useful or modular.
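Putting those pieces together, here is a minimal standalone sketch that pushes one canned image to the screen (the node name and image path are hypothetical, and the image is assumed to already be 1024x600):

import rospy
from sensor_msgs.msg import Image as ImageMsg
from PIL import Image
from baxter_demo_ui import img_proc

# Hypothetical one-shot node: load a PNG, convert it, publish it to the face.
rospy.init_node('xdisplay_image')
xdisp = rospy.Publisher('/robot/xdisplay', ImageMsg, latch=True)
img = Image.open('my_image.png')  # assumed to be a 1024x600 RGB .png
xdisp.publish(img_proc.cv_to_msg(img_proc.gen_cv(img)))
rospy.sleep(1)  # give the latched message a moment to reach the screen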
Anatomy of a Button - Layering images and text in PIL
In demo_buttons.py, we generate images for the different buttons available in the UI by layering icons, which carry their own alpha channel, onto images for the different button states, and drawing text on top.
We generate the images with our gen_image function:
def gen_image(self, btn, icon, offset):
    img = btn.resize(self._size, Image.ANTIALIAS)
    if icon is not None:
        img.paste(icon, tuple(offset), icon)
    return img
This resizes the button itself, a PIL Image object loaded from a .png, to the size listed in the config, and layers the icon, another PIL image, on top of it at the given offset. We call tuple here only because the offset is read from a JSON file and arrives as a list rather than a tuple. Passing icon again as the third argument to paste() supplies the alpha mask: if an image contains its own alpha layer, you can simply use the image as its own mask.
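As a minimal illustration of the same layering outside the class (the asset paths, size, and offset here are hypothetical):

from PIL import Image

btn = Image.open('btn_idle.png')    # hypothetical button frame asset
icon = Image.open('icon.png')       # hypothetical icon with its own alpha layer
img = btn.resize((200, 100), Image.ANTIALIAS)  # scale frame to the config size
img.paste(icon, (20, 10), icon)     # the icon doubles as its own alpha mask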
To cover the addition of text, we need to look into demo_windows.py for a moment.
We start by importing a few new items from PIL:
from PIL import (
    Image,
    ImageDraw,
    ImageFont
)
Then, when drawing the buttons for our window, we create a drawing object with ImageDraw.Draw() and pass it through to each button's draw function. This is what we will use to layer the text onto the image.
tmp = copy(self._bg['img'])
d = ImageDraw.Draw(tmp)
for name, btn in self._buttons.items():
    if btn.name == sel_btn:
        if disabled:
            state = 'pressed'
        else:
            state = 'selected'
    else:
        if disabled:
            state = 'disabled'
        else:
            state = 'idle'
    btn.draw(tmp, d, state)
Now, in the initialization of the Button, we define a font attribute, loaded from the .ttf file for that font (share_path is a passed argument that points to the share/ directory of this package):
self._font = ImageFont.truetype('%s/Arial.ttf' % share_path, 30)
Finally, in the draw method of the Button class:
def draw(self, img, draw, state):
    tmp = self._imgs[state]
    img.paste(tmp, self._offset, tmp)
    self.draw_label(draw)

def draw_label(self, draw):
    label_x = (self._offset[0] + self._size[0] / 2 -
               draw.textsize(self._label, self._font)[0] / 2)
    label_y = self._offset[1] + self._label_y
    draw.text((label_x, label_y), self._label,
              fill='white', font=self._font)
Here, we grab the stored button image (frame plus icon) associated with the given state and paste it onto the window image before calling draw_label. In draw_label, we derive x and y locations that center the label horizontally over the button, then call the ImageDraw.Draw object's text() function. (Note that self._label is the actual label text.)
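In isolation, that centering math looks like this (the font path here is an assumption; any .ttf will do):

from PIL import Image, ImageDraw, ImageFont

img = Image.new('RGB', (1024, 600))
d = ImageDraw.Draw(img)
font = ImageFont.truetype('/usr/share/fonts/truetype/freefont/FreeSans.ttf', 30)
label = 'Camera Demo'
text_w = d.textsize(label, font)[0]  # textsize returns (width, height)
d.text(((1024 - text_w) / 2, 300), label, fill='white', font=font)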
Anatomy of a Window - Handling of states
The job of Windows in this UI is to handle the state of the interface. The UI state is stored hierarchically: the UI controller has a selected_window, each window has a selected_button, and each window may have a parent.
The parents are used for two things:
- When a user presses "Back" on a window, it will attempt to set its parent as the new active window and re-draw.
- When a window is drawn, it calls the draw() function of its parents recursively, having them draw their "disabled" state to show that they are not the focus, before overlaying its own current image onto that base.
def _draw_window(self, img, window, selected=True):
    if self.windows[window].parent:
        img = self._draw_window(img,
                                window=self.windows[window].parent,
                                selected=False)
    return self.windows[window].draw(img, selected)
Here, selected refers to whether or not the window being drawn is the current active window. So if the active window has a chain of ancestors, those ancestors are drawn disabled first, and the active window is drawn selected on top of them.
The main visual functionality of the UI comes from changing which window is drawn, and in what state.
Interaction - Controlling the pretty pictures through use of buttons and a knob
The Demo Mode UI responds to the navigators on both arms, and incorporates the gripper_cuff example, which enables interaction with the cuff and its buttons.
We use baxter_interface and baxter_dataflow signals to connect our UI interaction functions to the on_change signals for the buttons and wheel on the arm navigators.
self._navigators = {'left': Navigator('left'),
                    'right': Navigator('right')}

self._listeners_connected = False
self._connect_listeners()
def _connect_listeners(self):
    # Navigator OK Button
    self._navigators['left'].button0_changed.connect(self._left_ok_pressed)
    self._navigators['right'].button0_changed.connect(
        self._right_ok_pressed)

    # Navigator Wheel
    self._navigators['left'].wheel_changed.connect(self._left_wheel_moved)
    self._navigators['right'].wheel_changed.connect(
        self._right_wheel_moved)

    # Navigator Baxter Button
    self._navigators['left'].button2_changed.connect(self._enable)
    self._navigators['right'].button2_changed.connect(self._enable)

    # Navigator Back Button
    self._navigators['left'].button1_changed.connect(self.back)
    self._navigators['right'].button1_changed.connect(self.back)

    self._listeners_connected = True
So now, the buttons and wheel for each arm are attached to these UI functions.
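For instance, hooking a callback of your own to one of these signals follows the same pattern (a minimal sketch; the node name and callback are hypothetical):

import rospy
from baxter_interface import Navigator

rospy.init_node('nav_listener')
nav = Navigator('left')

def on_ok(value):
    # The signal fires on both press (True) and release (False).
    if value:
        print 'left OK button pressed'

nav.button0_changed.connect(on_ok)
rospy.spin()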
The listener functions for buttons are very simple, reacting only when the button state changes to True:
def back(self, v):
    if v:
        self.error_state = False
        if self.active_window.parent:
            self.kill_examples()
            self.set_active_window(self.active_window.parent)
            self.draw()
The listener for the navigator wheels is similar, but it responds to every change, incrementing or decrementing the UI scroll depending on the sign of the value received.
def _left_wheel_moved(self, v):
    self._wheel_moved(v, 'left')

def _right_wheel_moved(self, v):
    self._wheel_moved(v, 'right')

def _wheel_moved(self, v, side):
    if not self._active_example and self._wheel_ok:
        if v > 0:
            self.scroll(1)
        else:
            self.scroll(-1)
        self._wheel_ok = False
        rospy.Timer(rospy.Duration(.01), self._set_wheel_ok, oneshot=True)

def scroll(self, direction):
    self.active_window.scroll(direction)
    self.draw()
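The _wheel_ok flag debounces the wheel: after one scroll, further wheel events are ignored until the one-shot timer fires. _set_wheel_ok is not shown in the walkthrough, but presumably it just resets the flag (a sketch; rospy.Timer hands the callback a TimerEvent):

def _set_wheel_ok(self, event):
    # Re-allow scrolling once the 10ms debounce window has passed.
    self._wheel_ok = True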
Important note - Enable Loop and Avoiding E-Stop Craziness
When you are in demo mode, we assume that you want the robot to be enabled, so we run a timer in demo_ui that keeps the robot enabled and reacts to robot state changes.
rospy.Timer(rospy.Duration(.5), self._enable)
The "_enable" function will attempt to enable the robot, and will throw an error screen up if it fails.
def _enable(self, v=1):
    if v == 1 and not self._status.state().enabled:
        try:
            self._status.enable()
        except Exception:
            self.error_screen('stopped')
            return False
    if not self._status.state().enabled:
        self.error_screen('no_enable')
        return False
    self._enable_cuff()
    return True
Unfortunately, when the robot is E-Stopped, messages from the sensors get garbled, so the listeners we set up for the scroll wheels and buttons will receive bad data and throw errors when their callbacks are invoked.
To fix this, we set up another subscriber on the robot's state topic, which disconnects and reconnects the listeners whenever the E-Stop state changes:
self._estop_sub = rospy.Subscriber('robot/state', AssemblyState,
                                   self._estop_callback)

def _estop_callback(self, msg):
    if self._estop_state != msg.stopped:
        self._estop_state = msg.stopped
        if msg.stopped and self._listeners_connected:
            self._disconnect_listeners()
        elif not msg.stopped and not self._listeners_connected:
            self._connect_listeners()
def _disconnect_listeners(self):
    # Navigator OK Button
    self._navigators['left'].button0_changed.disconnect(
        self._left_ok_pressed)
    self._navigators['right'].button0_changed.disconnect(
        self._right_ok_pressed)

    # Navigator Wheel
    self._navigators['left'].wheel_changed.disconnect(
        self._left_wheel_moved)
    self._navigators['right'].wheel_changed.disconnect(
        self._right_wheel_moved)

    # Navigator Baxter Button
    self._navigators['left'].button2_changed.disconnect(self._enable)
    self._navigators['right'].button2_changed.disconnect(self._enable)

    # Navigator Back Button
    self._navigators['left'].button1_changed.disconnect(self.back)
    self._navigators['right'].button1_changed.disconnect(self.back)

    self._listeners_connected = False
Demo Functions - The Stuff We Do
We use a couple of nifty tricks to get the robot to run through our functions when buttons are pressed. Here I'll cover the main ones.
Processes - Make It Do What It Do, CLI-Style
There is a class defined in baxter_procs called RosProcess that will allow you to spawn off a command-line process using subprocess.Popen. This is how we run most of the example programs.
class RosProcess():
    def __init__(self, command, quiet=True, get_output=False, shell=True):
        self.process = Popen(command, shell=shell, stdout=None,
                             stdin=PIPE, stderr=STDOUT)
        self.quiet = quiet
        self.get_output = get_output

    def write(self, stdin):
        self.process.stdin.write(stdin)

    def run(self):
        stdout, stderr = self.process.communicate()
        if not self.quiet:
            print stdout
        if self.get_output:
            return stdout
        else:
            return self.process.returncode
This is used in demo_functions to run most of the example programs, such as the arm puppet:
def puppet(ui, side):
    proc = RosProcess('rosrun baxter_examples '
                      'joint_velocity_puppet.py -l %s' % side)
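The write() method lets you feed the child process's stdin, for example to answer a prompt. A hypothetical sketch (this script is not part of the package):

proc = RosProcess('python some_prompting_script.py')
proc.write('y\n')  # answer the script's prompt on stdin
proc.run()         # block until the process exits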
Camera Overlay - Live on Baxter's Face!
The other nifty thing we do, in the camera example, is overlay the output from the cameras onto the current window image.
def camera_disp(ui, cam_side):
    def _display(camera, name):
        for cam in ui.cameras:
            ui.cameras[cam].close()
        camera.resolution = (640, 400)
        camera.open()

    def _cam_to_screen(msg):
        newMsg = overlay(ui.img, msg, (1024, 600), (205, 140, 640, 400))
        ui.xdisp.publish(newMsg)

    ui.cam_sub = rospy.Subscriber(
        'cameras/%s_camera/image' % cam_side,
        ImageMsg,
        _cam_to_screen)

    camera = ui.cameras[cam_side]
    _display(camera, '%s_camera' % cam_side)
Each "cam" in ui.cameras is a CameraController from baxter_interface, corresponding to one of the physical cameras on baxter. This function will close all of the cameras when called in order to ensure that the one we want to view can be opened (as you cannot have all 3 cameras open simultaneously)
The function works by hooking a subscriber up to the local function _cam_to_screen, which takes the UI's currently published image message and overlays the current camera image on top of it within a given bounding box.
The overlay() function lives in img_proc; it converts the image message back to an OpenCV image and copies the camera image on top of it. The new_rect argument is an (x, y, width, height) bounding box for cv.GetSubRect.
def overlay(old_img, new_img, original_size, new_rect):
    tmp = cv.CreateImage(original_size, cv.IPL_DEPTH_8U, 3)
    cv.Copy(old_img, tmp)
    sub = cv.GetSubRect(tmp, new_rect)
    cv_img = msg_to_cv(new_img)
    cv.Copy(cv_img, sub)
    return cv_to_msg(tmp)