Skip to content

Client/Server Req/Rep zmq pattern with msgpack

Synchronous messaging pattern where one side (the Requester) sends a request, and the other side (the Replier) sends a response.

  • socket zmq.REQ will block on send unless it has successfully received a reply back.
  • socket zmq.REP will block on recv unless it has received a request.

Tip

If you want to set a timeout on the REQ socket, you can use the zmq.POLLIN flag with zmq.Poller to check if there is a message to receive. or use RCVTIMEO on the socket.

install

install dependencies
pip install msgpack
pip install pyzmq

Demo: Req/Rep zmq pattern with msgpack

ZMQ Req/Rep zmq pattern with python dataclasses and msgpack

code
req/rep
import multiprocessing
import logging
from dataclasses import dataclass, asdict
import msgpack
import zmq

FMT = "%(asctime)s - %(lineno)s - %(levelname)s - %(message)s"
logging.basicConfig(format=FMT, level=logging.INFO)

log = logging.getLogger(__name__)

TOPIC = b"topic"
SERVICE_PORT = 5555


@dataclass
class Data_Request:
    f_int: int
    f_float: float
    f_string: str


@dataclass
class Data_Response:
    success: bool


def server():
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(f"tcp://*:{SERVICE_PORT}")
    topic, data = socket.recv_multipart()

    msg = msgpack.unpackb(data)
    log.info(f"server get request: {msg}")

    response = Data_Response(success=True)
    data = msgpack.packb(asdict(response))
    socket.send(data)


def client():
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect(f"tcp://127.0.0.1:{SERVICE_PORT}")

    # Create request msg
    msg = Data_Request(1, 2.0, "string")
    raw = asdict(msg)
    data = msgpack.packb(raw)
    socket.send_multipart([TOPIC, data])

    # Recv response from server
    data = socket.recv()
    # unpack socket data
    raw = msgpack.unpackb(data)
    # Convert to msg
    msg = Data_Response(**raw)
    log.info(f"server response: {msg.success}")


if __name__ == "__main__":
    p_server = multiprocessing.Process(target=server)
    p_client = multiprocessing.Process(target=client)
    p_client.start()
    p_server.start()

    p_server.join()
    p_client.join()

Demo2 : Using socket RCVTIMEO

code
import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

# Set both send and receive timeouts
socket.setsockopt(zmq.RCVTIMEO, 5000)  # 5-second receive timeout
socket.setsockopt(zmq.SNDTIMEO, 5000)  # 5-second send timeout
socket.setsockopt(zmq.LINGER, 0)       # Prevent blocking on close

try:
    print("Sending request...")
    socket.send(b"Hello")  # Send request (this can block)

    message = socket.recv()  # Wait for response (times out after 5 sec)
    print(f"Received reply: {message.decode()}")

except zmq.error.Again:
    print("Timeout: No response received, exiting.")

finally:
    socket.close()  # Ensure socket is closed properly
    context.term()  # Terminate the context to free resources

print("Program exited.")

Demo3 : Using zmq.Poller

code
import zmq

context = zmq.Context()
timeout_ms = 5000  # 5-second timeout
max_retries = 3    # Number of retries

for attempt in range(1, max_retries + 1):
    print(f"Attempt {attempt}: Sending request...")

    # Create a new REQ socket for each attempt
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
    socket.setsockopt(zmq.LINGER, 0)  # Prevent blocking on close

    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)  # Monitor socket for a response

    try:
        socket.send(b"Hello")  # Send request

        socks = dict(poller.poll(timeout_ms))  # Wait for response with timeout

        if socket in socks:  # If response is received
            message = socket.recv()
            print(f"Received reply: {message.decode()}")
            socket.close()
            break  # Exit loop on success
        else:
            print("Timeout: No response received.")

    except zmq.ZMQError as e:
        print(f"ZeroMQ Error: {e}")

    finally:
        socket.close()  # Ensure the socket is properly closed

    if attempt == max_retries:
        print("Max retries reached. Exiting.")

context.term()  # Clean up context
print("Program exited.")