Create a ROS Driver Package – Write a Python ROS Wrapper [2/4]

This Python ROS wrapper tutorial is the second post [2/4] of a series on how to create a ROS driver package.

Make sure you have read the introduction [1/4] of this series, where I explain what is a ROS wrapper, and give some details about the driver we’ll use here.

Note: after reading the introduction, if you prefer to directly write in Cpp and skip the Python part, then go to the next tutorial on how to write a Cpp ROS wrapper [3/4] Both Python and Cpp tutorials produce the same output.


You have a Python driver code and want to add a ROS interface to it, so you can control it from your ROS application? Well, in this case you need to write a Python ROS wrapper.

In this tutorial I’ll show you exactly how to do that. We’ll create a basic and dummy driver, and then focus on writing the ROS wrapper step by step.

Even if this tutorial is oriented towards writing a wrapper for a hardware driver, you can use it to write a wrapper for anything (library, module, etc.) you want.


You are learning ROS?

Check out ROS For Beginners and learn ROS step by step.


Alright, let’s get started!

The Python hardware driver

Before writing a Python ROS wrapper, let’s create a simple hardware driver interface for a speed controlled motor.

class MotorDriver:

    def __init__(self, max_speed=10):
        """
        Init communication, set default settings, ...
        """
        ...

    def set_speed(self, speed):
        """
        Give a speed that the motor will try to reach.
        """
        ...

    def stop(self):
        """
        Set speed to 0 and thus stop the motor
        """
        ...

    def get_speed(self):
        """
        Return current speed
        """
        ...

    def get_status(self):
        """
        Get hardware information from the motor
        """
        ...

Implementing the code is your task to do, or you could find an already existing driver online.

In order to be able to use this driver in this tutorial – without an actual piece of hardware – let’s fill the MotorDriver class with a “dummy” behavior.

class MotorDriver:

    def __init__(self, max_speed=10):
        """
        Init communication, set default settings, ...
        """
        self.max_speed = max_speed
        self.current_speed = 0
        self.voltage = 12
        self.temperature = 47

    def set_speed(self, speed):
        """
        Give a speed that the motor will try to reach.
        """
        if speed <= self.max_speed:
            self.current_speed = speed
        else:
            self.current_speed = self.max_speed

    def stop(self):
        """
        Set speed to 0 and thus stop the motor
        """
        self.current_speed = 0

    def get_speed(self):
        """
        Return current speed
        """
        return self.current_speed

    def get_status(self):
        """
        Get hardware information from the motor
        """
        return {
            'temperature': self.temperature,
            'voltage': self.voltage
        }

Again, the implementation code is just here for this tutorial’s purpose. The real driver will actually talk to the hardware!

Now, let’s write the Python ROS wrapper for this driver.

Setup your Python ROS wrapper node

Without OOP

#!/usr/bin/env python

import rospy
from motor_driver import MotorDriver

if __name__ == "__main__":
    rospy.init_node("motor_driver")

    motor = MotorDriver(max_speed=8)
    rospy.on_shutdown(motor.stop)

    rospy.loginfo("Motor driver is now started, ready to get commands.")
    rospy.spin()

Even if this code doesn’t do much, it’s already a working ROS wrapper.

So, how is structured this minimal Python ROS wrapper?

#!/usr/bin/env python

import rospy
from motor_driver import MotorDriver

if __name__ == "__main__":
    rospy.init_node("motor_driver")

First we include the Python module which contains the driver. The import line depends on where you have placed your driver. Here, it’s on the same folder so it’s quite easy (later on this post series you’ll see what to do if you want to create your driver in the src/ folder of your package, or in a different package).

Then we initialize the node that we call “motor_driver”.

    motor = MotorDriver(max_speed=8)
    rospy.on_shutdown(motor.stop)

    rospy.loginfo("Motor driver is now started, ready to get commands.")
    rospy.spin()

Here we create an instance of the driver, and initialize it.

