ZIO Tutorial: Flow
ZIO provides the zio.flow.Flow class as a Python-side implementation of the ZIO data flow protocol (see flow writeup).
1 Create a flow
The flow object requires an externally prepared port to be provided. The port may be created stand-alone (see port tutorial) but is more simply and usefully created in a node (see node tutorial). Here we do the latter:
import zio, zmq
node = zio.Node("mynode")
port = node.port("myflow", zmq.CLIENT)
port.bind()
node.online()
flow = zio.flow.Flow(port)
Notes:
- The application is free to bring the node, and thus the port, online either before or after the flow is created. Both must, of course, be online before the flow object is used to exercise the flow protocol.
- The ZIO flow protocol typically uses CLIENT/SERVER sockets, and the zio.flow.Flow object is typically used for the CLIENT side. In expert-level use, it can also provide the SERVER role.
2 Start a flow
The ZIO data flow protocol starts with both sides shaking hands to determine their mutually opposing directions and the agreed-upon amount of flow-control credit. This is done by exchanging flow begin-of-transmission (BOT) messages. From the point of view of a flow client:
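import json
# client BOT: suggest a credit of 2 and declare the direction
cbot = zio.Message(label='{"credit":2,"direction":"inject"}')
flow.send_bot(cbot)
sbot = flow.recv_bot(1000)
fobj = json.loads(sbot.label)
# the server must answer with the opposite direction
assert(fobj['direction'] == 'extract')
# credit as accepted or lowered by the server
total_credit = fobj['credit']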
The server is similar, but it must first call recv_bot() and then send_bot(). In between, it must accept the client's direction, accept or lower the client's suggested credit, and return the result. If the server does not accept, it returns an EOT message.
Note that the required form='FLOW' message header and the flow='BOT' attribute of the flow object in the message '.label' are both set implicitly inside the flow.send_bot() method, so the application need not be concerned with them.
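The server-side decision made between recv_bot() and send_bot() can be sketched in isolation. This is only a minimal sketch: negotiate_bot() and its max_credit parameter are hypothetical names, not part of zio, and rejecting a credit below 1 is an assumption made for illustration.

```python
import json


def negotiate_bot(client_label, max_credit):
    """Return the server's BOT label object, or None to signal rejection.

    Hypothetical helper (not part of zio): it only models the choice a
    server makes after recv_bot() and before send_bot().
    """
    fobj = json.loads(client_label)
    direction = fobj.get("direction")
    credit = fobj.get("credit")
    # Assumption: an unknown direction or a credit below 1 is rejected.
    if direction not in ("inject", "extract"):
        return None
    if not isinstance(credit, int) or credit < 1:
        return None
    # The two ends must have opposite directions.
    opposite = "extract" if direction == "inject" else "inject"
    # Accept the suggested credit or lower it, never raise it.
    return {"direction": opposite, "credit": min(credit, max_credit)}


reply = negotiate_bot('{"credit":2,"direction":"inject"}', max_credit=10)
print(json.dumps(reply))
```

A None result stands for the case where the server would answer with EOT instead of BOT.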
3 Perform data flow
The actual "flowing" phase of the data flow protocol involves explicit exchange of "payload" DAT messages and implicit exchange of PAY messages traveling in opposite directions.
A flow which has direction "inject" will receive payload messages:
flow.flush_pay()
timeout = 1000 # [ms]
dat = flow.get(timeout)
if not dat:
    print("timeout")
else:
    print(f'got payload {dat}')
Note the call to flow.flush_pay(). The other end shall not send DAT messages until it has credit, and it starts out with none. This means the receiving end must signal that it is ready to get flow. It does this by explicitly sending some initial credit (i.e., all of it) to the other end with flush_pay().
In this particular example the call is redundant, as flow.flush_pay() is called at the start of flow.get(). However, it is typical for applications to rely on a ZeroMQ "poller" to watch the client socket for waiting messages before calling flow.get(). In such a case, no DAT will come because the other end is waiting for PAY, and no PAY will be sent because the receiving end is waiting for DAT. Thus an explicit, initial call to flow.flush_pay() is needed to break the deadlock.
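The deadlock can be illustrated with a toy, single-process model of the credit exchange. ToyFlow below is a hypothetical stand-in and does not use the zio API; it borrows the names flush_pay() and get() only to mirror the roles described above, and its poll() method stands in for a ZeroMQ poller.

```python
from collections import deque


class ToyFlow:
    """Toy model of one DAT/PAY flow in a single process (not the zio API)."""

    def __init__(self, total_credit):
        self.receiver_credit = total_credit  # credit held by the receiver
        self.sender_credit = 0               # the sender starts with none
        self.wire = deque()                  # DAT messages waiting to be read

    def flush_pay(self):
        # The receiver hands all the credit it holds to the sender (a PAY).
        self.sender_credit += self.receiver_credit
        self.receiver_credit = 0

    def sender_step(self, dat):
        # The sender may only emit a DAT while it holds credit.
        if self.sender_credit == 0:
            return False
        self.sender_credit -= 1
        self.wire.append(dat)
        return True

    def poll(self):
        # Stand-in for a poller: is any DAT waiting to be read?
        return len(self.wire) > 0

    def get(self):
        dat = self.wire.popleft()
        self.receiver_credit += 1  # consuming a DAT frees one credit
        return dat


# Polling before get() without an initial flush_pay(): nothing ever moves.
stuck = ToyFlow(total_credit=2)
stuck_sent = stuck.sender_step("dat-0")
stuck_ready = stuck.poll()

# With the initial flush_pay() the sender has credit and a DAT arrives.
ok = ToyFlow(total_credit=2)
ok.flush_pay()
ok_sent = ok.sender_step("dat-0")
ok_ready = ok.poll()
print(stuck_sent, stuck_ready, ok_sent, ok_ready)
```

In the "stuck" case the poller would wait forever, since the only call that could release credit is never reached.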
Going the other way, a flow which has direction "extract" might send payload messages simply like:
dat = zio.Message(...)
flow.put(dat)
In this case there is an analogue to flow.flush_pay() called flow.slurp_pay(). It may be called explicitly, but it is also called inside flow.put(), so no deadlock should arise.
4 Counting messages
The Python (and C++) implementation of ZIO data flow requires that the seqno field of messages from any flow port be strictly incrementing, starting with a value of 0 for the BOT. It is up to the application to assure this for the initial BOT, the subsequent DAT and the final EOT. Ignoring details like the actual payload, a flow sender might then do:
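msg = zio.Message()
# seqno 0 was used by the BOT, so DAT messages start at 1
for count in range(1, 101):
    msg.seqno = count
    flow.put(msg)
# the EOT takes the next seqno after the last DAT
msg.seqno = count + 1
flow.send_eot(msg)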
And a flow receiver:
msg = flow.recv_bot()
assert(msg.seqno == 0)
last_seqno = 0
while True:
    msg = flow.get()
    if not msg:
        break
    assert(msg.seqno - last_seqno == 1)
    last_seqno += 1
flow.send_eot()
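Since the application owns the counting, one way to avoid off-by-one mistakes is to keep a single counter per flow port and stamp every outgoing message with it. The sketch below assumes only that a message has an assignable seqno attribute; SeqnoStamper is a hypothetical helper, and SimpleNamespace stands in for zio.Message so the example runs without ZIO.

```python
import itertools
from types import SimpleNamespace


class SeqnoStamper:
    """Hypothetical helper: hands out seqno 0, 1, 2, ... for one flow port."""

    def __init__(self):
        self._counter = itertools.count(0)

    def stamp(self, msg):
        # Assign the next seqno and return the same message object.
        msg.seqno = next(self._counter)
        return msg


stamper = SeqnoStamper()
bot = stamper.stamp(SimpleNamespace())   # stand-in for the BOT message
dats = [stamper.stamp(SimpleNamespace()) for _ in range(3)]
eot = stamper.stamp(SimpleNamespace())   # stand-in for the EOT message
seqnos = [bot.seqno] + [d.seqno for d in dats] + [eot.seqno]
print(seqnos)
```

Each message would be stamped just before it is handed to send_bot(), put() or send_eot().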
5 Blocking
The ZIO data flow protocol is designed to maximize throughput while minimizing latency and bounding memory usage. However, both get() and put() may still block:
- a get() will block if the other end ceases to send DAT and the local end exhausts the queue.
- a put() will block if the other end ceases to send PAY and the local end has used all accumulated credit.
If blocks occur due to variation in message processing time, increasing the total amount of credit will use more memory but may smooth out the resulting latency variation.
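The trade-off between credit and blocking can be explored with a small discrete-time model. This is a rough sketch under stated assumptions, not a model of the real implementation: simulate() is a hypothetical function, the sender tries to emit one DAT per tick, credit returns instantly when the receiver finishes a message, and the service times are made-up numbers with two slow messages.

```python
def simulate(credit, service_times):
    """Return (blocked_ticks, max_in_flight) for a toy sender/receiver pair."""
    if credit < 1 or any(t < 1 for t in service_times):
        raise ValueError("credit and service times must be at least 1")
    total = len(service_times)
    sent = done = in_flight = remaining = 0
    blocked = max_in_flight = 0
    while done < total:
        # Sender: emit one DAT per tick if it holds credit, else block.
        if sent < total:
            if in_flight < credit:
                sent += 1
                in_flight += 1
                max_in_flight = max(max_in_flight, in_flight)
            else:
                blocked += 1
        # Receiver: start the next queued message if idle.
        if remaining == 0 and done < sent:
            remaining = service_times[done]
        # Receiver: work for one tick; finishing returns one credit.
        if remaining > 0:
            remaining -= 1
            if remaining == 0:
                done += 1
                in_flight -= 1
    return blocked, max_in_flight


times = [1, 1, 5, 1, 1, 5, 1, 1]  # made-up processing times in ticks
results = {credit: simulate(credit, times) for credit in (1, 4, 8)}
for credit, (blocked, depth) in results.items():
    print(f"credit={credit} blocked_ticks={blocked} max_in_flight={depth}")
```

In this model the number of messages in flight never exceeds the credit, which is the memory bound, while the ticks the sender spends blocked shrink as credit grows.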
An important and desirable cause of blocking in the ZIO data flow control protocol is discussed next.
6 End a flow
The final stage of the ZIO data flow protocol is its end-of-transmission (EOT) message. As with BOT, this is a handshake between both endpoints. One endpoint must initiate the EOT, which means the other endpoint will make a "surprise discovery" of it while expecting a BOT, a DAT or a PAY message. For the initiating endpoint, the code might look like:
# initiate EOT
flow.send_eot()
timeout = 10000 # [ms]
eot = flow.recv_eot(timeout)
if not eot:
    print("EOT timeout, may have lost data")
While on the other end:
# "surprise" EOT
dat = flow.get()
if not dat:
    print("got EOT")
    flow.send_eot()
    do_clean_shutdown()
Notes:
- The initiator of the EOT should wait for a return EOT for as long as "comfortable" for the application. This assures that any queued DAT messages are processed.
- The endpoint that got the "surprise" EOT may return an EOT immediately unless there is a reason to keep the other end "hanging" while clean shutdown occurs.
- Flow methods can also raise exceptions.
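Putting these notes together, the initiating side might wrap its EOT handshake in one function that distinguishes an acknowledged EOT, a timeout and an exception. The sketch below calls only send_eot() and recv_eot(timeout) on the flow object; finish_flow() and FakeFlow are hypothetical names, and catching the broad Exception class is an assumption, since the specific exception types are not listed here.

```python
def finish_flow(flow, timeout_ms=10000):
    """Initiate EOT and report how the handshake ended.

    Returns "acknowledged", "timeout" or "error".
    """
    try:
        flow.send_eot()
        eot = flow.recv_eot(timeout_ms)
    except Exception as err:  # assumption: exact exception types unknown
        print(f"flow error during EOT: {err}")
        return "error"
    if not eot:
        # No return EOT arrived in time; queued DAT may have been lost.
        return "timeout"
    return "acknowledged"


class FakeFlow:
    """Stand-in used only to exercise finish_flow() without a network."""

    def __init__(self, reply):
        self.reply = reply

    def send_eot(self):
        pass

    def recv_eot(self, timeout):
        if isinstance(self.reply, Exception):
            raise self.reply
        return self.reply


print(finish_flow(FakeFlow("eot")))
print(finish_flow(FakeFlow(None)))
```

With a real flow object the FakeFlow class would not be needed; it is here only so the three outcomes can be tried without sockets.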