ZIO
Python and C++ interface to ZeroMQ and Zyre
zhelpers.py
Go to the documentation of this file.
1 # encoding: utf-8
2 """
3 Helper module for example applications.
4 Mimics ZeroMQ Guide's zhelpers.h.
5 Extended to generalize SERVER/ROUTER and CLIENT/DEALER
6 """
7 
8 import sys
9 import binascii
10 import os
11 from random import randint
12 import struct
13 import zmq
14 
def socket_set_hwm(socket, hwm=-1):
    """Set the high-water mark on a socket (libzmq 2/3/4 compatible).

    The send and receive marks (``sndhwm`` then ``rcvhwm``) are set to
    ``hwm``.  If setting either raises AttributeError, the single
    ``hwm`` attribute is set instead.
    """
    try:
        for option in ("sndhwm", "rcvhwm"):
            setattr(socket, option, hwm)
    except AttributeError:
        # No separate send/receive marks on this socket: use the
        # combined attribute.
        setattr(socket, "hwm", hwm)
21 
22 
def dump(msg_or_socket):
    """Print each part of a message, receiving it first if given a socket.

    ``msg_or_socket`` is either a zmq.Socket, from which one message is
    received, or an already received message given as an iterable of
    parts.  A part may be bytes or a zmq.Frame.

    After a separator line, each part is printed on its own line as its
    length in brackets followed by its content: as text when it decodes
    as ASCII, otherwise as a "0x"-prefixed hex string.  For a SERVER
    socket the routing id of the message is printed before the parts.
    """
    print(msg_or_socket)
    cid = None
    if isinstance(msg_or_socket, zmq.Socket):
        # it's a socket, call on current message
        if msg_or_socket.type == zmq.SERVER:
            # serverish_recv() returns [routing id, parts].  Only the
            # parts can be measured and printed as frames, so the pair
            # must be unpacked rather than iterated directly.
            cid, msg = serverish_recv(msg_or_socket)
        else:
            msg = msg_or_socket.recv_multipart()
    else:
        msg = msg_or_socket
    print("----------------------------------------")
    if cid is not None:
        print("routing id: %s" % (cid,))
    for part in msg:
        if isinstance(part, zmq.Frame):
            # Work on the frame's payload, as encode_message() does.
            part = part.bytes
        print("[%03d]" % len(part), end=' ')
        try:
            print(part.decode('ascii'))
        except UnicodeDecodeError:
            # Not plain text: show the raw bytes as hex instead.
            print(r"0x%s" % (binascii.hexlify(part).decode('ascii')))
42 
43 
def set_id(zsocket):
    """Set simple random printable identity on socket.

    The identity is a string of the form "xxxx-xxxx" where each group
    is four lowercase hexadecimal digits, applied through
    ``zsocket.setsockopt_string(zmq.IDENTITY, ...)``.
    """
    # randint() includes both end points, so the upper bound has to be
    # 0xFFFF; 0x10000 would occasionally yield a five-digit group.
    identity = u"%04x-%04x" % (randint(0, 0xFFFF), randint(0, 0xFFFF))
    zsocket.setsockopt_string(zmq.IDENTITY, identity)
48 
49 
def zpipe(ctx):
    """Build an inproc pipe for talking to threads.

    Mimics the pipe used in czmq zthread_fork.

    ``ctx`` is the context used to create both sockets.

    Returns a pair ``(a, b)`` of PAIR sockets joined over a randomly
    named inproc endpoint: ``a`` is bound to it and ``b`` is connected.
    Both sockets get linger 0 and a high-water mark of 1.
    """
    a = ctx.socket(zmq.PAIR)
    b = ctx.socket(zmq.PAIR)
    a.linger = b.linger = 0
    a.hwm = b.hwm = 1
    # hexlify() returns bytes; decode it so the endpoint reads
    # "inproc://0123..." instead of "inproc://b'0123...'".
    token = binascii.hexlify(os.urandom(8)).decode('ascii')
    iface = "inproc://%s" % token
    a.bind(iface)
    b.connect(iface)
    return a, b
