3 Helper module for example applications. 4 Mimics ZeroMQ Guide's zhelpers.h. 5 Extended to generalize SERVER/ROUTER and CLIENT/DEALER 11 from random
import randint
16 """libzmq 2/3/4 compatible sethwm""" 18 socket.sndhwm = socket.rcvhwm = hwm
19 except AttributeError:
24 """Receives all message parts from socket, printing each frame neatly""" 26 if isinstance(msg_or_socket, zmq.Socket):
28 if msg_or_socket.type == zmq.SERVER:
31 msg = msg_or_socket.recv_multipart()
34 print(
"----------------------------------------")
36 print(
"[%03d]" % len(part), end=
' ')
39 print(part.decode(
'ascii'))
40 except UnicodeDecodeError:
41 print(
r"0x%s" % (binascii.hexlify(part).decode(
'ascii')))
45 """Set simple random printable identity on socket""" 46 identity =
u"%04x-%04x" % (randint(0, 0x10000), randint(0, 0x10000))
47 zsocket.setsockopt_string(zmq.IDENTITY, identity)
51 """build inproc pipe for talking to threads 53 mimic pipe used in czmq zthread_fork. 55 Returns a pair of PAIRs connected via inproc 57 a = ctx.socket(zmq.PAIR)
58 b = ctx.socket(zmq.PAIR)
59 a.linger = b.linger = 0
61 iface =
"inproc://%s" % binascii.hexlify(os.urandom(8))
77 Encode parts in a CZMQ-compatible (zmsg_decode()/zmsg_encode()) 80 A part may be either a PyZMQ zmq.Frame or bytes. 82 Parts are serialized to bytes and prefixed with a size. Parts 83 smaller than 255 bytes are prefixed with a 1-byte size value. 84 Larger parts are prefixed by a fixed 1-byte value of 0xFF and a 89 if isinstance(p, zmq.Frame):
93 s = struct.pack(
'=B', siz)
95 s = struct.pack(
'=BI', 0xFF, siz)
103 Decode parts in a CZMQ-compatible way. 105 This provides the inverse function of encode_message() with the 106 exception that each part in the returned list is of type bytes, 115 raise ValueError(
"corrupt message part in size")
116 size = struct.unpack(
'=B',encoded[beg:end])[0]
122 raise ValueError(
"corrupt message part in size")
123 size = struct.unpack(
'=I',encoded[beg:end])[0]
128 raise ValueError(
"corrupt message part in data")
129 ret.append(encoded[beg:end])
135 '''Return a message from a serverish socket. 137 The socket may be of type ROUTER or SERVER. Return list of 141 if sock.type == zmq.SERVER:
142 frame = sock.recv(copy=
False)
143 cid = frame.routing_id
147 if sock.type == zmq.ROUTER:
148 msg = sock.recv_multipart(*args, **kwds)
151 assert (empty == b
'')
154 raise ValueError(f
'unsupported socket type {sock.type}')
157 '''Send a message via a serverish socket.''' 158 if not isinstance(msg, list):
161 if sock.type == zmq.SERVER:
163 if isinstance(cid, bytes):
165 cid = int.from_bytes(cid, sys.byteorder)
166 frame.routing_id = cid
167 return sock.send(frame)
169 if sock.type == zmq.ROUTER:
170 msg = [cid, b
''] + msg
171 return sock.send_multipart(msg)
173 raise ValueError(f
'unsupported socket type {sock.type}')
176 '''Receive a message via a clientish socket''' 178 if sock.type == zmq.CLIENT:
179 frame = sock.recv(copy=
False, *args, **kwds)
183 if sock.type == zmq.DEALER:
184 msg = sock.recv_multipart(*args, **kwds)
189 raise ValueError(f
'unsupported socket type {sock.type}')
192 '''Send a message via a clientish socket''' 193 if not isinstance(msg, list):
196 if sock.type == zmq.CLIENT:
198 return sock.send(frame, *args, **kwds)
200 if sock.type == zmq.DEALER:
202 return sock.send_multipart(msg, *args, **kwds)
204 raise ValueError(f
'unsupported socket type {sock.type}')
def clientish_recv(sock, args, kwds)
def clientish_send(sock, msg, args, kwds)
def decode_message(encoded)
def serverish_send(sock, cid, msg, args, kwds)
def socket_set_hwm(socket, hwm=-1)
def serverish_recv(sock, args, kwds)
def encode_message(parts)