In this profile the depth set to 5 but the the durability set to VOLATILE (no history, new subscribers get only new messages), the result saw that late subscriber got only new messages
Durability: TRANSIENT_LOCAL stores past messages for late joiners
depth: 10
Subscriber QOS settings
The queue depth in the subscriber refer to the number of messages the subscriber can store before processing them
The durability must be transient local because in VOLATILE means it won't receive old message
#!/usr/bin/env python3importrclpyfromrclpy.executorsimportMultiThreadedExecutorfromrclpy.nodeimportNodefromstd_msgs.msgimportStringfromrclpy.qosimportQoSProfile,ReliabilityPolicy,DurabilityPolicy,HistoryPolicy,LivelinessPolicyfromrclpy.qosimportqos_profile_system_default,qos_profile_sensor_datafromrclpy.durationimportDuration# Correct importTOPIC="test_topic"SUB_START_DELAY=5PUB_RATE=1classSubNode(Node):def__init__(self):super().__init__("sub_node")self.sub_start_delay=self.create_timer(SUB_START_DELAY,self.timer_handler)self.get_logger().info("Hello SUB")deftimer_handler(self):self.destroy_timer(self.sub_start_delay)self.get_logger().info("--Create subscriber--")qos_profile=QoSProfile(reliability=ReliabilityPolicy.RELIABLE,# or BEST_EFFORTdurability=DurabilityPolicy.TRANSIENT_LOCAL,# or VOLATILEhistory=HistoryPolicy.KEEP_LAST,# or KEEP_ALLdepth=10,# Queue depthliveliness=LivelinessPolicy.AUTOMATIC,# or MANUAL_BY_TOPIC)self.create_subscription(String,TOPIC,self.callback,qos_profile=qos_profile)defcallback(self,msg):self.get_logger().info(f"Received: {msg.data}")classPubNode(Node):def__init__(self):super().__init__("pub_node")qos_profile=QoSProfile(reliability=ReliabilityPolicy.RELIABLE,# or BEST_EFFORTdurability=DurabilityPolicy.TRANSIENT_LOCAL,# or VOLATILEhistory=HistoryPolicy.KEEP_LAST,# or KEEP_ALLdepth=10,# Queue depthliveliness=LivelinessPolicy.AUTOMATIC,# or MANUAL_BY_TOPIC)self.counter=0self.pub=self.create_publisher(String,TOPIC,qos_profile)# self.get_logger().info(f"{self.pub.qos_profile}")self.timer=self.create_timer(PUB_RATE,self.timer_handler)self.get_logger().info("Hello PUB")deftimer_handler(self):msg=String()msg.data=f"hello {self.counter}"self.counter+=1self.pub.publish(msg)defmain(args=None):rclpy.init(args=args)pub_node=PubNode()sub_node=SubNode()executer=MultiThreadedExecutor()executer.add_node(pub_node)executer.add_node(sub_node)executer.spin()rclpy.shutdown()if__name__=='__main__':main()
From the result we saw that we got five (0-4) message from publisher history in the same time slot almost
#!/usr/bin/env python3importrclpyfromrclpy.executorsimportMultiThreadedExecutorfromrclpy.nodeimportNodefromstd_msgs.msgimportStringfromrclpy.qosimportQoSProfile,ReliabilityPolicy,DurabilityPolicy,HistoryPolicy,LivelinessPolicyfromrclpy.durationimportDuration# Correct importTOPIC="test_topic"SUB_START_DELAY=5PUB_RATE=1INFINITY=0THREE=3FIVE=5DURATION=FIVEclassSubNode(Node):def__init__(self):super().__init__("sub_node")self.sub_start_delay=self.create_timer(SUB_START_DELAY,self.timer_handler)self.get_logger().info("Hello SUB")deftimer_handler(self):self.destroy_timer(self.sub_start_delay)self.get_logger().info("--Create subscriber--")qos_profile=QoSProfile(reliability=ReliabilityPolicy.RELIABLE,# or BEST_EFFORTdurability=DurabilityPolicy.TRANSIENT_LOCAL,# or VOLATILEhistory=HistoryPolicy.KEEP_LAST,# or KEEP_ALLdepth=10,# Queue depthliveliness=LivelinessPolicy.AUTOMATIC,# or MANUAL_BY_TOPIC)self.create_subscription(String,TOPIC,self.callback,qos_profile=qos_profile)defcallback(self,msg):self.get_logger().info(f"Received: {msg.data}")classPubNode(Node):def__init__(self):super().__init__("pub_node")qos_profile=QoSProfile(reliability=ReliabilityPolicy.RELIABLE,# or BEST_EFFORTdurability=DurabilityPolicy.TRANSIENT_LOCAL,# or VOLATILEhistory=HistoryPolicy.KEEP_LAST,# or KEEP_ALLdepth=10,# Queue depthliveliness=LivelinessPolicy.AUTOMATIC,# or MANUAL_BY_TOPIC)qos_profile.lifespan=Duration(seconds=DURATION)self.counter=0self.pub=self.create_publisher(String,TOPIC,qos_profile)self.timer=self.create_timer(PUB_RATE,self.timer_handler)self.get_logger().info("Hello PUB")deftimer_handler(self):msg=String()msg.data=f"hello {self.counter}"self.counter+=1self.pub.publish(msg)defmain(args=None):rclpy.init(args=args)pub_node=PubNode()sub_node=SubNode()executer=MultiThreadedExecutor()executer.add_node(pub_node)executer.add_node(sub_node)executer.spin()rclpy.shutdown()if__name__=='__main__':main()
#!/usr/bin/env python3importrclpyfromrclpy.executorsimportMultiThreadedExecutorfromrclpy.nodeimportNodefromstd_msgs.msgimportStringfromrclpy.qosimportQoSProfile,ReliabilityPolicy,DurabilityPolicy,HistoryPolicy,LivelinessPolicyfromrclpy.durationimportDuration# Correct importfromrclpy.qos_eventimportSubscriptionEventCallbacksTOPIC="test_topic"SUB_START_DELAY=5PUB_RATE=1classSubNode(Node):def__init__(self):super().__init__("sub_node")self.get_logger().info("--Create subscriber--")qos_profile=QoSProfile(reliability=ReliabilityPolicy.RELIABLE,# or BEST_EFFORTdurability=DurabilityPolicy.TRANSIENT_LOCAL,# or VOLATILEhistory=HistoryPolicy.KEEP_LAST,# or KEEP_ALLdepth=10,# Queue depthliveliness=LivelinessPolicy.AUTOMATIC,# or MANUAL_BY_TOPIC)qos_profile.deadline=Duration(seconds=1,nanoseconds=0)# Deadline durationself.sub=self.create_subscription(String,TOPIC,self.callback,qos_profile,event_callbacks=SubscriptionEventCallbacks(deadline=self.deadline_missed_callback))self.get_logger().info("Hello SUB")defdeadline_missed_callback(self,event):self.get_logger().warning("Deadline missed !!")defcallback(self,msg):self.get_logger().info(f"Received: {msg.data}")classPubNode(Node):def__init__(self):super().__init__("pub_node")qos_profile=QoSProfile(reliability=ReliabilityPolicy.RELIABLE,# or BEST_EFFORTdurability=DurabilityPolicy.TRANSIENT_LOCAL,# or VOLATILEhistory=HistoryPolicy.KEEP_LAST,# or KEEP_ALLdepth=10,# Queue depthliveliness=LivelinessPolicy.AUTOMATIC,# or MANUAL_BY_TOPIC)qos_profile.deadline=Duration(seconds=1,nanoseconds=0)# Deadline durationself.counter=0self.pub=self.create_publisher(String,TOPIC,qos_profile)self.timer=self.create_timer(PUB_RATE,self.timer_handler)self.get_logger().info("Hello PUB")deftimer_handler(self):msg=String()msg.data=f"hello {self.counter}"self.counter+=1self.pub.publish(msg)ifself.counter>5:self.destroy_timer(self.timer)self.destroy_publisher(self.pub)defmain(args=None):rclpy.init(args=args)pub_node=PubNode()sub_node=SubNode()executer=MultiThreadedExecutor()executer.add_node(pub_node)executer.add_node(sub_node)executer.spin()rclpy.shutdown()if__name__=='__main__':main()
The demo code stop to publish after 5 sec, the subscriber trigger the deadline_missed_callback callback
Note
The publisher rate and deadline both are 1 hz,
the alert in line 6,8 occurs because the narrower delta
#!/usr/bin/env python3importrclpyfromrclpy.nodeimportNodefromstd_msgs.msgimportStringfromrclpy.qosimportQoSProfile,LivelinessPolicyfromrclpy.durationimportDurationclassLivelinessPublisher(Node):def__init__(self):super().__init__('liveliness_publisher')qos_profile=QoSProfile(depth=10,liveliness=LivelinessPolicy.MANUAL_BY_TOPIC,# Publisher must assert livelinessliveliness_lease_duration=Duration(seconds=3)# Must assert within 3 seconds)self.publisher_=self.create_publisher(String,'liveliness_topic',qos_profile)self.timer=self.create_timer(1.0,self.publish_message)# Publish every 1s# self.liveliness_timer = self.create_timer(2.0, self.assert_liveliness) # Assert liveliness every 2sself.counter=0defpublish_message(self):msg=String()msg.data=f'Hello {self.counter}'self.publisher_.publish(msg)self.get_logger().info(f'Published: {msg.data}')self.counter+=1defassert_liveliness(self):self.publisher_.assert_liveliness()# Manually assert that publisher is aliveself.get_logger().info('Liveliness asserted')defmain(args=None):rclpy.init(args=args)node=LivelinessPublisher()rclpy.spin(node)node.destroy_node()rclpy.shutdown()if__name__=='__main__':main()