65 
66 
67 # Send/recv helpers for server-like (ROUTER, SERVER) and client-like
68 # (DEALER, CLIENT) sockets which perform type-erasure on the socket type.
69 # Serverish deals in a (cid, msg) pair while clientish deals with
70 # just a msg. In all cases a msg is a list of frames (zmq.Frame or bytes).
71 # SERVER and CLIENT inherently pass only single-part messages and so
72 # this list of frames is packed/unpacked (even if the list is length 1).
73 # For ROUTER/DEALER it is sent as a multipart.
74 
def encode_message(parts):
    '''
    Encode parts in a CZMQ-compatible (zmsg_decode()/zmsg_encode())
    way.

    A part may be either a PyZMQ zmq.Frame or bytes.

    Each part is serialized to bytes and prefixed with its size.  Parts
    smaller than 255 bytes are prefixed with a 1-byte size value.
    Larger parts are prefixed by a fixed 1-byte value of 0xFF and a
    4-byte size value.

    Returns the concatenation of all prefixed parts as bytes (empty
    bytes when there are no parts).

    NOTE(review): the '=' format packs the 4-byte size in native byte
    order.  Confirm this matches what the peer's decoder expects; CZMQ
    is believed to write this field big-endian.
    '''
    # Collect the pieces and join once at the end instead of growing a
    # bytes object with += on every part.
    chunks = []
    for p in parts:
        if isinstance(p, zmq.Frame):
            p = p.bytes
        siz = len(p)
        if siz < 255:
            chunks.append(struct.pack('=B', siz))
        else:
            # 0xFF marks a "large" part whose real size follows.
            chunks.append(struct.pack('=BI', 0xFF, siz))
        chunks.append(p)

    return b''.join(chunks)
100 
def decode_message(encoded):
    '''
    Decode parts in a CZMQ-compatible way.

    This provides the inverse function of encode_message() with the
    exception that each part in the returned list is a slice of
    ``encoded`` (bytes when ``encoded`` is bytes), and not zmq.Frame.

    Each part starts with a 1-byte size.  A size byte of 0xFF means the
    real size follows as a 4-byte value (native byte order, matching
    the '=I' format used here).

    Returns the list of parts, empty when ``encoded`` is empty.

    Raises ValueError if a size field or the data of a part runs past
    the end of ``encoded``.
    '''
    tot = len(encoded)
    ret = []
    beg = 0
    while beg < tot:
        # The loop condition guarantees the 1-byte size (or 0xFF
        # marker) is present.  It may be the very last byte: that is a
        # valid, empty final part.
        end = beg + 1
        size = struct.unpack('=B', encoded[beg:end])[0]
        beg = end

        if size == 0xFF:        # large message
            end = beg + 4
            # Only a size field that overruns the buffer is corrupt;
            # one ending exactly at the end is checked by the data
            # bounds test below.
            if end > tot:
                raise ValueError("corrupt message part in size")
            size = struct.unpack('=I', encoded[beg:end])[0]
            beg = end

        end = beg + size
        if end > tot:
            raise ValueError("corrupt message part in data")
        ret.append(encoded[beg:end])
        beg = end
    return ret
132 
133 
def serverish_recv(sock, *args, **kwds):
    '''Return a message from a serverish socket.

    The socket may be of type ROUTER or SERVER.  Extra positional and
    keyword arguments are forwarded to the underlying receive call
    (``recv`` for SERVER, ``recv_multipart`` for ROUTER).

    Returns the list [id, msg]:

    - SERVER: id is the routing_id of the received frame and msg is
      the list of parts produced by decode_message().
    - ROUTER: id is the first frame, the second frame must be the
      empty delimiter, and msg is the list of remaining frames.

    Raises ValueError for any other socket type.
    '''
    if sock.type == zmq.SERVER:
        # Forward the caller's options (e.g. flags) as the ROUTER
        # branch does.  copy=False is forced because the routing id is
        # read from the returned frame object.
        recv_kwds = dict(kwds, copy=False)
        frame = sock.recv(*args, **recv_kwds)
        cid = frame.routing_id
        msg = decode_message(frame.bytes)
        return [cid, msg]

    if sock.type == zmq.ROUTER:
        msg = sock.recv_multipart(*args, **kwds)
        cid = msg.pop(0)
        empty = msg.pop(0)
        assert (empty == b'')
        return [cid, msg]

    raise ValueError(f'unsupported socket type {sock.type}')
