ZIO
Python and C++ interface to ZeroMQ and Zyre
proto.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 zio.flow implements ZIO flow protocol helper
4 
5 This is equivalent to the C++ zio::flow namespace
6 '''
7 
8 
9 from ..message import Message
10 from .util import *
11 
12 import logging
13 log = logging.getLogger("zio")
14 
15 from enum import Enum
16 class Direction(Enum):
17  undefined=0
18  extract=1
19  inject=2
20 
21 class Flow:
22  '''
23  A Flow object provides ZIO flow protocol API
24 
25  It is equivalent to the C++ zio::flow::Flow class
26 
27  All timeouts are in milliseconds. A timeout of None means forever.
28  '''
29 
30  credit = 0
31  total_credit = 0
32  is_sender = True
33  routing_id = 0
34  send_seqno = -1
35  recv_seqno = -1
36 
37  def __init__(self, port):
38  '''
39  Construct a flow object on a port.
40 
41  Application shall handle ports bind/connect and online states.
42  '''
43  self.port = port
44 
45  def send_bot(self, msg):
46  '''
47  Send a BOT message to the other end.
48 
49  Client calls send_bot() first, server calls send_bot() second.
50  '''
51  if self.send_seqno != -1:
52  raise RuntimeError("BOT must be sent first")
53 
54  msg.form = 'FLOW'
55  fobj = objectify(msg)
56  msg.label = stringify('BOT', **fobj)
57  msg.routing_id = self.routing_id
58  msg.seqno = self.send_seqno = 0
59  log.debug(f'send_bot: {self.send_seqno} {msg}')
60  self.port.send(msg)
61 
62 
63  def recv_bot(self, timeout=-1):
64  '''
65  Receive and return BOT message or None.
66 
67  Returns None if EOT was received.
68 
69  Raises exceptions
70  '''
71  if self.recv_seqno != -1:
72  raise RuntimeError("BOT must be recv first")
73 
74  msg = self.port.recv(timeout)
75  if msg is None:
76  raise TimeoutError('flow.recv_bot: timeout')
77  if msg.seqno != 0:
78  raise RuntimeError(f'flow.recv_bot: BOT not seqno 0 {msg.seqno}')
79  if msg.form != 'FLOW':
80  raise TypeError('flow.recv_bot: not FLOW message')
81  fobj = objectify(msg)
82  ftype = fobj.get("flow",None)
83  if ftype != "BOT":
84  raise TypeError(f'flow.recv_bot: unknown FLOW message {ftype}')
85 
86  credit = fobj.get("credit",None)
87  if credit is None:
88  raise ValueError('flow.recv_bot: no credit')
89 
90  fdir = fobj.get("direction", None)
91  if fdir == "extract":
92  self.is_sender = False
93  self.credit = credit
94  elif fdir == "inject":
95  self.is_sender = True
96  self.credit = 0
97  else:
98  raise ValueError(f'flow.recv_bot: uknonwn direction {fdir}')
99 
100  self.total_credit = credit
101  self.recv_seqno = 0
102  self.routing_id = msg.routing_id
103  return msg
104 
105  def slurp_pay(self, timeout=None):
106  '''
107  Receive any waiting PAY messages
108 
109  The flow object will slurp prior to a sending a DAT but the
110  application may call this at any time after BOT. Number of
111  credits slurped is returned. None is returned if other than a
112  PAY is received. Caller should likely respond to that with
113  send_eot(msg,0).
114  '''
115  if self.recv_seqno < 0:
116  raise RuntimeError("must recv BOT before PAY")
117 
118  msg = self.port.recv(timeout)
119  if msg is None:
120  return None
121  if msg.seqno - self.recv_seqno != 1:
122  return None
123  if msg.form != 'FLOW':
124  return None
125  fobj = objectify(msg)
126  if fobj.get("flow",None) != "PAY":
127  log.debug("malformed PAY flow: %s" % (msg,))
128  return None
129  try:
130  credit = fobj["credit"]
131  except KeyError:
132  log.debug("malformed PAY credit: %s" % (msg,))
133  return None
134  self.credit += credit
135  self.recv_seqno += 1
136  return credit
137 
138 
139  def put(self, msg):
140  '''
141  Send a DAT message and slurp for any waiting PAY.
142 
143  Return True if sent, None if an EOT was received instead of
144  PAY.
145  '''
146  if self.credit < self.total_credit:
147  self.slurp_pay(0) # first do fast try
148  while self.credit == 0: # next really must have
149  self.slurp_pay(-1) # some credit to continue
150 
151  msg.form = 'FLOW'
152  msg.label = stringify('DAT', **objectify(msg))
153  msg.routing_id = self.routing_id
154  if self.send_seqno < 0:
155  raise RuntimeError("must send BOT before DAT")
156  self.send_seqno += 1
157  msg.seqno = self.send_seqno
158  #log.debug (f"port send with credit %d: %s" % (self.credit, msg))
159  self.port.send(msg)
160  self.credit -= 1
161  return True
162 
163 
164  def flush_pay(self):
165  '''
166  Send any accumulated credit as PAY.
167 
168  This is called in a get() but app may call any time after a
169  BOT exchange. Number of credits sent is returned. This does
170  not block.
171  '''
172  if self.credit == 0:
173  return 0
174  nsent = self.credit
175  self.credit = 0
176  msg = Message(form='FLOW', label=stringify('PAY', credit=nsent))
177  self.send_seqno += 1
178  msg.seqno = self.send_seqno
179  msg.routing_id = self.routing_id
180  log.debug(f'flush_pay: {self.send_seqno} {msg}')
181  if self.send_seqno < 0:
182  raise RuntimeError("must recv BOT before PAY")
183  self.port.send(msg)
184  return nsent
185 
186 
187  def get(self, timeout=None):
188  '''
189  Receive and return a DAT message and send any accumulated PAY.
190 
191  Return None if EOT was received instead of DAT.
192 
193  Exceptions raised.
194  '''
195  if self.recv_seqno < 0:
196  raise RuntimeError("must recv BOT before DAT")
197 
198  log.debug (f'flow.get({timeout})')
199  self.flush_pay()
200  msg = self.port.recv(timeout)
201  if msg is None:
202  log.debug("flow.get timeout")
203  raise TimeoutError("flow.get timeout")
204  if msg.seqno - self.recv_seqno != 1:
205  log.debug(f'flow.get bad seqno: {msg.seqno}, last {self.recv_seqno}\n{msg}')
206  raise ValueError('flow.get bad seqno')
207  if msg.form != 'FLOW':
208  log.debug(f'flow.get not FLOW message\n{msg}')
209  raise TypeError('flow.get not FLOW message')
210  fobj = objectify(msg)
211  if fobj.get('flow',None) == 'EOT':
212  log.debug("EOT during flow:\n%s" % (msg,))
213  return None
214  ftype = fobj.get('flow',None)
215  if ftype != 'DAT':
216  log.warning("malformed DAT flow:\n%s" % (msg,))
217  raise TypeError(f'flow.get unexpected FLOW type {ftype}')
218 
219  self.credit += 1
220  self.recv_seqno += 1
221  self.flush_pay()
222  return msg;
223 
224  def send_eot(self, msg = Message()):
225  '''
226  Send EOT message to other end.
227 
228  Note, if app initiates the EOT, it should then call
229  recv_eot(). If it unexpectedly got EOT when recving another
230  then it should send EOT as a response.
231  '''
232  msg.form = 'FLOW'
233  msg.label = stringify('EOT', **objectify(msg))
234  msg.routing_id = self.routing_id
235  if self.send_seqno < 0:
236  raise RuntimeError("must send BOT before EOT")
237 
238  self.send_seqno += 1
239  msg.seqno = self.send_seqno
240  log.debug(f'send_eot: {self.send_seqno} {msg}')
241  self.port.send(msg)
242  self.send_seqno = -1
243 
244  def recv_eot(self, timeout=None):
245  '''
246  Recv an EOT message.
247 
248  EOT message is returned or None if timeout occurs.
249 
250  If app explicitly calls send_eot() it should call recv_eot()
251  to wait for the ack from the other end. If an app receives
252  EOT as an unepxected message while receiving PAY or DAT then
253  it should send_eot() but not expect another EOT ack.
254  '''
255  while True:
256  if self.recv_seqno < 0:
257  raise RuntimeError("must recv BOT before EOT")
258 
259  msg = self.port.recv(timeout)
260  if msg is None:
261  return None
262  if msg.seqno - self.recv_seqno != 1:
263  continue
264  if msg.form != 'FLOW':
265  continue # who's knocking at my door?
266  self.recv_seqno += 1
267  fobj = objectify(msg)
268  if fobj.get('flow',None) == 'EOT':
269  self.recv_seqno = -1
270  return msg
271  continue # try again, probably got a PAY/DAT
272  return # won't reach
273 
274 
275 
276  pass
def recv_bot(self, timeout=-1)
Definition: proto.py:63
def send_eot(self, msg=Message())
Definition: proto.py:224
def send_bot(self, msg)
Definition: proto.py:45
def slurp_pay(self, timeout=None)
Definition: proto.py:105
def stringify(flowtype, params)
Definition: util.py:21
def recv_eot(self, timeout=None)
Definition: proto.py:244
def objectify(morl)
Definition: util.py:5
def flush_pay(self)
Definition: proto.py:164
def get(self, timeout=None)
Definition: proto.py:187
def __init__(self, port)
Definition: proto.py:37
def put(self, msg)
Definition: proto.py:139