Skip to content

Diagnostic Tasks

DiagnosticTask is an abstract base class for collecting diagnostic data.

A DiagnosticTask has a name, and a function that is called to create a DiagnosticStatusWrapper.

DiagnosticsTask subclass by

  • CompositeDiagnosticTask
  • FrequencyStatus
  • GenericFunctionDiagnosticTask
  • Heartbeat
  • TimeStampStatus

DiagnosticStatus as a function

Implement DiagnosticStatus as function that register to DiagnosticUpdater

Demo code
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node
from diagnostic_msgs.msg import DiagnosticStatus
import diagnostic_updater

def dummy_diagnostic(stat: diagnostic_updater.DiagnosticStatusWrapper):
   stat.message ="message dummy_diagnostic"
   stat.level = DiagnosticStatus.WARN
   stat.name = "dummy_diagnostic"

   return stat

class MyNode(Node):
    def __init__(self):
        node_name="minimal"
        super().__init__(node_name)
        updater = diagnostic_updater.Updater(self)
        updater.add("Function updater", dummy_diagnostic)
        self.get_logger().info("Hello ROS2")

def main(args=None):
    rclpy.init(args=args)
    node = MyNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
  • dummy_diagnostic method get one argument stat of type diagnostic_updater.DiagnosticStatusWrapper DiagnosticStatusWrapper is a derived class of diagnostic_msgs.msg.DiagnosticStatus that provides a set of convenience methods.

  • diagnostic_updater use add method to register diagnostic method

  • The updater publish diagnostic message every one second
/diagnostics topic
ros2 topic echo /diagnostics
---
header:
  stamp:
    sec: 1743564548
    nanosec: 360259990
  frame_id: ''
status:
- level: "\x01"
  name: 'minimal: dummy_diagnostic'
  message: message dummy_diagnostic
  hardware_id: ''
  values: []
---
header:
  stamp:
    sec: 1743564549
    nanosec: 360183120
  frame_id: ''
status:
- level: "\x01"
  name: 'minimal: dummy_diagnostic'
  message: message dummy_diagnostic
  hardware_id: ''
  values: []

Heartbeat

Diagnostic task to monitor whether a node is alive. This diagnostic task always reports as OK and 'Alive' when it runs

Demo code
#!/usr/bin/env python3

import rclpy
from rclpy.node import Node
from diagnostic_updater import Heartbeat, Updater

class MyNode(Node):
    def __init__(self):
        node_name="node_name"
        super().__init__(node_name)
        updater = Updater(self)
        updater.hwid = "hwid"
        self.task = Heartbeat()
        updater.add(self.task)
        self.get_logger().info("Hello ROS2")



def main(args=None):
    rclpy.init(args=args)
    node = MyNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
/diagnostic topic
---
header:
  stamp:
    sec: 1743565018
    nanosec: 240255516
  frame_id: ''
status:
- level: "\0"
  name: 'node_name: Heartbeat'
  message: Alive
  hardware_id: hwid
  values: []
---
header:
  stamp:
    sec: 1743565019
    nanosec: 239948956
  frame_id: ''
status:
- level: "\0"
  name: 'node_name: Heartbeat'
  message: Alive
  hardware_id: hwid
  values: []