155 
def serverish_send(sock, cid, msg, *args, **kwds):
    '''Send a message via a serverish socket.

    ``cid`` identifies the peer and ``msg`` is a list of parts; a
    non-list ``msg`` is treated as a single part.  Extra positional and
    keyword arguments are forwarded to the underlying send call
    (``send`` for SERVER, ``send_multipart`` for ROUTER).

    - SERVER: the parts are packed with encode_message() into one
      frame whose routing_id is set to ``cid``.  A bytes ``cid`` is
      converted to an int using the native byte order.
    - ROUTER: the message is sent as multipart [cid, b''] + parts.

    Returns whatever the underlying send call returns.

    Raises ValueError for any other socket type.
    '''
    if not isinstance(msg, list):
        msg = [msg]

    if sock.type == zmq.SERVER:
        frame = zmq.Frame(data=encode_message(msg))
        if isinstance(cid, bytes):
            # hope-and-pray oriented programming
            cid = int.from_bytes(cid, sys.byteorder)
        frame.routing_id = cid
        # Forward the caller's options, as clientish_send() does.
        return sock.send(frame, *args, **kwds)

    if sock.type == zmq.ROUTER:
        msg = [cid, b''] + msg
        return sock.send_multipart(msg, *args, **kwds)

    raise ValueError(f'unsupported socket type {sock.type}')
174 
def clientish_recv(sock, *args, **kwds):
    '''Receive a message via a clientish socket.

    The socket may be of type CLIENT or DEALER.  Extra arguments are
    forwarded to the underlying receive call.

    - CLIENT: one frame is received (copy=False) and its bytes are
      unpacked with decode_message().
    - DEALER: a multipart message is received, its leading frame must
      be the empty delimiter, and the remaining frames are returned.

    Raises ValueError for any other socket type.
    '''
    if sock.type == zmq.CLIENT:
        packed = sock.recv(*args, copy=False, **kwds)
        return decode_message(packed.bytes)

    if sock.type == zmq.DEALER:
        parts = sock.recv_multipart(*args, **kwds)
        # Pop outside the assert so the delimiter is removed even when
        # assertions are disabled.
        delimiter = parts.pop(0)
        assert delimiter == b''
        return parts

    raise ValueError(f'unsupported socket type {sock.type}')
190 
def clientish_send(sock, msg, *args, **kwds):
    '''Send a message via a clientish socket.

    ``msg`` is a list of parts; a non-list ``msg`` is treated as a
    single part.  Extra arguments are forwarded to the underlying send
    call, whose result is returned.

    - CLIENT: the parts are packed with encode_message() into a single
      frame.
    - DEALER: the parts are sent as multipart, preceded by an empty
      delimiter frame.

    Raises ValueError for any other socket type.
    '''
    parts = msg if isinstance(msg, list) else [msg]

    if sock.type == zmq.CLIENT:
        packed = zmq.Frame(data=encode_message(parts))
        return sock.send(packed, *args, **kwds)

    if sock.type == zmq.DEALER:
        return sock.send_multipart([b''] + parts, *args, **kwds)

    raise ValueError(f'unsupported socket type {sock.type}')
205 
def clientish_recv(sock, args, kwds)
Definition: zhelpers.py:175
def set_id(zsocket)
Definition: zhelpers.py:44
def zpipe(ctx)
Definition: zhelpers.py:50
def clientish_send(sock, msg, args, kwds)
Definition: zhelpers.py:191
def decode_message(encoded)
Definition: zhelpers.py:101
def serverish_send(sock, cid, msg, args, kwds)
Definition: zhelpers.py:156
def socket_set_hwm(socket, hwm=-1)
Definition: zhelpers.py:15
def serverish_recv(sock, args, kwds)
Definition: zhelpers.py:134
def encode_message(parts)
Definition: zhelpers.py:75
def dump(msg_or_socket)
Definition: zhelpers.py:23