ZMQ
| Scope | Transport | Performance | Complexity | Recommendation |
|---|---|---|---|---|
| Same process | inproc:// | ⭐⭐⭐⭐⭐ | ⭐ | ✅ Best |
| Same machine | ipc:// | ⭐⭐⭐⭐ | ⭐⭐ | ✅ Best |
| Network | tcp:// | ⭐⭐⭐ | ⭐⭐ | ✅ Best |
| Any | shm | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ❌ Avoid |
| Pattern | Direction | Fan-out | Reliable | Typical use |
|---|---|---|---|---|
| REQ/REP | 1↔1 | ❌ | ✅ | RPC |
| PUB/SUB | 1→N | ✅ | ❌ | Events, streaming |
| PUSH/PULL | 1→N | ❌ | ✅ | Work queues |
| DEALER/ROUTER | N↔N | ✅ | ✅ | Async services |
| XPUB/XSUB | 1→N | ✅ | ❌ | Brokers |
| PAIR | 1↔1 | ❌ | ✅ | Simple pipes |
inproc
Communication within the same process
| import zmq
import threading
import time
# Single context shared by the publisher() and subscriber() threads.
# NOTE: per the ZeroMQ inproc documentation, inproc:// endpoints only connect
# sockets created from the same context, so both threads must use this object.
ctx = zmq.Context() # type: ignore[attr-defined]
def publisher():
    """Publish one ``[b"topic", b"hello"]`` message on ``inproc://events``.

    The PUB socket is created from the module-level ``ctx`` and is closed on
    every exit path, including when ``bind`` or ``send_multipart`` raises.
    """
    pub = ctx.socket(zmq.PUB)  # type: ignore[attr-defined]
    try:
        pub.bind("inproc://events")
        # A PUB socket does not queue messages for subscribers that have not
        # connected yet, so pause briefly before sending.
        time.sleep(0.1)  # allow subscribers to connect
        pub.send_multipart([b"topic", b"hello"])
    finally:
        pub.close()
def subscriber():
    """Receive one multipart message from ``inproc://events`` and print it.

    The SUB socket is created from the module-level ``ctx``, subscribes to the
    ``b"topic"`` prefix, and is closed on every exit path.
    """
    sub = ctx.socket(zmq.SUB)  # type: ignore[attr-defined]
    try:
        sub.connect("inproc://events")
        sub.setsockopt(zmq.SUBSCRIBE, b"topic")  # type: ignore[attr-defined]
        # Blocks until a message matching the subscription arrives.
        print(sub.recv_multipart())
    finally:
        sub.close()
# Run the subscriber (t1) and the publisher (t2) in their own threads: start
# the subscriber first, then wait for both to finish.
t1, t2 = (threading.Thread(target=fn) for fn in (subscriber, publisher))
for thread in (t1, t2):
    thread.start()
for thread in (t1, t2):
    thread.join()
|
ipc
Same machine, different processes
| import zmq
import time
import os
import multiprocessing as mp
# Filesystem path of the socket file; publisher() and subscriber() both build
# their ipc:// endpoint from it.
SOCKET_PATH = "/tmp/events.ipc"
def publisher():
    """Publish one ``[b"topic", b"hello"]`` message on the ipc endpoint.

    Removes any stale socket file at ``SOCKET_PATH``, binds a PUB socket to
    ``ipc://{SOCKET_PATH}``, sends a single message, and then closes the
    socket and terminates the context. Cleanup runs on every exit path,
    including when ``bind`` or ``send_multipart`` raises.
    """
    # Clean stale socket. Attempt the unlink directly instead of checking for
    # existence first: the check-then-delete form can race with another
    # process removing the file in between.
    try:
        os.unlink(SOCKET_PATH)
    except FileNotFoundError:
        pass  # nothing left over from a previous run

    ctx = zmq.Context()  # type: ignore[attr-defined]
    pub = ctx.socket(zmq.PUB)  # type: ignore[attr-defined]
    try:
        pub.bind(f"ipc://{SOCKET_PATH}")
        # A PUB socket does not queue messages for subscribers that have not
        # connected yet, so pause briefly before sending.
        time.sleep(0.2)  # allow subscribers to connect
        pub.send_multipart([b"topic", b"hello"])
        # Short pause before closing; presumably gives the message time to be
        # delivered (the original does not state the reason).
        time.sleep(0.1)
    finally:
        pub.close()
        ctx.term()
