ZIO
Python and C++ interface to ZeroMQ and Zyre
message.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 Python interface to ZIO messages
4 '''
5 
6 import zmq
7 import json
8 import struct
9 from collections import namedtuple
10 
11 
12 from enum import Enum
class MessageLevel(Enum):
    '''
    Level of a ZIO message, as carried in its prefix header.

    Members are numbered consecutively from 0 (undefined) to 8 (fatal).
    '''
    undefined = 0
    trace = 1
    verbose = 2
    debug = 3
    info = 4
    summary = 5
    warning = 6
    error = 7
    fatal = 8
23 
24 from .util import byteify_list
class PrefixHeader:
    '''
    A ZIO message prefix header.

    It is a triplet of (level, form, label) with a "ZIO" leading
    magic.
    '''
    level = MessageLevel.undefined  # zio::level::MessageLevel
    form = " " * 4                  # up to 4 character format
    label = ""                      # free form string

    def __init__(self, *args, level=0, form=" "*4, label=""):
        '''
        Create a prefix header.

        Keyword arguments are applied first.  If positional arguments
        are given they then override the keywords, interpreted by the
        type of the first one:

        - int or MessageLevel: (level[, form[, label]])
        - str longer than 5 characters: textual header, with or
          without the leading "ZIO" magic
        - bytes: an encoded header as produced by bytes(header)

        Raises ValueError if a level is not a valid MessageLevel value
        or if a bytes argument can not be decoded as a prefix header.

        >>> PrefixHeader(0, "FLOW", json.dumps(dict(flow="DAT")))
        >>> PrefixHeader(form="TEXT", level=3)
        '''
        self.level = self._as_level(level)
        self.form = "%-4s" % form
        self.label = label
        if not args:
            return
        first = args[0]

        # (level,form,label) constructor
        if isinstance(first, (int, MessageLevel)):
            self.level = self._as_level(first)
            if len(args) > 1:
                self.form = "%-4s" % args[1]
            if len(args) > 2:
                self.label = args[2]
            return

        # string constructor
        if isinstance(first, str) and len(first) > 5:
            text = first
            if text.startswith("ZIO"):
                text = text[3:]
            # one level digit, four form characters, the rest is label
            self.level = MessageLevel(int(text[0]))
            self.form = "%-4s" % text[1:5]
            self.label = text[5:]
            return

        # bytes constructor
        if isinstance(first, bytes):
            decoded = decode_header_prefix(first)
            if decoded is None:
                # the decoder signals a parse error with None
                raise ValueError("corrupt ZIO prefix header: %r" % (first,))
            level, self.form, self.label = decoded
            self.level = MessageLevel(int(level))

    @staticmethod
    def _as_level(value):
        '''Return value as a MessageLevel, accepting a member or a number.'''
        if isinstance(value, MessageLevel):
            return value
        return MessageLevel(int(value))

    def __str__(self):
        return "ZIO%d%s%s" % (self.level.value, self.form, self.label)

    def __repr__(self):
        return "<zio.message.PrefixHeader %s>" % bytes(self)

    def __bytes__(self):
        '''Return the header encoded as magic + level digit + form + label.'''
        magic_level = b'ZIO%d' % self.level.value
        form = bytes(self.form, 'utf-8')
        label = bytes(self.label, 'utf-8')
        return magic_level + form + label
81 
class CoordHeader:
    '''
    A ZIO message coordinate header.

    It is a triplet of (origin, granule, seqno).
    '''
    origin = 0   # where a message came from
    granule = 0  # when a message came from
    seqno = 0    # which message

    def __init__(self, *args, origin=0, granule=0, seqno=0):
        '''
        Create a coord header.

        Keyword arguments are applied first.  Positional arguments, if
        any, then override them: either exactly three values (origin,
        granule, seqno) or a single bytes object holding an encoded
        coord header.

        Raises ValueError if a bytes argument can not be decoded.
        '''
        self.origin = origin
        self.granule = granule
        self.seqno = seqno
        if not args:
            return
        if len(args) == 3:
            self.origin, self.granule, self.seqno = args
            return
        if isinstance(args[0], bytes):
            decoded = decode_header_coord(args[0])
            if decoded is None:
                # the decoder signals a parse error with None
                raise ValueError(
                    "corrupt ZIO coord header: %r" % (args[0],))
            self.origin, self.granule, self.seqno = decoded

    def __bytes__(self):
        return encode_header_coord(self.origin, self.granule, self.seqno)

    def __str__(self):
        return "[0x%x,%ld,%ld]" % (self.origin, self.granule, self.seqno)

    def __repr__(self):
        return "<zio.message.CoordHeader %s>" % bytes(self)
108 
109 
class Message:
    '''
    A zio.Message fixes some of the message schema.

    It is equivalent to a C++ zio::Message.

    '''

    routing_id = 0
    prefix = None
    coord = None
    _payload = ()

    def __init__(self,
                 level=None, form=None, label=None, routing_id=None,
                 origin=None, granule=None, seqno=None,
                 prefix=None, coord=None, payload=None,
                 parts=None, encoded=None, frame=None):
        '''Construct a zio.Message.

        Construction applies arguments in reverse order.  Thus one
        may, eg, construct a message with a frame and then override
        the payload and label.  The ingredients may be considered
        deconstructed as:

        frame = encoding + routing_id

        encoding = packing of parts

        parts = [prefix, coord, ...payloads]

        A frame should be used when the zio.Message will be used with
        a SERVER socket.  Or else the routing_id must be explicitly
        set.

        '''
        self.prefix = PrefixHeader()
        self.coord = CoordHeader()

        # Coarsest ingredients first so that finer ones override them.
        if frame is not None:
            self.fromframe(frame)
        if encoded is not None:
            self.decode(encoded)
        if parts is not None:
            self.fromparts(parts)
        if payload is not None:
            self.payload = payload
        if coord is not None:
            self.coord = coord
        if prefix is not None:
            self.prefix = prefix
        if routing_id is not None:
            self.routing_id = routing_id
        if label is not None:
            self.label = label
        if form is not None:
            self.form = form
        if level is not None:
            self.level = level
        if origin is not None:
            self.origin = origin
        if granule is not None:
            self.granule = granule
        if seqno is not None:
            self.seqno = seqno

    @property
    def form(self):
        '''The prefix header form string.'''
        return self.prefix.form

    @form.setter
    def form(self, val):
        # The encoded prefix has a fixed 4 character form field, so pad
        # short values the same way PrefixHeader's constructor does.
        self.prefix.form = "%-4s" % val

    @property
    def level(self):
        '''The prefix header level.'''
        return self.prefix.level

    @level.setter
    def level(self, val):
        # PrefixHeader serializes through level.value, so always store
        # an enum member even when given a plain number.
        if not isinstance(val, MessageLevel):
            val = MessageLevel(int(val))
        self.prefix.level = val

    @property
    def label(self):
        '''The prefix header label.'''
        return self.prefix.label

    @label.setter
    def label(self, val):
        self.prefix.label = val

    @property
    def label_object(self):
        '''
        The label parsed as JSON.

        An empty label gives an empty dict.  A bytes label is decoded
        as UTF-8 before parsing.
        '''
        label = self.label
        if not label:
            return dict()
        if isinstance(label, bytes):
            label = label.decode('utf-8')
        return json.loads(label)

    @label_object.setter
    def label_object(self, val):
        self.label = json.dumps(val)

    @property
    def origin(self):
        '''The coord header origin.'''
        return self.coord.origin

    @origin.setter
    def origin(self, val):
        self.coord.origin = val

    @property
    def granule(self):
        '''The coord header granule.'''
        return self.coord.granule

    @granule.setter
    def granule(self, val):
        self.coord.granule = val

    @property
    def seqno(self):
        '''The coord header seqno.'''
        return self.coord.seqno

    @seqno.setter
    def seqno(self, val):
        self.coord.seqno = val

    @property
    def payload(self):
        '''The payload parts, or an empty list if there are none.'''
        return self._payload or list()

    @payload.setter
    def payload(self, pl):
        self._payload = byteify_list(pl)

    def toframe(self):
        '''
        Return self as a frame.

        The frame carries the encoded message and, when set, the
        routing_id of this message.
        '''
        frame = zmq.Frame(self.encode())
        if self.routing_id:
            frame.routing_id = self.routing_id
        return frame

    def fromframe(self, frame):
        '''
        Set self from a frame
        '''
        self.routing_id = frame.routing_id
        self.decode(frame.bytes)

    def encode(self):
        '''
        Return encoded byte array of self.

        It is suitable for use as the data arg to a zmq.Frame
        '''
        return encode_message(self.toparts())

    def decode(self, encoded):
        '''
        Decode to self.
        '''
        self.fromparts(decode_message(encoded))

    def toparts(self):
        '''
        Return self as a multipart set of encoded data
        '''
        return [bytes(self.prefix), bytes(self.coord)] + list(self.payload)

    def fromparts(self, parts):
        '''
        Set self from multipart message / array of encoded data.

        Raises ValueError if fewer than two parts are given.
        '''
        if len(parts) < 2:
            raise ValueError("must have at least two parts")
        self.prefix = PrefixHeader(parts[0])
        self.coord = CoordHeader(parts[1])
        self.payload = parts[2:]

    def __str__(self):
        return "zio.Message: \"%s\" + %s + [%d]" % \
            (self.prefix, self.coord, len(self.payload))
293 
294 
def encode_message(parts):
    '''
    Pack a sequence of message parts into one byte string.

    Each part is written as its length (struct format 'I') followed
    by the part itself, in input order.  The result is suitable for
    use as a single-part message.  For ZIO messages the first two
    parts should be the encoded header prefix and coord, respectively.
    Subsequent parts can be payload of arbitrary encoding.
    '''
    return b''.join(struct.pack('I', len(part)) + part for part in parts)
308 
def decode_message(encoded):
    '''
    Unpack an encoded single-part ZIO message such as returned by
    socket.recv() on a SERVER socket.  It's the moral opposite of
    zio::Message::encode().  What is returned is a list of message
    parts.

    Raises ValueError if the data ends inside a size field or a part
    claims more bytes than remain.
    '''
    # Same format the encoder writes.  Unsigned, so a corrupt size can
    # never be negative and move the cursor backwards.
    size_field = struct.Struct('I')
    total = len(encoded)
    parts = []
    beg = 0
    while beg < total:
        end = beg + size_field.size
        # end == total is legal: a trailing part of zero length.
        if end > total:
            raise ValueError("corrupt ZIO message in size")
        (size,) = size_field.unpack_from(encoded, beg)
        beg = end
        end = beg + size
        if end > total:
            raise ValueError("corrupt ZIO message in data")
        parts.append(encoded[beg:end])
        beg = end
    return parts
331 
def encode_header_prefix(mform="FLOW", level=0, label=""):
    '''
    Build an encoded header prefix.

    The header text is "ZIO", the level number, the first four
    characters of mform (left justified, space padded) and then the
    label.  The returned bytes are the length of that encoded text
    (struct format 'I') followed by the text itself.
    '''
    text = 'ZIO%d%-4s' % (level, mform[:4]) + label
    body = text.encode()
    return struct.pack('I', len(body)) + body
341 
def decode_header_prefix(henc):
    '''
    Parse the bytes of one encoded message part into a ZIO message
    header prefix.  This is usually the first part of a multipart
    message or as returned by decode().

    Returns the tuple (level, format, label), or None if the bytes do
    not start with the "ZIO" magic or are shorter than 8 bytes.
    '''
    magic = b'ZIO'
    if henc[:3] != magic or len(henc) < 8:
        return None
    level_char, form_bytes, label_bytes = henc[3], henc[4:8], henc[8:]
    # the level is stored as a single ASCII digit
    return (level_char - ord('0'), form_bytes.decode(), label_bytes.decode())
357 
def encode_header_coord(origin, granule, seqno):
    '''
    Return a binary encoded header coord suitable for use as a message
    part.  Arguments are taken to be 64 bit unsigned ints.

    The result is always 24 bytes in native byte order.
    '''
    # 'Q' (unsigned long long) is 8 bytes.  Native 'L' follows the
    # platform's C unsigned long, which is not 8 bytes everywhere.
    return struct.pack('QQQ', origin, granule, seqno)
364 
365 
def decode_header_coord(henc):
    '''
    Parse the bytes of one encoded message part into a ZIO message
    header coord.  This is usually the second part of a multipart
    message or as returned by decode().

    Returns the tuple (origin, granule, seqno), or None if henc is
    not exactly 24 bytes long.
    '''
    if len(henc) != 24:
        return None
    # Three 64 bit unsigned ints.  'Q' is 8 bytes, whereas native 'L'
    # follows the platform's C unsigned long.
    return struct.unpack('QQQ', henc)
376 
def encode_message(parts)
Definition: message.py:295
def decode(self, encoded)
Definition: message.py:266
def __init__(self, level=None, form=None, label=None, routing_id=None, origin=None, granule=None, seqno=None, prefix=None, coord=None, payload=None, parts=None, encoded=None, frame=None)
Definition: message.py:128
def decode_message(encoded)
Definition: message.py:309
def __init__(self, args, origin=0, granule=0, seqno=0)
Definition: message.py:87
def __init__(self, args, level=0, form=" " *4, label="")
Definition: message.py:36
def label_object(self)
Definition: message.py:200
def toframe(self)
Definition: message.py:241
def encode(self)
Definition: message.py:257
def decode_header_prefix(henc)
Definition: message.py:342
def encode_header_coord(origin, granule, seqno)
Definition: message.py:358
def encode_header_prefix(mform="FLOW", level=0, label="")
Definition: message.py:332
def byteify_list(lst)
Definition: util.py:56
def fromframe(self, frame)
Definition: message.py:250
def fromparts(self, parts)
Definition: message.py:280
def decode_header_coord(henc)
Definition: message.py:366
def __str__(self)
Definition: message.py:290
def toparts(self)
Definition: message.py:274