Skip to content

Betaflight MSP protocol

Config betaflight uart port for msp usage

alt text

Demo: python msp

MSP get status
#!/usr/bin/env python3

from __future__ import annotations

import argparse
import json
import struct
import time


DEFAULT_DEVICE = "/dev/ttyUSB0"
DEFAULT_BAUDRATE = 115200
MSP_STATUS_EX = 150

ARMING_DISABLE_FLAGS = {
    0: "NO_GYRO",
    1: "FAILSAFE",
    2: "RX_FAILSAFE",
    3: "NOT_DISARMED",
    4: "BOXFAILSAFE",
    5: "RUNAWAY_TAKEOFF",
    6: "CRASH_DETECTED",
    7: "THROTTLE",
    8: "ANGLE",
    9: "BOOT_GRACE_TIME",
    10: "NOPREARM",
    11: "LOAD",
    12: "CALIBRATING",
    13: "CLI",
    14: "CMS_MENU",
    15: "BST",
    16: "MSP",
    17: "PARALYZE",
    18: "GPS",
    19: "RESCUE_SW",
    20: "RPMFILTER",
    21: "REBOOT_REQUIRED",
    22: "DSHOT_BITBANG",
    23: "ACC_CALIBRATION",
    24: "MOTOR_PROTOCOL",
    25: "ARMING_DISABLED_ARM_SWITCH",
    26: "ALTITUDE",
    27: "POSITION",
    28: "ARM_SWITCH",
}


def msp_v1_request(command: int, payload: bytes = b"") -> bytes:
    size = len(payload)
    checksum = size ^ command
    for byte in payload:
        checksum ^= byte
    return b"$M<" + bytes([size, command]) + payload + bytes([checksum])


def crc8_dvb_s2(data: bytes) -> int:
    crc = 0
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x80:
                crc = ((crc << 1) ^ 0xD5) & 0xFF
            else:
                crc = (crc << 1) & 0xFF
    return crc


def msp_v2_request(command: int, payload: bytes = b"") -> bytes:
    flags = 0
    body = (
        bytes([flags])
        + command.to_bytes(2, "little")
        + len(payload).to_bytes(2, "little")
        + payload
    )
    return b"$X<" + body + bytes([crc8_dvb_s2(body)])


def read_exact(serial_port, size: int, timeout: float) -> bytes:
    serial_port.timeout = timeout
    data = serial_port.read(size)
    if len(data) != size:
        raise TimeoutError(f"Timeout while reading {size} serial bytes")
    return data


def read_msp_response(serial_port, timeout: float) -> tuple[int, bytes]:
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        remaining = max(0.001, deadline - time.monotonic())
        if read_exact(serial_port, 1, remaining) != b"$":
            continue
        protocol = read_exact(serial_port, 1, remaining)

        if protocol == b"M":
            return read_msp_v1_response_after_header(serial_port, deadline)
        if protocol == b"X":
            return read_msp_v2_response_after_header(serial_port, deadline)

    raise TimeoutError("No MSP response")


def read_msp_v1_response_after_header(serial_port, deadline: float) -> tuple[int, bytes]:
    remaining = max(0.001, deadline - time.monotonic())
    direction = read_exact(serial_port, 1, remaining)
    if direction not in (b">", b"!"):
        raise ValueError(f"Invalid MSP v1 direction: {direction!r}")

    size = read_exact(serial_port, 1, remaining)[0]
    command = read_exact(serial_port, 1, remaining)[0]
    payload = read_exact(serial_port, size, remaining)
    received_checksum = read_exact(serial_port, 1, remaining)[0]

    checksum = size ^ command
    for byte in payload:
        checksum ^= byte
    if checksum != received_checksum:
        raise ValueError(
            f"Bad MSP v1 checksum: got 0x{received_checksum:02x}, "
            f"expected 0x{checksum:02x}"
        )
    if direction == b"!":
        raise RuntimeError(f"MSP error response for command {command}")

    return command, payload


def read_msp_v2_response_after_header(serial_port, deadline: float) -> tuple[int, bytes]:
    remaining = max(0.001, deadline - time.monotonic())
    direction = read_exact(serial_port, 1, remaining)
    if direction not in (b">", b"!"):
        raise ValueError(f"Invalid MSP v2 direction: {direction!r}")

    flags = read_exact(serial_port, 1, remaining)
    command_bytes = read_exact(serial_port, 2, remaining)
    size_bytes = read_exact(serial_port, 2, remaining)
    size = int.from_bytes(size_bytes, "little")
    payload = read_exact(serial_port, size, remaining)
    received_checksum = read_exact(serial_port, 1, remaining)[0]

    checksum_data = flags + command_bytes + size_bytes + payload
    checksum = crc8_dvb_s2(checksum_data)
    if checksum != received_checksum:
        raise ValueError(
            f"Bad MSP v2 checksum: got 0x{received_checksum:02x}, "
            f"expected 0x{checksum:02x}"
        )

    command = int.from_bytes(command_bytes, "little")
    if direction == b"!":
        raise RuntimeError(f"MSP error response for command {command}")

    return command, payload


