diagnostic
ros
tasks
Diagnostic Tasks
DiagnosticTask is an abstract base class for collecting diagnostic data.
A DiagnosticTask has a name, and a function that is called to create a DiagnosticStatusWrapper.
DiagnosticsTask subclass by
CompositeDiagnosticTask
FrequencyStatus
GenericFunctionDiagnosticTask
Heartbeat
TimeStampStatus
DiagnosticStatus as a function
Implement DiagnosticStatus as function that register to DiagnosticUpdater
Demo code
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from diagnostic_msgs.msg import DiagnosticStatus
import diagnostic_updater
def dummy_diagnostic ( stat : diagnostic_updater . DiagnosticStatusWrapper ):
stat . message = "message dummy_diagnostic"
stat . level = DiagnosticStatus . WARN
stat . name = "dummy_diagnostic"
return stat
class MyNode ( Node ):
def __init__ ( self ):
node_name = "minimal"
super () . __init__ ( node_name )
updater = diagnostic_updater . Updater ( self )
updater . add ( "Function updater" , dummy_diagnostic )
self . get_logger () . info ( "Hello ROS2" )
def main ( args = None ):
rclpy . init ( args = args )
node = MyNode ()
rclpy . spin ( node )
node . destroy_node ()
rclpy . shutdown ()
if __name__ == '__main__' :
main ()
dummy_diagnostic method get one argument stat of type diagnostic_updater.DiagnosticStatusWrapper DiagnosticStatusWrapper is a derived class of diagnostic_msgs.msg.DiagnosticStatus that provides a set of convenience methods.
diagnostic_updater use add
method to register diagnostic method
The updater publish diagnostic message every one second
/diagnostics topic
ros2 topic echo /diagnostics ---
header :
stamp :
sec : 1743564548
nanosec : 360259990
frame_id : ''
status :
- level : " \x01 "
name : 'minimal: dummy_diagnostic'
message : message dummy_diagnostic
hardware_id : ''
values : []
---
header :
stamp :
sec : 1743564549
nanosec : 360183120
frame_id : ''
status :
- level : " \x01 "
name : 'minimal: dummy_diagnostic'
message : message dummy_diagnostic
hardware_id : ''
values : []
Heartbeat
Diagnostic task to monitor whether a node is alive. This diagnostic task always reports as OK and 'Alive' when it runs
Demo code
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from diagnostic_updater import Heartbeat , Updater
class MyNode ( Node ):
def __init__ ( self ):
node_name = "node_name"
super () . __init__ ( node_name )
updater = Updater ( self )
updater . hwid = "hwid"
self . task = Heartbeat ()
updater . add ( self . task )
self . get_logger () . info ( "Hello ROS2" )
def main ( args = None ):
rclpy . init ( args = args )
node = MyNode ()
rclpy . spin ( node )
node . destroy_node ()
rclpy . shutdown ()
if __name__ == '__main__' :
main ()
/diagnostic topic
---
header :
stamp :
sec : 1743565018
nanosec : 240255516
frame_id : ''
status :
- level : "\0"
name : 'node_name: Heartbeat'
message : Alive
hardware_id : hwid
values : []
---
header :
stamp :
sec : 1743565019
nanosec : 239948956
frame_id : ''
status :
- level : "\0"
name : 'node_name: Heartbeat'
message : Alive
hardware_id : hwid
values : []