Skip to content

ZMQ

Scope Transport Performance Complexity Recommendation
Same process inproc:// ⭐⭐⭐⭐⭐ ✅ Best
Same machine ipc:// ⭐⭐⭐⭐ ⭐⭐ ✅ Best
Network tcp:// ⭐⭐⭐ ⭐⭐ ✅ Best
Any shm ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ❌ Avoid
Pattern Direction Fan-out Reliable Typical use
REQ/REP 1↔1 RPC
PUB/SUB 1→N Events, streaming
PUSH/PULL 1→N Work queues
DEALER/ROUTER N↔N Async services
XPUB/XSUB 1→N Brokers
PAIR 1↔1 Simple pipes

inproc

Communication in same process

share the same context

import zmq
import threading
import time

ctx = zmq.Context() # type: ignore[attr-defined]

def publisher():
    pub = ctx.socket(zmq.PUB) # type: ignore[attr-defined]
    pub.bind("inproc://events")
    time.sleep(0.1)  # allow subscribers to connect
    pub.send_multipart([b"topic", b"hello"])

def subscriber():
    sub = ctx.socket(zmq.SUB) # type: ignore[attr-defined]
    sub.connect("inproc://events")
    sub.setsockopt(zmq.SUBSCRIBE, b"topic") # type: ignore[attr-defined]
    print(sub.recv_multipart())

t1 = threading.Thread(target=subscriber)
t2 = threading.Thread(target=publisher)

t1.start()
t2.start()

t1.join()
t2.join()

ipc

Same machine different process

import zmq
import time
import os
import multiprocessing as mp

SOCKET_PATH = "/tmp/events.ipc"

def publisher():
    # Clean stale socket
    if os.path.exists(SOCKET_PATH):
        os.unlink(SOCKET_PATH)

    ctx = zmq.Context()  # type: ignore[attr-defined]
    pub = ctx.socket(zmq.PUB)  # type: ignore[attr-defined]
    pub.bind(f"ipc://{SOCKET_PATH}")

    time.sleep(0.2)  # allow subscribers to connect
    pub.send_multipart([b"topic", b"hello"])
    time.sleep(0.1)

    pub.close()
    ctx.term()

def subscriber():
    ctx = zmq.Context()  # type: ignore[attr-defined]
    sub = ctx.socket(zmq.SUB)  # type: ignore[attr-defined]

    sub.connect(f"ipc://{SOCKET_PATH}")
    sub.setsockopt(zmq.SUBSCRIBE, b"topic")  # type: ignore[attr-defined]

    print(sub.recv_multipart())

    sub.close()
    ctx.term()

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)  # safe default

    p_sub = mp.Process(target=subscriber)
    p_pub = mp.Process(target=publisher)

    p_sub.start()
    time.sleep(0.1)   # ensure SUB starts first
    p_pub.start()

    p_pub.join()
    p_sub.join()

tcp

import zmq
import time
import multiprocessing as mp

ENDPOINT = "tcp://127.0.0.1:5555"

def publisher():
    ctx = zmq.Context()  # type: ignore[attr-defined]
    pub = ctx.socket(zmq.PUB)  # type: ignore[attr-defined]
    pub.bind(ENDPOINT)

    time.sleep(0.2)  # allow subscribers to connect
    pub.send_multipart([b"topic", b"hello"])
    time.sleep(0.1)

    pub.close()
    ctx.term()

def subscriber():
    ctx = zmq.Context()  # type: ignore[attr-defined]
    sub = ctx.socket(zmq.SUB)  # type: ignore[attr-defined]

    sub.connect(ENDPOINT)
    sub.setsockopt(zmq.SUBSCRIBE, b"topic")  # type: ignore[attr-defined]

    print(sub.recv_multipart())

    sub.close()
    ctx.term()

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)

    p_sub = mp.Process(target=subscriber)
    p_pub = mp.Process(target=publisher)

    p_sub.start()
    time.sleep(0.1)  # SUB first (important)
    p_pub.start()

    p_pub.join()
    p_sub.join()