3 Python interface to ZIO messages 9 from collections
import namedtuple
24 from .util
import byteify_list
27 A ZIO message prefix header. 29 It is a triplet of (level, form, label) with a "ZIO" leading 32 level = MessageLevel.undefined
36 def __init__(self, *args, level=0, form=" "*4, label=""):
38 Create a prefix header 40 >>> PrefixHeader(0, "FLOW", json.dumps(dict(flow="DAT"))) 41 >>> PrefixHeader(form="TEXT", level=3) 49 if type(args[0]) == int:
52 self.
form=
"%-4s"%args[1]
57 if type(args[0]) == str
and len(args[0]) > 5:
59 if phs.startswith(
"ZIO"):
63 self.
form=
"%-4s"%phs[:4]
66 if type(args[0]) == bytes:
74 return "<zio.message.PrefixHeader %s>" % bytes(self)
77 a = b
'ZIO%d' % self.
level.value
78 b = bytes(self.
form,
'utf-8')
79 c = bytes(self.
label,
'utf-8')
87 def __init__(self, *args, origin=0, granule=0, seqno=0):
96 if type(args[0]) == bytes:
107 return "<zio.message.CoordHeader %s>" % bytes(self)
112 A zio.Message fixes some of the message schema. 114 It is equivalent to a C++ zio::Message. 125 level=None, form=None, label=None, routing_id=None,
126 origin=None, granule=None, seqno=None,
127 prefix=None, coord=None, payload=None,
128 parts=None, encoded=None, frame=None):
129 '''Construct a zio.Message. 131 Construction applies arguments in reverse order. Thus one 132 may, eg, construct a message with a frame and then override 133 the payload and label. The ingredients may be considered 136 frame = encoding + routing_id 138 encoding = packing of parts 140 parts = [prefix, coord, ...payloads] 142 A frame should be used when the zio.Message will be used with 143 a SERVER socket. Or else the routing_id must be explicitly 150 if frame
is not None:
152 if encoded
is not None:
154 if parts
is not None:
156 if payload
is not None:
158 if coord
is not None:
160 if prefix
is not None:
162 if routing_id
is not None:
164 if label
is not None:
168 if level
is not None:
170 if origin
is not None:
172 if granule
is not None:
174 if seqno
is not None:
204 return json.loads(self.
decode(
'utf-8'))
205 return json.loads(self.
label)
209 self.
label = json.dumps(val)
213 return self.
coord.origin
216 self.
coord.origin = val
220 return self.
coord.granule
223 self.
coord.granule = val
227 return self.
coord.seqno
230 self.
coord.seqno = val
243 Return self as a frame. 245 frame = zmq.Frame(self.
encode())
247 frame.routing_id = routing_id
252 Set self from a frame 259 Return encoded byte array of self. 261 It is suitable for use as the data arg to a zmq.Frame 276 Return self as a multipart set of encoded data 282 Set self from multipart message / array of encoded data. 285 raise ValueError(
"must have at least two parts")
291 return "zio.Message: \"%s\" + %s + [%d]" % \
297 Return a binary encoded concatenation of parts in the input 298 sequence. Result is suitable for use as a single-part message. 299 For ZIO messages the first two parts should be encoded header 300 prefix and coord, respectively. Subseqent parts can be payload of 305 s = struct.pack(
'I', len(p))
311 Unpack an encoded single-part ZIO message such as returned by 312 socket.recv() on a SERVER socket. It's the moral opposite of 313 zio::Message::encode(). What is returned is sequence of message 322 raise ValueError(
"corrupt ZIO message in size")
323 size = struct.unpack(
'i',encoded[beg:end])[0]
327 raise ValueError(
"corrupt ZIO message in data")
328 ret.append(encoded[beg:end])
334 Return a binary encoded header prefix suitable for use as a 337 pre =
'ZIO%d%-4s' % (level, mform[:4])
340 return struct.pack(
'I', len(pre)) + pre
344 Parse the bytes of one encoded message part into a ZIO message 345 header prefix. This is usually the first part of a multipart 346 message or as returned by decode(). Returns tuple (level, format, 347 label) or None if parse error. 349 if henc[:3] != b
'ZIO':
353 level = henc[3]-ord(
'0')
354 mform = henc[4:8].decode()
355 label = henc[8:].decode()
356 return (level, mform, label)
360 Return a binary encoded header coord suitable for use as a message 361 part. Arguments are taken to be 64 bit unsigned ints. 363 return struct.pack(
'LLL', origin, granule, seqno);
368 Parse the bytes of one encoded message part into a ZIO message 369 header coord. This is ususally the second part of a multipart 370 message or as returend by decode(). Returns tuple (origin, 371 granule, seqno) or None if parse error. 375 return struct.unpack(
'LLL', henc);
def encode_message(parts)
def decode(self, encoded)
def __init__(self, level=None, form=None, label=None, routing_id=None, origin=None, granule=None, seqno=None, prefix=None, coord=None, payload=None, parts=None, encoded=None, frame=None)
def decode_message(encoded)
def decode_header_prefix(henc)
def encode_header_coord(origin, granule, seqno)
def encode_header_prefix(mform="FLOW", level=0, label="")
def fromframe(self, frame)
def fromparts(self, parts)
def decode_header_coord(henc)