def request_msp(
    serial_port,
    command: int,
    timeout: float,
    version: int = 1,
) -> bytes:
    if version == 1:
        request = msp_v1_request(command)
    elif version == 2:
        request = msp_v2_request(command)
    else:
        raise ValueError(f"Unsupported MSP version: {version}")

    serial_port.write(request)
    serial_port.flush()

    while True:
        response_command, payload = read_msp_response(serial_port, timeout)
        if response_command == command:
            return payload


def decode_arming_mask(mask: int) -> list[str]:
    return [
        name
        for bit, name in ARMING_DISABLE_FLAGS.items()
        if mask & (1 << bit)
    ]


def parse_status_ex(payload: bytes) -> dict[str, object]:
    if len(payload) < 16:
        raise ValueError(f"MSP_STATUS_EX payload too short: {len(payload)} bytes")

    cycle_time_us = struct.unpack_from("<H", payload, 0)[0]
    i2c_errors = struct.unpack_from("<H", payload, 2)[0]
    sensors_mask = struct.unpack_from("<H", payload, 4)[0]
    box_mode_flags = struct.unpack_from("<I", payload, 6)[0]
    pid_profile = payload[10]
    cpu_load = struct.unpack_from("<H", payload, 11)[0]
    pid_profile_count = payload[13]
    rate_profile = payload[14]
    flight_mode_byte_count = payload[15]
    arming_offset = 16 + flight_mode_byte_count

    if len(payload) < arming_offset + 5:
        raise ValueError(
            f"Cannot decode arming flags. payload_len={len(payload)}, "
            f"flight_mode_byte_count={flight_mode_byte_count}"
        )

    arming_disable_flag_count = payload[arming_offset]
    arming_disable_mask = struct.unpack_from("<I", payload, arming_offset + 1)[0]
    arming_disable_flags = decode_arming_mask(arming_disable_mask)

    return {
        "cycle_time_us": cycle_time_us,
        "i2c_errors": i2c_errors,
        "sensors_mask": sensors_mask,
        "sensors_mask_hex": f"0x{sensors_mask:04x}",
        "box_mode_flags": box_mode_flags,
        "box_mode_flags_hex": f"0x{box_mode_flags:08x}",
        "pid_profile": pid_profile,
        "pid_profile_count": pid_profile_count,
        "rate_profile": rate_profile,
        "cpu_load_raw": cpu_load,
        "flight_mode_byte_count": flight_mode_byte_count,
        "arming_disable_flag_count": arming_disable_flag_count,
        "arming_disable_mask": arming_disable_mask,
        "arming_disable_mask_hex": f"0x{arming_disable_mask:08x}",
        "arming_disable_flags": arming_disable_flags,
        "arming_disabled": bool(arming_disable_flags),
        "armable": not arming_disable_flags,
        "calibrating": "CALIBRATING" in arming_disable_flags,
        "failsafe": "FAILSAFE" in arming_disable_flags
        or "RX_FAILSAFE" in arming_disable_flags,
    }


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Read Betaflight MSP status telemetry from a serial device."
    )
    parser.add_argument("--device", default=DEFAULT_DEVICE)
    parser.add_argument("--baudrate", default=DEFAULT_BAUDRATE, type=int)
    parser.add_argument("--rate-hz", default=2.0, type=float)
    parser.add_argument("--timeout", default=0.5, type=float)
    parser.add_argument("--count", default=0, type=int, help="0 means run forever")
    parser.add_argument("--msp-version", default=1, choices=(1, 2), type=int)
    return parser.parse_args()


def main() -> None:
    import serial

    args = parse_args()
    interval_s = 1.0 / args.rate_hz if args.rate_hz > 0 else 0.0

    with serial.Serial(
        port=args.device,
        baudrate=args.baudrate,
        timeout=0,
        write_timeout=1,
    ) as serial_port:
        reads = 0
        while args.count <= 0 or reads < args.count:
            started = time.monotonic()
            payload = request_msp(
                serial_port,
                MSP_STATUS_EX,
                args.timeout,
                version=args.msp_version,
            )
            telemetry = parse_status_ex(payload)
            telemetry["timestamp_s"] = time.time()
            print(json.dumps(telemetry, indent=2, sort_keys=True), flush=True)

            reads += 1

            if interval_s:
                elapsed = time.monotonic() - started
                time.sleep(max(0.0, interval_s - elapsed))


if __name__ == "__main__":
    main()