Skip to content

ROS2 project from development to deployment, Part 1: ROS project

The demo project has three package
- pkg_interface: message and service definition
- pkg_server: Expose a service that use the message from pkg_interface
- pkg_client: Call the service from pkg_server

Note

The source code is available at GitHub


pkg_interface

1
2
3
4
5
6
7
.
├── CMakeLists.txt
├── msg
│   └── Demo.msg
├── package.xml
└── srv
    └── Demo.srv
CMakeLists.txt
CMakeLists.txt
cmake_minimum_required(VERSION 3.8)
project(pkg_interface)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)

set(MESSAGES
  "msg/Demo.msg"
)

set(SERVICES 
  "srv/Demo.srv"
)


rosidl_generate_interfaces(${PROJECT_NAME}
  ${MESSAGES}
  ${SERVICES} 
)


ament_export_dependencies(rosidl_default_runtime)

ament_package()
package.xml
package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>pkg_interface</name>
  <version>0.0.1</version>
  <description>Simple interface</description>
  <maintainer email="robo2020@gmail.com">user</maintainer>
  <license>Apache-2.0</license>

  <buildtool_depend>ament_cmake</buildtool_depend>

  <buildtool_depend>rosidl_default_generators</buildtool_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>
  <member_of_group>rosidl_interface_packages</member_of_group>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>
Demo.srv
Demo.srv
1
2
3
---
string message
bool success

pkg_server

1
2
3
4
5
├── CMakeLists.txt
├── package.xml
└── pkg_server
    ├── __init__.py
    └── my_node.py
node
my_node.py
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node
from pkg_interface.srv import Demo

TOPIC_NAME = 'my_service'

class MyNode(Node):
    def __init__(self):
        node_name="client"
        super().__init__(node_name)
        self.get_logger().info("Hello server")
        self.server = self.create_service(Demo, TOPIC_NAME, self.callback)

    def callback(self, request: Demo.Request, response: Demo.Response):
        self.get_logger().info('Service called')
        response.message = "Hello from server"
        return response



def main(args=None):
    rclpy.init(args=args)
    node = MyNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

pkg_client

1
2
3
4
5
6
7
├── CMakeLists.txt
├── launch
   └── client_server.launch.py
├── package.xml
└── pkg_client
    ├── __init__.py
    └── my_node.py
client node
my_node.py
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node
from pkg_interface.srv import Demo

TOPIC_NAME = 'my_service'

class MyNode(Node):
    def __init__(self):
        node_name="client"
        super().__init__(node_name)
        self.get_logger().info("Hello client")
        self.client = self.create_client(Demo, TOPIC_NAME)
        self.client.wait_for_service()

        self.future = None
        self.call_service() 

    def call_service(self):
        self.req = Demo.Request()
        self.get_logger().info('Call serice')
        self.future = self.client.call_async(self.req)
        self.future.add_done_callback(self.callback)

    def callback(self, future):
        try:
            response: Demo.Response = future.result()
            self.get_logger().info('Service call success')
            self.get_logger().info(response.message)
        except Exception as e:
            self.get_logger().info('Service call failed %r' % (e,))


def main(args=None):
    rclpy.init(args=args)
    node = MyNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()