Thanks to the function rospy.on_shutdown(), we can tell the node to execute some final actions before it shuts down (shutdown hook). So, if the node is shut down we want to stop the motor first. You can just pass the name of the function – motor.stop, without the parenthesis.

A quick message to let the user know that the driver is setup, and then we call rospy.spin(). This will keep the program alive (and thus the driver) while the node is still running.

With those few lines we have an already working basic ROS wrapper. Create a node, initialize the driver, and call rospy.spin().

Setup ROS wrapper – with OOP

This code has the exact same behavior, but it’s written with OOP. Check out OOP with ROS in Python to refresh your memory, or if you’re not familiar with it yet.

#!/usr/bin/env python

import rospy
from motor_driver import MotorDriver

class MotorDriverROSWrapper:

    def __init__(self):
        self.motor = MotorDriver(max_speed=8)

    def stop(self):
        self.motor.stop()
        

if __name__ == "__main__":
    rospy.init_node("motor_driver")

    motor_driver_wrapper = MotorDriverROSWrapper()
    rospy.on_shutdown(motor_driver_wrapper.stop)

    rospy.loginfo("Motor driver is now started, ready to get commands.")
    rospy.spin()

From now on we’ll continue with object oriented programming.

Add functionalities to your Python ROS wrapper

Get default settings from ROS params

The MotorDriver init function takes an argument to set the max speed setting. For now we have an hard-coded value. Let’s get this setting from a ROS parameter!

...

def __init__(self):
    max_speed = rospy.get_param("~max_speed", 8)
    self.motor = MotorDriver(max_speed=max_speed)

...

Here we are getting the max speed setting from a ROS param, and we pass this value to the MotorDriver init function.

If you’re not so familiar with ROS params and rospy, check out how to get and set parameters from your code.

Receive speed command from a ROS topic (Subscriber)

We’ll use a rospy subscriber to get speed commands. This subscriber will wrap the set_speed() method.

...

from std_msgs.msg import Int32

class MotorDriverROSWrapper:

    def __init__(self):
        ...
        rospy.Subscriber("speed_command", Int32, self.callback_speed_command)

    ...

    def callback_speed_command(self, msg):
        self.motor.set_speed(msg.data)

...

When you send an integer value to the “speed_command” topic, this value will be passed to the motor driver to set the actual motor speed.

Note: we could’ve also used a ROS service here instead of the subscriber. For choosing that it really depends on your requirements, update frequency, whether the client (node which sends the command) needs to get a confirmation, etc. I just chose a subscriber so we can cover more ROS interfaces in this tutorial.

Stop the motor with a ROS Service

Let’s use a rospy Service server to wrap the stop() method.

...

from std_srvs.srv import Trigger

class MotorDriverROSWrapper:

    def __init__(self):
        ...
        rospy.Service("stop_motor", Trigger, self.callback_stop)

...
        
    def callback_stop(self, req):
        self.stop()
        return {"success": True, "message": "Motor has been stopped"}

...

Sometimes you’ll need to create custom message and service definitions if you can’t find an existing one that does the work. For this example, the Trigger service definition is just what we need.

Send status and current speed to a ROS topic (Publisher)

Now that we are able to set default settings, and send commands to the motor, let’s see how we can receive useful information from it.

To get the status and the current speed, we’ll use 2 rospy Publishers. Each publisher will be run with a rospy Timer, in order to publish at a fixed rate. They will wrap the get_speed() and get_status() methods from the driver.

...

from diagnostic_msgs.msg import DiagnosticStatus
from diagnostic_msgs.msg import KeyValue

