Dealer Router Pattern in NNG
programming
NNG is a broker-less message queue that is a lightweight alternative to ZeroMQ and supersedes the older nanomsg project. It offers a more “orthogonal” API and does a few things better than ZeroMQ.
- Thread-safe sockets
- Websocket transport support
- POSIX-compliant sockets API
- True zero-copy messaging
A common messaging pattern is Dealer-Router which can be easily implemented using ZeroMQ’s DEALER and ROUTER sockets. Unfortunately, NNG does not have such sockets and implementing the pattern is not trivial. Fortunately, using NNG’s raw mode sockets we can emulate Dealer-Router.
How to implement Dealer-Router in NNG
Let’s first import pynng:
import pynng
Each message sent from a dealer (or request) contains a header used by the router to route a message back to the dealer. We can define a function to extract this header:
def extract_header(msg: pynng.Message) -> bytes:
header_p = pynng.lib.nng_msg_header(msg._nng_msg)
size = pynng.lib.nng_msg_header_len(msg._nng_msg)
return bytes(pynng.ffi.buffer(header_p, size))
We can define helper functions to create dealer request messages and router reply messages. A “cooked” socket handles the header automatically but because we’re using raw sockets, we’ll have to modify the header ourselves.
def create_request_message(data: bytes) -> pynng.Message:
msg = pynng.Message(data)
pynng.lib.nng_msg_header_append_u32(msg._nng_msg, 0x80000000)
return msg
def create_reply_message(header: bytes, data: bytes) -> pynng.Message:
msg = pynng.Message(data)
pynng.lib.nng_msg_header_append(msg._nng_msg, header, len(header))
return msg
Now we can open our raw sockets for Dealer-Router:
router = pynng.Rep0(opener=pynng.lib.nng_rep0_open_raw)
router.listen("tcp://127.0.0.1:8000")
dealer = pynng.Req0(opener=pynng.lib.nng_req0_open_raw)
dealer.dial("tcp://127.0.0.1:8000")
Sending one message from the dealer, we can see the router receives the message:
dealer.send_msg(create_request_message(b"one"))
msg = router.recv_msg()
assert msg.bytes == b"one"
To reply to the dealer, we need to extract the header and send reply messages using the extracted header:
header = extract_header(msg)
router.send_msg(create_reply_message(header, b"two"))
router.send_msg(create_reply_message(header, b"three"))
msg = dealer.recv_msg()
assert msg.bytes == b"two"
msg = dealer.recv_msg()
assert msg.bytes == b"three"
Message Polling
To poll messages from the router, we can use Python’s select module
import select
dealer.send_msg(create_request_message(b"ping"))
select.select([router.recv_fd], [], [router.recv_fd])
msg = router.recv_msg(block=False)
assert msg.bytes == b"ping"
router.send_msg(create_reply_message(extract_header(msg), b"pong"))
select.select([dealer.recv_fd], [], [dealer.recv_fd])
msg = dealer.recv_msg(block=False)
assert msg.bytes == b"pong"
Tracking Dealer Identities
The first 4-bytes of the header is known as the identity (also known as the peer ID) and uniquely identifies a dealer.
If we open a second dealer, the router can use the identities to differentiate between messages sent from different dealers.
dealer2 = pynng.Req0(opener=pynng.lib.nng_req0_open_raw)
dealer2.dial("tcp://127.0.0.1:8000")
dealer.send_msg(create_request_message(b"dealer"))
dealer2.send_msg(create_request_message(b"dealer2"))
identities = {}
for _ in range(2):
msg = router.recv_msg()
identities[extract_header(msg)[:4]] = msg.bytes
dealer.send_msg(create_request_message(b""))
msg = router.recv_msg()
assert identities[extract_header(msg)[:4]] == b"dealer"
dealer2.send_msg(create_request_message(b""))
msg = router.recv_msg()
assert identities[extract_header(msg)[:4]] == b"dealer2"
I hope this quick overview of the Dealer-Router pattern in NNG was helpful. Message onwards! 📨