Skip to content

Zenoh cpp bindings

ROS / ros world / zenoh

Install

Download deb's file from github release page

Note

zeboh cpp binding depend in zeno_pico or zeno_c


Code from Zenoh tutorial

session
1
2
3
4
5
6
7
8
9
#include <zenoh.hxx>

int main(){
    zenoh:config c;
    auto z = zenoh::expect<zenoh::Session>(zenoh::open(std::move(c)));


    return 0;
}
put
//primitive value
z.put("my/expression", "value");
publisher
auto p = zeno::expect(z.declare_publisher("my/expression"));
p.put(my_data);

zeno encoding

  • json
  • png
  • and more

todo:// 42:36

subscriber
1
2
3
auto s = zenoh::expect<zenoh::Subscriber>(
    z.declare_subscriber("key", callback)
);

subscriber to multi

"key/**"

QoS

todo: 44:47 todo: 46:26 (opts)

Build

todo: 47:33


Demo: pub / sub

Pub Sub data using zenoh

pub_sub.cpp
#include <iostream>
#include <thread>
#include "zenoh.hxx"

using namespace zenoh;
void task1()
{
    Config config = Config::create_default();
    auto session = Session::open(std::move(config));
    // auto publisher = session.declare_publisher(KeyExpr("demo/example/simple"));

    for (int i = 0; i < 5; ++i) {
        // Publish using session.put
        session.put(KeyExpr("demo/example/simple"), Bytes("Simple from session.put! i=" + std::to_string(i)));

        // Publish using publisher.put
        // publisher.put("Simple from publisher.put!");

        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}



int main()
{
    std::thread t1(task1);
    Config config = Config::create_default();
    auto session = Session::open(std::move(config));
    auto subscriber = session.declare_subscriber(
        KeyExpr("demo/example/simple"),
        [](const Sample &sample)
        {
            std::cout << "Received: " << sample.get_payload().as_string() << std::endl;
        },
        closures::none);

    t1.join(); // Wait for task1 to finish

    std::cout << "Demo completed.\n";
    return 0;
}
CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(zenoh_ws)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

find_package(zenohpico) 
find_package(zenohcxx REQUIRED)

add_executable(pub_sub pub_sub.cpp)

target_link_libraries(pub_sub
    zenohcxx::zenohpico)

Demo: pub / sub with ROS message serialization

pub_sub.cpp
#include <iostream>
#include <thread>
#include "zenoh.hxx"
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/int32.hpp>
#include <rclcpp/serialization.hpp>
#include <rclcpp/serialized_message.hpp>

using namespace zenoh;

std::vector<uint8_t> serialize_ros_message(const std_msgs::msg::Int32 & msg)
{
    rclcpp::Serialization<std_msgs::msg::Int32> serializer;
    rclcpp::SerializedMessage serialized_msg;

    serializer.serialize_message(&msg, &serialized_msg);

    // Copy message block of memory to std::vector
    std::vector<uint8_t> buffer(
        serialized_msg.get_rcl_serialized_message().buffer,
        serialized_msg.get_rcl_serialized_message().buffer +
        serialized_msg.get_rcl_serialized_message().buffer_length);

    return buffer;
}

std_msgs::msg::Int32 deserialize_ros_message(const std::vector<uint8_t> &buffer)
{
    rclcpp::SerializedMessage serialized_msg;

    // Resize internal buffer and copy
    serialized_msg.get_rcl_serialized_message().buffer_capacity = buffer.size();
    serialized_msg.get_rcl_serialized_message().buffer_length = buffer.size();
    serialized_msg.get_rcl_serialized_message().buffer = reinterpret_cast<uint8_t *>(malloc(buffer.size()));
    std::memcpy(serialized_msg.get_rcl_serialized_message().buffer, buffer.data(), buffer.size());

    rclcpp::Serialization<std_msgs::msg::Int32> serializer;
    std_msgs::msg::Int32 msg;
    serializer.deserialize_message(&serialized_msg, &msg);



    return msg;
}

void task1()
{
    Config config = Config::create_default();
    auto session = Session::open(std::move(config));
    // auto publisher = session.declare_publisher(KeyExpr("demo/example/simple"));

    for (int i = 0; i < 5; ++i) {
        std_msgs::msg::Int32 msg;
        msg.data = i;
        auto buffer = serialize_ros_message(msg);

        // Publish using session.put
        session.put(KeyExpr("demo/example/simple"), buffer);

        // Publish using publisher.put
        // publisher.put("Simple from publisher.put!");

        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
}



int main()
{
    std::thread t1(task1);
    Config config = Config::create_default();
    auto session = Session::open(std::move(config));
    auto subscriber = session.declare_subscriber(
        KeyExpr("demo/example/simple"),
        [](const Sample &sample)
        {
            auto buffer = sample.get_payload().as_vector();
            auto msg = deserialize_ros_message(buffer);
            std::cout << "Received: " << msg.data << std::endl;
        },
        closures::none);

    t1.join(); // Wait for task1 to finish

    std::cout << "Both tasks completed.\n";
    return 0;
}
CMakeLists.txt
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

find_package(zenohpico) 
find_package(zenohcxx REQUIRED)
find_package(std_msgs REQUIRED)
find_package(rclcpp REQUIRED)

add_executable(pub_sub_cdr pub_sub.cpp)

target_link_libraries(pub_sub_cdr
    zenohcxx::zenohpico
)

ament_target_dependencies(pub_sub_cdr rclcpp std_msgs)

Demo: Subscribe to ros message with zenoh bridge

terminal 1
# Public message using ros
ros2 topic pub my_int32_topic std_msgs/msg/Int32 "{data: 10}"
terminal 2
# Run zenoh bridge
zenoh-bridge-ros2dds
terminal 3
# Run subscriber
./sub
sub.cpp
#include <iostream>
#include <thread>
#include "zenoh.hxx"
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/int32.hpp>
#include <rclcpp/serialization.hpp>
#include <rclcpp/serialized_message.hpp>
// sudo apt install libfastcdr-dev


using namespace zenoh;

std_msgs::msg::Int32 deserialize_ros_message(const std::vector<uint8_t> &buffer)
{
    rclcpp::SerializedMessage serialized_msg;

    // Resize internal buffer and copy
    serialized_msg.get_rcl_serialized_message().buffer_capacity = buffer.size();
    serialized_msg.get_rcl_serialized_message().buffer_length = buffer.size();
    serialized_msg.get_rcl_serialized_message().buffer = reinterpret_cast<uint8_t *>(malloc(buffer.size()));
    std::memcpy(serialized_msg.get_rcl_serialized_message().buffer, buffer.data(), buffer.size());

    rclcpp::Serialization<std_msgs::msg::Int32> serializer;
    std_msgs::msg::Int32 msg;
    serializer.deserialize_message(&serialized_msg, &msg);



    return msg;
}

int main(int argc, char **argv) {
   Config config = Config::create_default();
   auto session = Session::open(std::move(config));
   auto subscriber = session.declare_subscriber(
      KeyExpr("my_int32_topic"),
      [](const Sample& sample) {
        auto buffer = sample.get_payload().as_vector();
            auto msg = deserialize_ros_message(buffer);
            std::cout << "Received: " << msg.data << std::endl;

      },
      closures::none
   );
   // Wait for a key press to exit
   char c = getchar();
}
CMakeLists.txt
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

find_package(zenohpico) 
find_package(zenohcxx REQUIRED)
find_package(std_msgs REQUIRED)
find_package(rclcpp REQUIRED)


add_executable(sub sub.cpp)

target_link_libraries(sub
    zenohcxx::zenohpico
)

ament_target_dependencies(sub rclcpp std_msgs)

Resource