Build a ROS2 Data Pipeline With ROS2 Topics

In this tutorial I’ll show you how you can chain ROS2 topics and thus build a data pipeline through several nodes.

This is a pretty common use case in robotics: you get some data from a sensor, and you need to pass it through several parts of your application. Each part needs the data to do its own thing, and some parts may modify the data for other parts as well.

For this example we’ll create 3 nodes:

  • Node 1 (pipeline_step_1): create a random float number between 0 and 10, and publish it.
  • Node 2 (pipeline_step_2): get this number, multiply it by 2, and publish it.
  • Node 3 (pipeline_step_3): get this number, round it, and publish it.
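Before touching any ROS2 code, the three transformations above can be sketched in plain Python. The step functions here are just illustrations of the data flow, not actual nodes:

```python
import random

def step_1():
    """Produce a random float between 0 and 10 (like pipeline_step_1)."""
    return random.uniform(0.0, 10.0)

def step_2(x):
    """Multiply the value by 2 (like pipeline_step_2)."""
    return x * 2.0

def step_3(x):
    """Round the value to an integer (like pipeline_step_3)."""
    return round(x)

# Chain the steps: this is exactly what the topics will do for us,
# except that each step will live in its own node.
print(step_3(step_2(step_1())))
```

In the ROS2 version, each function call in this chain becomes a publish/subscribe hop over a topic.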

Here’s the graph we’ll get at the end:

ROS2 Data Pipeline with Topics

First we’ll write the 3 nodes, then build them, test them, and create a launch file to start them together.


You want to learn ROS2 efficiently?

Check out ROS2 For Beginners and learn ROS2 step by step, in 1 week.


>> Watch this video as an additional resource to this article:

After watching the video, subscribe to the Robotics Back-End Youtube channel so you don’t miss the next tutorials!

Write the nodes

Data Pipeline step 1

In this node we’ll create a random float number between 0 and 10, and publish it on the “data_1” topic. Here I chose to write the node in Python.

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node

from example_interfaces.msg import Float64
import random

class Node1(Node):
    def __init__(self):
        super().__init__("pipeline_step_1")
        self.pub_ = self.create_publisher(Float64, "data_1", 10)
        self.timer_ = self.create_timer(1.0, self.publish_data)

    def publish_data(self):
        msg = Float64()
        msg.data = random.uniform(0.0, 10.0)
        self.get_logger().info("Published: " + str(msg.data))
        self.pub_.publish(msg)

def main(args=None):
    rclpy.init(args=args)
    node = Node1()
    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == "__main__":
    main()

If you’re not that confident with this code, check out how to write a minimal ROS2 node in Python, and also how to write a ROS2 Python publisher.

Here we use random.uniform() to get the random float number we want to start with.

With the timer we create, we publish on the “data_1” topic every 1 second – so, at 1 Hz.

And we also print on the terminal what we’ve just published, so it will be easier to debug.

Data Pipeline step 2

This node will subscribe to the “data_1” topic, process/transform the data, and publish the new data to the “data_2” topic. This time I’m writing the node in Cpp: ROS2 communications are language-agnostic, so you can use any language you want for each of your nodes.

#include "rclcpp/rclcpp.hpp"
#include "example_interfaces/msg/float64.hpp"

class Node2 : public rclcpp::Node
{
public:
    Node2() : Node("pipeline_step_2")
    {
        pub_ = this->create_publisher<example_interfaces::msg::Float64>("data_2", 10);
        sub_ = this->create_subscription<example_interfaces::msg::Float64>(
            "data_1", 10, std::bind(&Node2::callbackData, this, std::placeholders::_1));
    }

private:
    void callbackData(const example_interfaces::msg::Float64::SharedPtr msg)
    {
        auto new_msg = example_interfaces::msg::Float64();
        new_msg.data = msg->data * 2.0;
        RCLCPP_INFO(this->get_logger(), "Received: %lf, Published: %lf", msg->data, new_msg.data);
        pub_->publish(new_msg);
    }

    rclcpp::Publisher<example_interfaces::msg::Float64>::SharedPtr pub_;
    rclcpp::Subscription<example_interfaces::msg::Float64>::SharedPtr sub_;
};

int main(int argc, char **argv)
{
    rclcpp::init(argc, argv);
    auto node = std::make_shared<Node2>();
    rclcpp::spin(node);
    rclcpp::shutdown();
    return 0;
}

If you’re not that confident with this code, check out how to write a minimal ROS2 node in Cpp.

Here we create a publisher (to “data_2”) as well as a subscriber (to “data_1”).

In the “data_1” topic callback, we:

  • Process the data and transform it, here by multiplying it by 2.
  • Create a new Float64 message and fill it with this new data.
  • Publish the data to the “data_2” topic.
  • Print what we received and what we published to make debugging/monitoring easier.

Note that we didn’t create any timer or rate to publish on the “data_2” topic: we publish directly from the callback function of the “data_1” topic.

Data Pipeline step 3

The final node of our pipeline: this one will subscribe to the “data_2” topic, process/transform the data, and publish the new data to the “data_3” topic. And I’ll write this node in Python again.

#!/usr/bin/env python3
import rclpy
from rclpy.node import Node

from example_interfaces.msg import Float64
from example_interfaces.msg import Int64

class Node3(Node):
    def __init__(self):
        super().__init__("pipeline_step_3")
        self.pub_ = self.create_publisher(Int64, "data_3", 10)
        self.sub_ = self.create_subscription(
            Float64, "data_2", self.callback_data, 10)

    def callback_data(self, msg):
        new_msg = Int64()
        new_msg.data = round(msg.data)
        self.get_logger().info("Received: " + str(msg.data) + 
                               ", Published: " + str(new_msg.data))
        self.pub_.publish(new_msg)


def main(args=None):
    rclpy.init(args=args)
    node = Node3()
    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == "__main__":
    main()

So, we have a publisher to “data_3” and a subscriber to “data_2”.

Here again we do everything in the “data_2” callback function:

  • Process the data and modify it: here we round it and we get an integer instead of a float number.
  • Create a new message with a different type: we received a Float64, and now we’re publishing an Int64.
  • Publish the new data.
  • Log.

The most important point here is that we’re using a different data type to pass the message to the next step of the pipeline.

Note: as this is the last step of our pipeline example, we could just print the result with a log, and not publish it. In your own application with your own nodes and data, you’ll have a good idea of what you should do.
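One detail to be aware of if you build on this step: Python’s built-in round() uses banker’s rounding, so exact halves go to the nearest even integer rather than always rounding up:

```python
# Python's round() rounds halves to the nearest even integer.
print(round(2.5))  # 2, not 3
print(round(3.5))  # 4
print(round(2.6))  # 3, as expected
```

For the random values in this tutorial this rarely matters, but if your application needs “round half up” behavior, use math.floor(x + 0.5) or the decimal module instead.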

Build and run the ROS2 data pipeline application

Add executables in setup.py for Python nodes, and in CMakeLists.txt for Cpp nodes. Then compile from your ROS2 workspace with colcon build.
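As a reminder, a Python node becomes an executable through the entry_points section of setup.py. Here is a hypothetical excerpt for a package assumed to be named “ros2_tutorials_py”; the module paths are assumptions, so adapt them to where your node files actually live:

```python
# Hypothetical entry_points excerpt from the setup.py of a
# package assumed to be named "ros2_tutorials_py".
# Each entry maps an executable name (used with `ros2 run`)
# to the main() function of a Python module.
entry_points = {
    "console_scripts": [
        "node_1 = ros2_tutorials_py.node_1:main",
        "node_3 = ros2_tutorials_py.node_3:main",
    ],
}
```

After colcon build and sourcing the workspace, `ros2 run ros2_tutorials_py node_1` resolves to the main() function of node_1.py. For the Cpp node, the equivalent is an add_executable() plus install() entry in CMakeLists.txt.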

Run the app in 3 different terminals

Open 3 terminals/sessions. If you already had 3 open terminals, make sure to source your ROS2 workspace in each one before you continue.

Let’s start all 3 nodes and see what we get.

Terminal 1:

$ ros2 run ros2_tutorials_py node_1
...
[INFO] [1594190744.946266831] [pipeline_step_1]: Published: 7.441288072582843
[INFO] [1594190745.950362887] [pipeline_step_1]: Published: 9.968039333074264
[INFO] [1594190746.944258673] [pipeline_step_1]: Published: 8.848880026052129
[INFO] [1594190747.945936611] [pipeline_step_1]: Published: 1.8232649263149414
...

Terminal 2:

$ ros2 run ros2_tutorials_cpp node_2 
...
[INFO] [1594190744.944747967] [pipeline_step_2]: Received: 7.441288, Published: 14.882576
[INFO] [1594190745.949126317] [pipeline_step_2]: Received: 9.968039, Published: 19.936079
[INFO] [1594190746.943588170] [pipeline_step_2]: Received: 8.848880, Published: 17.697760
[INFO] [1594190747.944386405] [pipeline_step_2]: Received: 1.823265, Published: 3.646530
...

Terminal 3:

$ ros2 run ros2_tutorials_py node_3 
...
[INFO] [1594190744.955152638] [pipeline_step_3]: Received: 14.882576145165686, Published: 15
[INFO] [1594190745.951406026] [pipeline_step_3]: Received: 19.93607866614853, Published: 20
[INFO] [1594190746.944646754] [pipeline_step_3]: Received: 17.697760052104258, Published: 18
[INFO] [1594190747.946918714] [pipeline_step_3]: Received: 3.646529852629883, Published: 4
...

Thanks to the logs you can see where the data goes, when it is received and sent, and how it is processed.

Now, if you get the list of all topics running in your graph with ros2 topic list:

$ ros2 topic list
/data_1
/data_2
/data_3
/parameter_events
/rosout

We find the topics “data_1”, “data_2” and “data_3”.

So, this is really great. From there you can:

  • From the terminal, listen to any topic with ros2 topic echo and see what’s going on.
  • Plug any new node into any of those topics. For example, to make a more complex data pipeline, another node could subscribe to “data_2” and process the data independently.

Start your data pipeline with a ROS2 launch file

For developing and debugging, starting your nodes from the terminal is great. However, if you want to create a real ROS2 application, you’ll have to use launch files. On top of the advantages of using a ROS2 launch file, you’ll also be able to start all your nodes at the exact same time, so all the steps of the pipeline will start working at the same time.

So, let’s write a simple launch file to start all 3 nodes.

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    ld = LaunchDescription()

    node_1 = Node(
        package="ros2_tutorials_py",
        executable="node_1"
    )

    node_2 = Node(
        package="ros2_tutorials_cpp",
        executable="node_2"
    )

    node_3 = Node(
        package="ros2_tutorials_py",
        executable="node_3"
    )

    ld.add_action(node_1)
    ld.add_action(node_2)
    ld.add_action(node_3)

    return ld

Compile and run this launch file; here’s a ROS2 launch file tutorial if you don’t know how to do it.

$ ros2 launch my_robot_bringup data_pipeline.launch.py 
[INFO] [launch]: All log files can be found below /home/ed/.ros/log/2020-07-08-09-55-37-919871-ed-vm-11593
[INFO] [launch]: Default logging verbosity is set to INFO
[INFO] [node_1-1]: process started with pid [11595]
[INFO] [node_2-2]: process started with pid [11597]
[INFO] [node_3-3]: process started with pid [11599]
[node_1-1] [INFO] [1594194939.304693502] [pipeline_step_1]: Published: 0.9131821923863725
[node_2-2] [INFO] [1594194939.305641498] [pipeline_step_2]: Received: 0.913182, Published: 1.826364
[node_3-3] [INFO] [1594194939.314622095] [pipeline_step_3]: Received: 1.826364384772745, Published: 2
[node_1-1] [INFO] [1594194940.292584583] [pipeline_step_1]: Published: 6.724756051691409
[node_2-2] [INFO] [1594194940.293238316] [pipeline_step_2]: Received: 6.724756, Published: 13.449512
[node_3-3] [INFO] [1594194940.294448648] [pipeline_step_3]: Received: 13.449512103382817, Published: 13

And well, your data pipeline is now fully ready! For production you may remove the info logs and keep only warning and error logs, for example by starting each node with --ros-args --log-level warn.

Conclusion

You can now build a complete data pipeline using ROS2 nodes and topics.

The ROS2 architecture and tools bring you many advantages. You can:

  • Write any step of the pipeline (node) in any language you want.
  • Debug each step from the terminal with the ros2 topic command line tool.
  • Start only a few steps of your pipeline.
  • Add new steps (nodes) at the beginning, end, or anywhere else in the pipeline.

Want to learn how to program with ROS2?

Don't miss this opportunity:

ROS2 For Beginners - Step by Step Course

