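"""ZMQ with shared memory (SHM).

Large image data is published through shared memory, while ZeroMQ carries
only the small metadata messages that signal when a new frame is available.
"""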

import zmq
import numpy as np
import time
import multiprocessing as mp
from multiprocessing import shared_memory
from dataclasses import dataclass, asdict
from typing import Tuple, Dict, Any, cast

# ================= CONFIG =================
# Frame geometry: rows, columns and channels of every published image.
HEIGHT, WIDTH, CHANNELS = 480, 640, 3
SHAPE = (HEIGHT, WIDTH, CHANNELS)

# Element type of the image array stored in shared memory.
DTYPE = np.uint8

# ZeroMQ endpoint over which the metadata messages travel.
ENDPOINT = "ipc:///tmp/image_stream.ipc"

# The publisher sleeps 1 / FPS seconds between frames.
FPS = 5

# Number of frames the publisher sends; the subscriber stops once it has
# seen frame id NUM_FRAMES - 1.
NUM_FRAMES = 10
# =========================================


# ================= METADATA =================
@dataclass(frozen=True)
class ImageMeta:
    """Describe one image frame that lives in a shared-memory segment.

    Instances are converted to a dict with ``dataclasses.asdict`` and sent as
    JSON; the receiver rebuilds them with ``ImageMeta(**decoded_dict)``.
    """

    shm_name: str  # name of the shared-memory segment holding the pixels
    shape: Tuple[int, int, int]  # array shape used to view the segment
    dtype: str  # NumPy dtype name, e.g. "uint8"
    frame_id: int  # counter assigned by the sender
    timestamp: float  # time.time() value taken when the metadata was created

    def __post_init__(self) -> None:
        """Coerce ``shape`` to a tuple.

        JSON has no tuple type, so a decoded message carries the shape as a
        list. Left as a list, the instance would be unhashable and would never
        compare equal to the metadata that was originally sent.
        """
        # The dataclass is frozen, so the field has to be set through
        # object.__setattr__ rather than plain attribute assignment.
        object.__setattr__(self, "shape", tuple(self.shape))

    @classmethod
    def create(
        cls,
        shm_name: str,
        shape: Tuple[int, int, int],
        dtype: str,
        frame_id: int,
    ) -> "ImageMeta":
        """Build metadata for a frame, stamping it with the current time.

        Args:
            shm_name: Name of the shared-memory segment holding the frame.
            shape: Shape of the image array.
            dtype: NumPy dtype name of the image array.
            frame_id: Sequence number of the frame.

        Returns:
            A new ``ImageMeta`` whose ``timestamp`` is ``time.time()``.
        """
        return cls(
            shm_name=shm_name,
            shape=shape,
            dtype=dtype,
            frame_id=frame_id,
            timestamp=time.time(),
        )
# ===========================================


def publisher():
    """Publish NUM_FRAMES random images through one shared-memory segment.

    A single segment large enough for one SHAPE/DTYPE image is created up
    front. For every frame the pixels are written into that segment and an
    ``ImageMeta`` message is sent as JSON on a PUB socket bound to ENDPOINT.
    The segment is closed and unlinked, and the socket and context are
    released, even when an error interrupts the loop.
    """
    frame_nbytes = int(np.prod(SHAPE)) * np.dtype(DTYPE).itemsize
    # Derive the advertised dtype from DTYPE so the metadata cannot disagree
    # with the array that is actually written.
    dtype_name = np.dtype(DTYPE).name

    # Create SHM once; every frame reuses (overwrites) the same segment.
    shm = shared_memory.SharedMemory(create=True, size=frame_nbytes)
    try:
        img = np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)

        # Leaving the with-block closes the socket first, then terminates
        # the context.
        with zmq.Context() as ctx, ctx.socket(zmq.PUB) as pub:
            pub.bind(ENDPOINT)

            time.sleep(0.3)  # allow subscriber to connect

            for frame_id in range(NUM_FRAMES):
                # Generate new image directly INTO SHM
                img[:] = np.random.randint(0, 256, SHAPE, dtype=DTYPE)

                meta = ImageMeta.create(
                    shm_name=shm.name,
                    shape=SHAPE,
                    dtype=dtype_name,
                    frame_id=frame_id,
                )

                pub.send_json(asdict(meta))
                print(f"[PUB] frame {frame_id}")

                time.sleep(1.0 / FPS)

        # Drop the array view before the mapping underneath it is closed.
        del img
    finally:
        # Always remove the named segment; otherwise it would remain after
        # this process has exited.
        shm.close()
        shm.unlink()


def subscriber():
    """Receive frame metadata over ZMQ and read each image from SHM.

    Connects a SUB socket to ENDPOINT and, for every ``ImageMeta`` message,
    attaches to the named shared-memory segment, views it as an array and
    prints its first pixel. Gaps in ``frame_id`` are reported as dropped
    frames.

    The loop ends when any of the following happens:
      * a frame with id >= NUM_FRAMES - 1 is received;
      * no message arrives within the idle timeout (for example because the
        final frame was dropped or the publisher has exited);
      * the segment named in a message no longer exists.
    """
    # PUB/SUB may drop messages, including the last one, so a blocking
    # receive alone could wait forever. Allow ten frame periods of silence,
    # but never less than five seconds so a slow publisher start-up is
    # tolerated.
    idle_timeout_ms = max(5000, int(10 * 1000 / FPS))

    # Leaving the with-block closes the socket first, then terminates the
    # context.
    with zmq.Context() as ctx, ctx.socket(zmq.SUB) as sub:
        sub.connect(ENDPOINT)
        sub.setsockopt(zmq.SUBSCRIBE, b"")

        last_frame = -1

        while True:
            # poll() returns 0 when the timeout expires with nothing to read.
            if not sub.poll(idle_timeout_ms):
                print(f"[SUB] no frame within {idle_timeout_ms} ms, stopping")
                break

            raw: Dict = cast(Dict[str, Any], sub.recv_json())
            meta = ImageMeta(**raw)

            try:
                shm = shared_memory.SharedMemory(name=meta.shm_name)
            except FileNotFoundError:
                # The segment named in the message has already been removed,
                # so there is nothing left to read.
                print(f"[SUB] segment {meta.shm_name} is gone, stopping")
                break

            try:
                img = np.ndarray(
                    tuple(meta.shape),  # JSON delivers the shape as a list
                    dtype=np.dtype(meta.dtype),
                    buffer=shm.buf,
                )

                # Detect dropped frames (normal behavior)
                if last_frame != -1 and meta.frame_id != last_frame + 1:
                    print(
                        f"[SUB] dropped {last_frame + 1}..{meta.frame_id - 1}"
                    )

                last_frame = meta.frame_id

                print(
                    f"[SUB] frame {meta.frame_id}, "
                    f"first pixel = {img[0, 0]}"
                )

                # Drop the array view before the mapping underneath it is
                # closed.
                del img
            finally:
                shm.close()

            if meta.frame_id >= NUM_FRAMES - 1:
                break


if __name__ == "__main__":
    # Launch both ends of the demo as separate "spawn"-started processes.
    mp.set_start_method("spawn", force=True)

    sub_proc = mp.Process(target=subscriber)
    pub_proc = mp.Process(target=publisher)

    # The subscriber is started first and given a short head start before
    # the publisher is launched.
    sub_proc.start()
    time.sleep(0.1)
    pub_proc.start()

    # Wait for the publisher, then for the subscriber.
    for proc in (pub_proc, sub_proc):
        proc.join()