#!/usr/bin/env python3importrclpyfromrclpy.nodeimportNodefromstd_srvs.srvimportTriggerfromrclpy.qosimportqos_profile_services_defaultTOPIC="trigger_srv"classMyNode(Node):def__init__(self):node_name="minimal_srv"super().__init__(node_name)self.srv=self.create_service(Trigger,TOPIC,self.handler,qos_profile=qos_profile_services_default)self.get_logger().info("Hello ROS2")defhandler(self,request:Trigger.Request,response:Trigger.Response):self.get_logger().info("Trigger service called")response.success=Trueresponse.message="success"print(response)returnresponsedefmain(args=None):rclpy.init(args=args)node=MyNode()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if__name__=='__main__':main()
importzenohfromdataclassesimportdataclassfrompycdr2importIdlStructfromzenohimportReplyimporttime# ros2 service call /trigger_srv std_srvs/srv/TriggerTOPIC="trigger_srv"@dataclassclassTriggerRequest(IdlStruct):pass@dataclassclassTriggerResponse(IdlStruct):success:boolmessage:strdefhandle_replay(reply:Reply):try:print(reply.ok)message=TriggerResponse.deserialize(reply.ok.payload.to_bytes())print(f">> Received {message}")exceptExceptionase:print(e)print(">> Received ERROR")conf=zenoh.Config()session=zenoh.open(conf)req=TriggerRequest().serialize()replies=session.get(TOPIC,payload=req,handler=handle_replay)time.sleep(2)session.close()