3 zio.flow implements ZIO flow protocol helper 5 This is equivalent to the C++ zio::flow namespace 9 from ..message
import Message
13 log = logging.getLogger(
"zio")
23 A Flow object provides ZIO flow protocol API 25 It is equivalent to the C++ zio::flow::Flow class 27 All timeouts are in milliseconds. A timeout of None means forever. 39 Construct a flow object on a port. 41 Application shall handle ports bind/connect and online states. 47 Send a BOT message to the other end. 49 Client calls send_bot() first, server calls send_bot() second. 52 raise RuntimeError(
"BOT must be sent first")
59 log.debug(f
'send_bot: {self.send_seqno} {msg}')
65 Receive and return BOT message or None. 67 Returns None if EOT was received. 72 raise RuntimeError(
"BOT must be recv first")
74 msg = self.
port.recv(timeout)
76 raise TimeoutError(
'flow.recv_bot: timeout')
78 raise RuntimeError(f
'flow.recv_bot: BOT not seqno 0 {msg.seqno}')
79 if msg.form !=
'FLOW':
80 raise TypeError(
'flow.recv_bot: not FLOW message')
82 ftype = fobj.get(
"flow",
None)
84 raise TypeError(f
'flow.recv_bot: unknown FLOW message {ftype}')
86 credit = fobj.get(
"credit",
None)
88 raise ValueError(
'flow.recv_bot: no credit')
90 fdir = fobj.get(
"direction",
None)
94 elif fdir ==
"inject":
98 raise ValueError(f
'flow.recv_bot: uknonwn direction {fdir}')
107 Receive any waiting PAY messages 109 The flow object will slurp prior to a sending a DAT but the 110 application may call this at any time after BOT. Number of 111 credits slurped is returned. None is returned if other than a 112 PAY is received. Caller should likely respond to that with 116 raise RuntimeError(
"must recv BOT before PAY")
118 msg = self.
port.recv(timeout)
123 if msg.form !=
'FLOW':
126 if fobj.get(
"flow",
None) !=
"PAY":
127 log.debug(
"malformed PAY flow: %s" % (msg,))
130 credit = fobj[
"credit"]
132 log.debug(
"malformed PAY credit: %s" % (msg,))
141 Send a DAT message and slurp for any waiting PAY. 143 Return True if sent, None if an EOT was received instead of 155 raise RuntimeError(
"must send BOT before DAT")
166 Send any accumulated credit as PAY. 168 This is called in a get() but app may call any time after a 169 BOT exchange. Number of credits sent is returned. This does 180 log.debug(f
'flush_pay: {self.send_seqno} {msg}')
182 raise RuntimeError(
"must recv BOT before PAY")
187 def get(self, timeout=None):
189 Receive and return a DAT message and send any accumulated PAY. 191 Return None if EOT was received instead of DAT. 196 raise RuntimeError(
"must recv BOT before DAT")
198 log.debug (f
'flow.get({timeout})')
200 msg = self.
port.recv(timeout)
202 log.debug(
"flow.get timeout")
203 raise TimeoutError(
"flow.get timeout")
205 log.debug(f
'flow.get bad seqno: {msg.seqno}, last {self.recv_seqno}\n{msg}')
206 raise ValueError(
'flow.get bad seqno')
207 if msg.form !=
'FLOW':
208 log.debug(f
'flow.get not FLOW message\n{msg}')
209 raise TypeError(
'flow.get not FLOW message')
211 if fobj.get(
'flow',
None) ==
'EOT':
212 log.debug(
"EOT during flow:\n%s" % (msg,))
214 ftype = fobj.get(
'flow',
None)
216 log.warning(
"malformed DAT flow:\n%s" % (msg,))
217 raise TypeError(f
'flow.get unexpected FLOW type {ftype}')
226 Send EOT message to other end. 228 Note, if app initiates the EOT, it should then call 229 recv_eot(). If it unexpectedly got EOT when recving another 230 then it should send EOT as a response. 236 raise RuntimeError(
"must send BOT before EOT")
240 log.debug(f
'send_eot: {self.send_seqno} {msg}')
248 EOT message is returned or None if timeout occurs. 250 If app explicitly calls send_eot() it should call recv_eot() 251 to wait for the ack from the other end. If an app receives 252 EOT as an unepxected message while receiving PAY or DAT then 253 it should send_eot() but not expect another EOT ack. 257 raise RuntimeError(
"must recv BOT before EOT")
259 msg = self.
port.recv(timeout)
264 if msg.form !=
'FLOW':
268 if fobj.get(
'flow',
None) ==
'EOT':
def recv_bot(self, timeout=-1)
def send_eot(self, msg=Message())
def slurp_pay(self, timeout=None)
def stringify(flowtype, params)
def recv_eot(self, timeout=None)
def get(self, timeout=None)