Skip to content

GSCAM

ROS Package for broadcasting gstreamer video stream via ROS2 camera API, publish video as sensors_msgs/Image with frame_id and timestamp

Demo

gstreamer pipe

input gstreamer pipe can set by environment variable GSCAM_CONFIG or ros parameter gscam_config

1
2
3
4
5
6
7
export GSCAM_CONFIG="videotestsrc ! video/x-raw,width=640,height=480,framrate=10/1 ! videoconvert"

#
ros2 run gscam gscam_node

# 
ros2 run rqt_image_view rat_image_view

alt text


More control

QOS

Run with parameter use_sensor_data_qos:=true to publish as best_effort qos

1
2
3
ros2 run gscam gscam_node --ros-args \
-p use_sensor_data_qos:=true \
-p gscam_config:="videotestsrc pattern=basll ! video/x-raw,width=640,height=480,framrate=10/1 ! videoconvert"

Camera info

gscam using camera_info_manager package to control camera_info topic , we can change the data using set_camera_info service

The init camera_info can control by parameter camera_info_url

1
2
3
ros2 run gscam gscam_node --ros-args \
-p use_sensor_data_qos:=true \
-p camera_info_url:=file:///<absolute>/gscam_demo/config/uncalibrated_parameters.ini
camera calibration file
# Camera intrinsics

[image]

width
1280

height
720

[camera]

camera matrix
900.00000 0.00000 640.00000 
0.00000 900.00000 360.00000 
0.00000 0.00000 1.00000 

distortion
0.10000 -0.05000 0.00100 0.00200 0.00000 


rectification
0.00000 0.00000 0.00000 
0.00000 0.00000 0.00000 
0.00000 0.00000 0.00000 

projection
0.00000 0.00000 0.00000 0.00000 
0.00000 0.00000 0.00000 0.00000 
0.00000 0.00000 0.00000 0.00000 

Download ini file

update service

The /set_camera_info service is part of camera_info_manager

Note

The service update/save the data to source ini file

#!/bin/python


import rclpy
from rclpy.node import Node
from sensor_msgs.msg import CameraInfo
from sensor_msgs.srv import SetCameraInfo


class CameraInfoClient(Node):
    def __init__(self):
        super().__init__('camera_info_client')
        self.client = self.create_client(SetCameraInfo, '/set_camera_info')

        # Wait for the service to be available
        while not self.client.wait_for_service(timeout_sec=3.0):
            self.get_logger().warn("Waiting for /set_camera_info service...")

        self.send_request()

    def send_request(self):
        request = SetCameraInfo.Request()

        # Fill the CameraInfo message
        request.camera_info = CameraInfo()
        request.camera_info.width = 1280
        request.camera_info.height = 720
        request.camera_info.k = [900.0, 0.0, 640.0, 0.0, 900.0, 360.0, 0.0, 0.0, 1.0]  # Intrinsic matrix
        request.camera_info.d = [0.1, -0.05, 0.001, 0.002, 0.0]  # Distortion coefficients
        request.camera_info.distortion_model = "plumb_bob"

        self.future = self.client.call_async(request)
        self.future.add_done_callback(self.response_callback)

    def response_callback(self, future):
        try:
            response = future.result()
            if response.success:
                self.get_logger().info("Camera info updated successfully!")
            else:
                self.get_logger().error("Failed to update camera info!")
        except Exception as e:
            self.get_logger().error(f"Service call failed: {str(e)}")


def main():
    rclpy.init()
    node = CameraInfoClient()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()