Skip to content

Common serialization options used with ZeroMQ

| Serialization | Speed | Size | Schema / Versioning | Cross-language | Zero-copy friendly | Best for | Downsides |
|---|---|---|---|---|---|---|---|
| Raw bytes (no serialization) | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | None | N/A | ⭐⭐⭐⭐⭐ | Video frames, audio, tensors, opaque blobs | You must define your own layout/metadata |
| `struct` (binary packing) | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Manual (you define the layout) | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Fixed-format headers, tiny fast metadata | Painful to evolve; endianness/alignment concerns |
| MessagePack | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | Manual (conventions) | ⭐⭐⭐⭐ | ⭐⭐⭐ | Fast Python metadata, PUB/SUB payload headers | No enforced schema; versioning is up to you |
| Protocol Buffers | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ (excellent) | ⭐⭐⭐⭐⭐ | ⭐⭐ | Stable APIs, multi-language systems | Compile step; less “dynamic”; not zero-copy (must be parsed) |
| FlatBuffers | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | High-rate telemetry; fast reads without unpacking | More complex; schema rigidity; tooling learning curve |
| Cap’n Proto | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Lowest latency; near zero-copy reads | Heavier ecosystem/tooling; less common in Python |
| JSON | ⭐⭐ | ⭐⭐ | Manual | ⭐⭐⭐⭐⭐ | ❌ | Control plane, debugging, configs | Large and slow; numeric/typing ambiguity |
| Pickle (Python only) | ⭐⭐⭐ | ⭐⭐⭐ | N/A | ❌ | ⭐⭐ | Quick prototypes in a trusted, Python-only setup | Unsafe with untrusted data; not cross-language |

FlatBuffer

With FlatBuffers, the message stays in its binary form and fields are read directly from the buffer.

Install:
# FlatBuffers Python runtime (imported as `flatbuffers`)
pip install flatbuffers

# install flatc compiler (turns .fbs schemas into Python bindings)
# NOTE(review): the distro flatc may be older than the pip runtime; keep their
# versions compatible so the generated code matches the runtime API.
sudo apt install flatbuffers-compiler

Demo: simple

telemetry.fbs
// Telemetry record shared by the Python demos.
// With namespace and table both named Telemetry, `flatc --python` emits the
// module Telemetry/Telemetry.py, hence `from Telemetry import Telemetry`.
namespace Telemetry;

table Telemetry {
  seq:uint32;   // sequence number (the publisher increments it per round)
  ts:double;    // timestamp; the demos fill it from time.time() (seconds)
  temp:float;   // temperature reading (units not specified; demo values)
  msg:string;   // free-form text
}

root_type Telemetry;
# Generate the Python bindings (writes the Telemetry/ package into the current directory)
flatc --python telemetry.fbs
usage
import time
import flatbuffers

from Telemetry import Telemetry


def build_telemetry(seq: int, ts: float, temp: float, msg: str) -> bytes:
    """Serialize one Telemetry record into a finished FlatBuffer.

    The string is created before ``Start`` because a FlatBuffers builder does
    not allow creating objects while a table is under construction.
    """
    builder = flatbuffers.Builder(128)  # initial capacity; the builder grows as needed
    msg_offset = builder.CreateString(msg)

    Telemetry.Start(builder)
    Telemetry.AddSeq(builder, seq)
    Telemetry.AddTs(builder, ts)
    Telemetry.AddTemp(builder, temp)
    Telemetry.AddMsg(builder, msg_offset)
    root = Telemetry.End(builder)
    builder.Finish(root)

    return bytes(builder.Output())


def read_telemetry(buf: bytes) -> None:
    """Print every field of a serialized Telemetry FlatBuffer.

    Fields are read directly from *buf*; nothing is unpacked up front.

    Args:
        buf: a finished FlatBuffer whose root is a Telemetry table.
    """
    t = Telemetry.Telemetry.GetRootAsTelemetry(buf, 0)
    print("seq:", t.Seq())
    print("ts :", t.Ts())
    print("temp:", t.Temp())
    # flatc-generated string accessors return None when the field was never
    # set; guard instead of crashing with AttributeError on .decode().
    raw_msg = t.Msg()
    print("msg:", raw_msg.decode("utf-8") if raw_msg is not None else None)


if __name__ == "__main__":
    # Round trip: serialize one record, report its encoded size, then read it back.
    payload = build_telemetry(seq=1, ts=time.time(), temp=36.5, msg="h")
    print(len(payload))
    read_telemetry(payload)

Zero-copy

send
# copy=False lets pyzmq hand the existing buffer to libzmq instead of copying it
sock.send(flatbuf_bytes, copy=False)
receiving
# copy=False returns a zmq.Frame rather than bytes; use frame.buffer (a
# memoryview) to read the payload without copying it.
frame = sock.recv(copy=False)

Demo: zmq

code
#!/usr/bin/env python3
import time
import multiprocessing as mp
from typing import Iterable

import zmq
import flatbuffers
from flatbuffers.table import Table
from flatbuffers import packer, encode, number_types
from Telemetry import Telemetry  # type: ignore
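# ============================================================
# FlatBuffers schema used by this demo (telemetry.fbs).
# Nothing is generated inline here: the `Telemetry` bindings imported
# above come from `flatc --python telemetry.fbs`. (The flatbuffers.table /
# packer / encode / number_types imports are not used.)
#
# table Telemetry {
#   seq:uint32;
#   ts:double;
#   temp:float;
#   msg:string;
# }
# root_type Telemetry;
# ============================================================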


def build_telemetry_flatbuf(seq: int, ts: float, temp: float, msg: str) -> bytes:
    """
    Serialize one Telemetry record and return the finished FlatBuffer bytes.

    The message string is written before the table is opened because a
    FlatBuffers builder cannot create a string while a table is in progress.
    """
    fbb = flatbuffers.Builder(128)
    text_ref = fbb.CreateString(msg)

    Telemetry.Start(fbb)
    Telemetry.AddSeq(fbb, seq)
    Telemetry.AddTs(fbb, ts)
    Telemetry.AddTemp(fbb, temp)
    Telemetry.AddMsg(fbb, text_ref)
    fbb.Finish(Telemetry.End(fbb))

    return bytes(fbb.Output())


# ============================================================
# ZMQ PUB/SUB multiprocessing demo
# ============================================================

def publisher_proc(pub_name: str, bind_endpoint: str, topics: Iterable[str], interval_s: float = 0.25) -> None:
    """Publish one Telemetry FlatBuffer per topic every *interval_s* seconds.

    Each message is a two-frame multipart ``[topic][flatbuffer_bytes]``.
    Loops forever; the parent stops it by terminating the process.
    """
    pub = zmq.Context.instance().socket(zmq.PUB)
    pub.bind(bind_endpoint)

    # PUB silently drops messages sent before a subscriber has connected and
    # registered its SUBSCRIBE ("slow joiner"); pause so it is likely ready.
    time.sleep(0.8)

    seq = 0
    encoded_topics = [name.encode("utf-8") for name in topics]

    while True:
        stamp = time.time()
        for topic in encoded_topics:
            payload = build_telemetry_flatbuf(
                seq=seq,
                ts=stamp,
                temp=20.0 + (seq % 10) * 0.5,
                msg=f"from {pub_name} on {topic.decode()}",
            )
            pub.send_multipart([topic, payload], copy=False)
        seq += 1
        time.sleep(interval_s)


def subscriber_proc(connect_endpoints: list[str], subscribe_prefixes: Iterable[str]) -> None:
    """Connect to every publisher endpoint and print each decoded Telemetry message.

    Args:
        connect_endpoints: PUB endpoints to connect to; one SUB socket
            connects to all of them.
        subscribe_prefixes: topic prefixes to subscribe to. ZMQ matches by
            byte prefix, so "cam/" also matches e.g. "cam/left".

    Loops forever; every message is expected to be a two-frame
    ``[topic][flatbuffer_bytes]`` multipart.
    """
    # Materialize once: both collections are iterated twice below (setup and
    # logging), and a one-shot iterator would be empty on the second pass.
    prefixes = list(subscribe_prefixes)
    endpoints = list(connect_endpoints)

    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)

    # Subscribe to selected topic prefixes (prefix match!)
    for p in prefixes:
        sub.setsockopt(zmq.SUBSCRIBE, p.encode("utf-8"))

    for ep in endpoints:
        sub.connect(ep)

    print("SUB connected to:")
    for ep in endpoints:
        print("  ", ep)

    print("SUB subscriptions:")
    for p in prefixes:
        print("  ", p)

    while True:
        topic_b, fb_buf = sub.recv_multipart()
        t = Telemetry.Telemetry.GetRootAsTelemetry(fb_buf, 0)

        # Msg() yields raw UTF-8 bytes (or None when unset); decode so the log
        # shows the text instead of a b'...' repr.
        raw_msg = t.Msg()
        text = raw_msg.decode("utf-8") if raw_msg is not None else ""

        print(
            f"[{topic_b.decode()}] "
            f"seq={t.Seq()} ts={t.Ts():.3f} temp={t.Temp():.2f} msg='{text}'"
        )


def main() -> None:
    """Start two FlatBuffers publishers and one subscriber, then idle until Ctrl+C.

    All children are daemon processes. On exit the subscriber is stopped
    first, then the publishers.
    """
    # "spawn" starts each child in a fresh interpreter instead of forking this one.
    mp.set_start_method("spawn", force=True)

    publishers = [
        dict(
            name="pubA",
            bind="tcp://127.0.0.1:5555",
            topics=["telemetry/", "imu/", "debug/"],
        ),
        dict(
            name="pubB",
            bind="tcp://127.0.0.1:5556",
            topics=["telemetry/", "cam/", "status/"],
        ),
    ]

    # Only these prefixes reach the subscriber; imu/, debug/ and status/ are filtered out.
    subscribe_prefixes = ["telemetry/", "cam/"]

    pub_procs: list[mp.Process] = []
    sub_proc: mp.Process | None = None

    try:
        for spec in publishers:
            worker = mp.Process(
                target=publisher_proc,
                args=(spec["name"], spec["bind"], spec["topics"]),
                daemon=True,
            )
            worker.start()
            pub_procs.append(worker)

        endpoints = [spec["bind"] for spec in publishers]
        sub_proc = mp.Process(
            target=subscriber_proc,
            args=(endpoints, subscribe_prefixes),
            daemon=True,
        )
        sub_proc.start()

        # The parent only has to stay alive until interrupted.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        if sub_proc is not None and sub_proc.is_alive():
            sub_proc.terminate()
            sub_proc.join(timeout=1.0)

        for worker in pub_procs:
            if worker.is_alive():
                worker.terminate()
        for worker in pub_procs:
            worker.join(timeout=1.0)


if __name__ == "__main__":
    main()

RAW

Raw data = an opaque sequence of bytes.

| Python object | Valid raw ZMQ data? |
|---|---|
| `bytes` | ✅ |
| `bytearray` | ✅ |
| `memoryview` | ✅ (best for zero-copy) |
| `numpy.ndarray` buffer | ✅ (via `memoryview`) |
| `zmq.Frame` | ✅ |

# Sender
# A C-contiguous array exposes one flat buffer; ascontiguousarray copies only if needed.
arr = np.ascontiguousarray(arr)
# meta_bytes presumably describes dtype/shape so the receiver can rebuild arr.
# With copy=False, do not modify arr until the message has actually gone out:
# ZMQ may still be reading directly from its memory.
sock.send_multipart(
    [meta_bytes, memoryview(arr)],
    copy=False
)

# Receiver
# copy=False -> zmq.Frame objects; np.frombuffer wraps the payload without copying.
meta, frame = sock.recv_multipart(copy=False)
# Fill in dtype/shape from `meta`; they are not part of the payload itself.
arr = np.frombuffer(frame, dtype=...,).reshape(...)

# The NumPy array shares memory with the ZMQ frame.

# The array holds a reference to the frame, so that memory stays valid for as
# long as `arr` is alive; a later recv does not overwrite it. Use arr.copy()
# if you need an array that is independent of the frame.

Demo: Send image with metadata

Code
#!/usr/bin/env python3
"""
Minimal ZMQ PUB/SUB image demo
Metadata: frame_id only (struct-packed uint32)
Message layout:
  [ topic ][ meta(frame_id) ][ payload(raw image bytes) ]
"""
import time
import struct
import multiprocessing as mp

import numpy as np
import zmq


# ----------------------------
# Metadata codec (frame_id only)
# ----------------------------
# Wire format: a single little-endian unsigned 32-bit integer (4 bytes).
META_FMT = "<I"
META_SIZE = struct.calcsize(META_FMT)


def pack_meta(frame_id: int) -> bytes:
    """Encode *frame_id* as the fixed-size metadata frame."""
    packed = struct.pack(META_FMT, frame_id)
    return packed


def unpack_meta(b: bytes) -> int:
    """Decode a metadata frame produced by ``pack_meta``.

    Raises:
        ValueError: if *b* is not exactly META_SIZE bytes long.
    """
    size_ok = len(b) == META_SIZE
    if not size_ok:
        raise ValueError("bad meta size")
    return struct.unpack(META_FMT, b)[0]


# ----------------------------
# Publisher
# ----------------------------
def publisher_proc(bind_ep: str, topic: bytes = b"cam/", fps: float = 5.0) -> None:
    """Publish a random 240x320 uint8 grayscale frame roughly *fps* times per second.

    Each message is ``[topic][meta(frame_id)][raw image bytes]``. The image is
    passed as a memoryview with ``copy=False`` so pyzmq can send the pixels
    without copying them. Loops forever.
    """
    sock = zmq.Context.instance().socket(zmq.PUB)
    sock.bind(bind_ep)

    # Slow joiner workaround: give the subscriber time to connect first.
    time.sleep(0.8)

    frame_id = 0
    delay = 1.0 / fps

    height, width = 240, 320

    while True:
        # Random pixels stand in for a real camera frame.
        frame = np.ascontiguousarray(
            np.random.randint(0, 256, size=(height, width), dtype=np.uint8)
        )
        sock.send_multipart(
            [topic, pack_meta(frame_id), memoryview(frame)],
            copy=False,
        )
        frame_id += 1
        time.sleep(delay)


# ----------------------------
# Subscriber
# ----------------------------
def subscriber_proc(connect_ep: str, subscribe_prefix: bytes = b"cam/") -> None:
    """Receive ``[topic][meta][payload]`` messages and rebuild each image without copying.

    Args:
        connect_ep: PUB endpoint to connect to.
        subscribe_prefix: topic filter; ZMQ matches it as a byte prefix.

    Shape and dtype are not sent on the wire, so they must match what the
    publisher sends (240x320 uint8 here). Loops forever.
    """
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, subscribe_prefix)
    sub.connect(connect_ep)

    print("SUB connected to:", connect_ep)

    while True:
        # copy=False -> zmq.Frame objects instead of bytes
        topic_f, meta_f, payload_f = sub.recv_multipart(copy=False)

        frame_id = unpack_meta(bytes(meta_f))

        # Zero-copy view of payload; `img` keeps the frame alive while in use.
        buf = memoryview(payload_f)

        # Reconstruct image (we must know shape/dtype out-of-band here)
        img = np.frombuffer(buf, dtype=np.uint8).reshape((240, 320))

        # Decode outside the f-string: reusing the enclosing double quotes
        # inside an f-string is a SyntaxError before Python 3.12 (PEP 701).
        topic = bytes(topic_f).decode("utf-8")
        print(f"topic={topic} frame_id={frame_id} img.shape={img.shape}")


# ----------------------------
# Main (multiprocessing)
# ----------------------------
def main() -> None:
    """Run one image publisher and one subscriber on localhost until Ctrl+C."""
    mp.set_start_method("spawn", force=True)

    endpoint = "tcp://127.0.0.1:5555"

    pub_p = mp.Process(target=publisher_proc, args=(endpoint,), daemon=True)
    sub_p = mp.Process(target=subscriber_proc, args=(endpoint,), daemon=True)
    for proc in (pub_p, sub_p):
        proc.start()

    # Stop the subscriber before the publisher.
    shutdown_order = (sub_p, pub_p)
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        for proc in shutdown_order:
            if proc.is_alive():
                proc.terminate()
        for proc in shutdown_order:
            proc.join(timeout=1.0)


if __name__ == "__main__":
    main()

MemoryView

memoryview is a zero-copy view onto an existing bytes-like buffer.

  • It does not allocate a new bytes object.
  • It lets you slice and read data — and, for writable buffers such as bytearray, modify it — without copying.
  • It works with anything that supports the buffer protocol: bytes, bytearray, array, NumPy arrays, and zmq.Frame.

TODO: code example