def subscriber():
    """Receive one multipart message from the ipc endpoint and print it.

    Connects a SUB socket to ``ipc://{SOCKET_PATH}``, subscribes to the
    ``b"topic"`` prefix, prints the first message received, and then closes
    the socket and terminates the context. Cleanup runs on every exit path.
    """
    ctx = zmq.Context()  # type: ignore[attr-defined]
    sub = ctx.socket(zmq.SUB)  # type: ignore[attr-defined]
    try:
        sub.connect(f"ipc://{SOCKET_PATH}")
        sub.setsockopt(zmq.SUBSCRIBE, b"topic")  # type: ignore[attr-defined]
        # Blocks until a message matching the subscription arrives.
        print(sub.recv_multipart())
    finally:
        sub.close()
        ctx.term()
if __name__ == "__main__":
    # Force the "spawn" start method (the original marks it as the safe default).
    mp.set_start_method("spawn", force=True)
    p_sub, p_pub = mp.Process(target=subscriber), mp.Process(target=publisher)

    # Launch the subscriber first and give it a head start over the publisher.
    p_sub.start()
    time.sleep(0.1)
    p_pub.start()

    # Wait for the publisher, then for the subscriber.
    for proc in (p_pub, p_sub):
        proc.join()
|
tcp
| import zmq
import time
import multiprocessing as mp
# Loopback TCP endpoint: publisher() binds to it and subscriber() connects to it.
ENDPOINT = "tcp://127.0.0.1:5555"
def publisher():
    """Publish one ``[b"topic", b"hello"]`` message on ``ENDPOINT``.

    Binds a PUB socket to ``ENDPOINT``, sends a single message, and then
    closes the socket and terminates the context. Cleanup runs on every exit
    path, including when ``bind`` raises (for example because the port is
    already in use) or ``send_multipart`` raises.
    """
    ctx = zmq.Context()  # type: ignore[attr-defined]
    pub = ctx.socket(zmq.PUB)  # type: ignore[attr-defined]
    try:
        pub.bind(ENDPOINT)
        # A PUB socket does not queue messages for subscribers that have not
        # connected yet, so pause briefly before sending.
        time.sleep(0.2)  # allow subscribers to connect
        pub.send_multipart([b"topic", b"hello"])
        # Short pause before closing; presumably gives the message time to be
        # delivered (the original does not state the reason).
        time.sleep(0.1)
    finally:
        pub.close()
        ctx.term()
def subscriber():
    """Receive one multipart message from ``ENDPOINT`` and print it.

    Connects a SUB socket to ``ENDPOINT``, subscribes to the ``b"topic"``
    prefix, prints the first message received, and then closes the socket and
    terminates the context. Cleanup runs on every exit path.
    """
    ctx = zmq.Context()  # type: ignore[attr-defined]
    sub = ctx.socket(zmq.SUB)  # type: ignore[attr-defined]
    try:
        sub.connect(ENDPOINT)
        sub.setsockopt(zmq.SUBSCRIBE, b"topic")  # type: ignore[attr-defined]
        # Blocks until a message matching the subscription arrives.
        print(sub.recv_multipart())
    finally:
        sub.close()
        ctx.term()
if __name__ == "__main__":
    # Force the "spawn" start method for the child processes.
    mp.set_start_method("spawn", force=True)
    p_sub, p_pub = mp.Process(target=subscriber), mp.Process(target=publisher)

    # SUB first (important): start the subscriber, pause, then the publisher.
    p_sub.start()
    time.sleep(0.1)
    p_pub.start()

    # Wait for the publisher, then for the subscriber.
    for proc in (p_pub, p_sub):
        proc.join()
|