class MotorDriverROSWrapper:

    def __init__(self):
        ...
        publish_current_speed_frequency = rospy.get_param("~publish_current_speed_frequency", 5.0)
        publish_motor_status_frequency = rospy.get_param("~publish_motor_status_frequency", 1.0)
        
        ...

        self.current_speed_pub = rospy.Publisher("current_speed", Int32, queue_size=10)
        self.motor_status_pub = rospy.Publisher("motor_status", DiagnosticStatus, queue_size=1)

        rospy.Timer(rospy.Duration(1.0/publish_current_speed_frequency), self.publish_current_speed)
        rospy.Timer(rospy.Duration(1.0/publish_motor_status_frequency), self.publish_motor_status)

    def publish_current_speed(self, event=None):
        self.current_speed_pub.publish(self.motor.get_speed())

    def publish_motor_status(self, event=None):
        status = self.motor.get_status()
        data_list = []
        for key in status:
            data_list.append(KeyValue(key, str(status[key])))

        msg = DiagnosticStatus()
        msg.values = data_list

        self.motor_status_pub.publish(msg)

...

We use the DiagnosticStatus message from the diagnostic_msgs package. This message definition is quite appropriate for sending a hardware status of your hardware. Note: here we just fill the message with KeyValue messages but there are more fields you can use. For a complete overview check out the message definitions.

With a few parameters, publishers, and timers, we are able to wrap the methods which get the data from the motor.

Complete Python ROS wrapper code

All the code is now ready! With this wrapper you can control the motor, set default settings, and get useful data.

Here is the complete Python ROS wrapper code for the driver.

#!/usr/bin/env python

import rospy
from motor_driver import MotorDriver

from std_msgs.msg import Int32
from std_srvs.srv import Trigger
from diagnostic_msgs.msg import DiagnosticStatus
from diagnostic_msgs.msg import KeyValue

class MotorDriverROSWrapper:

    def __init__(self):
        max_speed = rospy.get_param("~max_speed", 8)
        publish_current_speed_frequency = rospy.get_param("~publish_current_speed_frequency", 5.0)
        publish_motor_status_frequency = rospy.get_param("~publish_motor_status_frequency", 1.0)
        
        self.motor = MotorDriver(max_speed=max_speed)

        rospy.Subscriber("speed_command", Int32, self.callback_speed_command)
        rospy.Service("stop_motor", Trigger, self.callback_stop)

        self.current_speed_pub = rospy.Publisher("current_speed", Int32, queue_size=10)
        self.motor_status_pub = rospy.Publisher("motor_status", DiagnosticStatus, queue_size=1)

        rospy.Timer(rospy.Duration(1.0/publish_current_speed_frequency), self.publish_current_speed)
        rospy.Timer(rospy.Duration(1.0/publish_motor_status_frequency), self.publish_motor_status)

    def publish_current_speed(self, event=None):
        self.current_speed_pub.publish(self.motor.get_speed())

    def publish_motor_status(self, event=None):
        status = self.motor.get_status()
        data_list = []
        for key in status:
            data_list.append(KeyValue(key, str(status[key])))

        msg = DiagnosticStatus()
        msg.values = data_list

        self.motor_status_pub.publish(msg)

    def stop(self):
        self.motor.stop()

    def callback_speed_command(self, msg):
        self.motor.set_speed(msg.data)
        
    def callback_stop(self, req):
        self.stop()
        return {"success": True, "message": "Motor has been stopped"}

if __name__ == "__main__":
    rospy.init_node("motor_driver")

    motor_driver_wrapper = MotorDriverROSWrapper()
    rospy.on_shutdown(motor_driver_wrapper.stop)

    rospy.loginfo("Motor driver is now started, ready to get commands.")
    rospy.spin()

This code example will help you when you need to create your own Python ROS wrapper. As you can see, you can basically wrap everything with a simple combination of ROS core functionalities: parameters, topics, services, (actions), timers, etc.

Now, continue the “Create your ROS driver package” series with those tutorials:

Note: you can skip the Cpp part [3/4] if you want, and go straight ahead to the test & package tutorial [4/4].

Did you find this tutorial useful?

Do you want to learn how to program with ROS?

If yes, this course is for you:

ROS For Beginners - A Step By Step Course

>> ROS For Beginners - A Step By Step Course <<