ZIO Tutorial: Flow

ZIO provides the zio.flow.Flow class, a Python-side implementation of the ZIO data flow protocol (see the flow writeup).

1 Create a flow

The flow object requires an externally prepared port to be provided. The port may be created stand-alone (see port tutorial) but is more simply and usefully created in a node (see node tutorial). Here we do the latter:

import zmq
import zio
import zio.flow
node = zio.Node("mynode")
port = node.port("myflow", zmq.CLIENT)
port.bind()
node.online()
flow = zio.flow.Flow(port)

Notes:

  • The application is free to bring the node, and thus the port, online either before or after the flow is created. Both must, of course, be online before the flow object is used to exercise the flow protocol.
  • The ZIO flow protocol typically uses CLIENT/SERVER sockets, and the zio.flow.Flow object is typically used for the CLIENT side. For expert-level use, it can also take the SERVER role.

2 Start a flow

The ZIO data flow protocol starts with both sides shaking hands to agree on their mutually opposite directions and on the amount of flow-control credit. This is done by exchanging a flow begin-of-transmission (BOT) message. From the point of view of a flow client:

import json
import zio

# Propose 2 credits and the "inject" direction
cbot = zio.Message(label='{"credit":2,"direction":"inject"}')
flow.send_bot(cbot)
sbot = flow.recv_bot(1000)  # timeout in ms
fobj = json.loads(sbot.label)
assert fobj['direction'] == 'extract'
total_credit = fobj['credit']

The server is similar, but it must first call recv_bot() and then send_bot(). In between, it must accept the client's direction, accept or lower the client's suggested credit, and return the result in its own BOT. If the server does not accept, it returns an EOT message instead.
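The server's half of the negotiation can be sketched as a pure function on the BOT label. This is not zio code: the helper name `negotiate_bot`, its `max_credit` parameter and the assumption that the label is exactly the JSON object shown in the client example are illustrative. Returning `None` stands for "reply with an EOT instead".

```python
import json

# Hypothetical helper, not part of zio. The label layout is assumed
# to match the client example: {"credit": N, "direction": "..."}.
OPPOSITE = {"inject": "extract", "extract": "inject"}

def negotiate_bot(client_label, max_credit):
    """Return the label for the server's reply BOT, or None if the
    server should answer with EOT instead."""
    try:
        cobj = json.loads(client_label)
    except json.JSONDecodeError:
        return None
    if not isinstance(cobj, dict):
        return None
    direction = cobj.get("direction")
    credit = cobj.get("credit")
    if direction not in OPPOSITE or not isinstance(credit, int) or credit < 1:
        return None
    # Take the opposite role and accept or lower the suggested credit.
    sobj = {"direction": OPPOSITE[direction],
            "credit": min(credit, max_credit)}
    return json.dumps(sobj)

reply = negotiate_bot('{"credit":2,"direction":"inject"}', max_credit=1)
print(reply)  # {"direction": "extract", "credit": 1}
```

Lowering credit with `min()` reflects the rule that the server may accept or reduce, but never raise, the client's proposal.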

Note that the required form='FLOW' message header and the flow='BOT' attribute of the JSON object in the message's .label are both set implicitly inside flow.send_bot(), so the application need not be concerned with them.
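What "set implicitly" means for the label can be pictured with a small helper that merges the flow attribute into the application's own JSON label. This is a hypothetical illustration (`tag_flow_label` is not a zio function) that assumes the label is a JSON object; the form='FLOW' header is a separate message field and is not modeled here.

```python
import json

def tag_flow_label(label, flow_type):
    """Hypothetical: return `label` (a JSON object string) with its
    "flow" attribute set to `flow_type`, e.g. "BOT" or "EOT"."""
    obj = json.loads(label) if label else {}
    obj["flow"] = flow_type   # the application's other keys are kept
    return json.dumps(obj)

print(tag_flow_label('{"credit":2,"direction":"inject"}', "BOT"))
# {"credit": 2, "direction": "inject", "flow": "BOT"}
```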

3 Perform data flow

The actual "flowing" phase of the data flow protocol involves explicit exchange of "payload" DAT messages and implicit exchange of PAY messages traveling in opposite directions.

A flow which has direction "inject" will receive payload messages:

flow.flush_pay()

timeout = 1000 # [ms]
dat = flow.get(timeout)
if not dat:
    print("timeout")
else:
    print(f'got payload {dat}')

Note the call to flow.flush_pay(). The other end shall not send DAT messages until it has credit, and it starts out with none. This means the receiving end must signal that it is ready to receive. It does this by explicitly sending some initial credit (i.e., all of it) to the other end with flush_pay().

Note that in this particular example the call is redundant, as flow.get() itself calls flow.flush_pay() at its start. However, applications typically rely on a ZeroMQ "poller" to watch the client socket for waiting messages before calling flow.get(). In that case no DAT will arrive, because the other end is waiting for PAY, and no PAY will be sent, because the receiving end is waiting for DAT. An explicit, initial call to flow.flush_pay() is therefore needed to break the deadlock.

Going the other way, a flow with direction "extract" might send payload messages simply like this:

dat = zio.Message(...)
flow.put(dat)

In this case, the analogue of flow.flush_pay() is flow.slurp_pay(). It may be called explicitly, but flow.put() already calls it, so no deadlock should arise.
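The credit bookkeeping behind flush_pay() and slurp_pay(), and the deadlock it avoids, can be modeled in a few lines of plain Python. This is a toy model, not the zio implementation: the `Link` class, its two in-memory "wires" and its method names are hypothetical, and a real put() would block where `try_put()` returns False.

```python
from collections import deque

class Link:
    """Toy model of credit-based flow control (not zio code).

    PAY carries credit from receiver to sender; each DAT costs the
    sender one credit, which the receiver owes back once it consumes it.
    """
    def __init__(self, total_credit):
        self.pay_wire = deque()      # receiver -> sender (credit amounts)
        self.dat_wire = deque()      # sender -> receiver (payloads)
        self.to_pay = total_credit   # credit the receiver still owes
        self.credit = 0              # the sender starts out with none

    # receiver side
    def flush_pay(self):
        """Send all owed credit in one PAY, like flow.flush_pay()."""
        if self.to_pay:
            self.pay_wire.append(self.to_pay)
            self.to_pay = 0

    def poll_dat(self):
        """What a poller sees: a DAT only if one is already waiting."""
        if not self.dat_wire:
            return None
        self.to_pay += 1             # consuming a DAT earns one credit back
        return self.dat_wire.popleft()

    # sender side
    def try_put(self, payload):
        while self.pay_wire:         # absorb waiting PAY, like slurp_pay()
            self.credit += self.pay_wire.popleft()
        if self.credit == 0:
            return False             # a real put() would block here
        self.credit -= 1
        self.dat_wire.append(payload)
        return True

link = Link(total_credit=2)
print(link.try_put("a"))   # False: no PAY has arrived, so no credit
print(link.poll_dat())     # None: nothing sent, so both ends would wait
link.flush_pay()           # the receiver breaks the deadlock
print([link.try_put(x) for x in "abc"])  # [True, True, False]
print(link.poll_dat())     # a
```

The first two calls reproduce the poller deadlock described above; after the receiver flushes its credit, the sender can have at most `total_credit` DAT in flight.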

4 Counting messages

The Python (and C++) implementation of ZIO data flow requires that the seqno field in messages from any flow port be strictly incrementing, starting with a value of 0 for the BOT. It is up to the application to ensure this for the initial BOT, the subsequent DAT and the final EOT. Ignoring details like the actual payload, a flow sender might then do:

msg = zio.Message()
# seqno 0 went to the BOT, so DAT messages are numbered from 1
for count in range(1, 101):
    msg.seqno = count
    flow.put(msg)
msg.seqno = count + 1
flow.send_eot(msg)

And a flow receiver:

msg = flow.recv_bot()
assert msg.seqno == 0
last_seqno = msg.seqno
while True:
    msg = flow.get()
    if not msg:        # no DAT: the other end sent EOT
        break
    assert msg.seqno - last_seqno == 1
    last_seqno = msg.seqno
flow.send_eot()
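The numbering rule can also be checked offline, for example on seqno values collected during a test. A minimal sketch; `first_seqno_error` is a hypothetical helper that reports the first position where the "BOT is 0, then +1 per message" rule breaks.

```python
def first_seqno_error(seqnos):
    """Return (expected, got) for the first seqno that breaks the rule
    "BOT is 0 and every later message adds one", or None if all is well."""
    for expected, got in enumerate(seqnos):
        if got != expected:
            return expected, got
    return None

# a BOT (0), 100 DATs (1..100) and an EOT (101)
print(first_seqno_error(range(102)))   # None
print(first_seqno_error([0, 1, 3]))    # (2, 3): seqno 2 is missing
```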

5 Blocking

The ZIO data flow protocol is designed to maximize throughput while keeping latency low and memory usage bounded. However, both get() and put() may still block:

  • a get() will block if the other end ceases to send DAT and the local end has exhausted its queue.
  • a put() will block if the other end ceases to send PAY and the local end has used all accumulated credit.

If blocking occurs due to variation in message processing time, increasing the total amount of credit will use more memory but may smooth out the resulting latency variation.
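Python's standard `queue.Queue` gives a rough analogy for both blocking cases: a bounded queue whose `maxsize` plays the role of the total credit. It is only an analogy (in zio the credit returns via PAY messages, while here a slot frees up when the consumer calls get()), and the helpers `in_flight_before_block` and `get_blocks_when_drained` are hypothetical. With no consumer running, the number of outstanding DAT, and so the memory they occupy, grows with the credit. Timeouts here are in seconds.

```python
import queue

def in_flight_before_block(credit, wanted, timeout=0.05):
    """Put up to `wanted` DAT into a credit-sized window that nobody
    drains; return how many fit before put() would block."""
    window = queue.Queue(maxsize=credit)
    for i in range(wanted):
        try:
            window.put(f"dat{i}", timeout=timeout)
        except queue.Full:
            return i              # credit exhausted: put() blocks
    return wanted

def get_blocks_when_drained(timeout=0.05):
    """With nothing queued and no sender, get() blocks (here: times out)."""
    try:
        queue.Queue().get(timeout=timeout)
    except queue.Empty:
        return True
    return False

print(in_flight_before_block(credit=2, wanted=5))   # 2
print(in_flight_before_block(credit=4, wanted=5))   # 4
print(get_blocks_when_drained())                    # True
```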

An important and desirable cause of blocking in the ZIO data flow control protocol is discussed next.

6 End a flow

The final stage of the ZIO data flow protocol is the end-of-transmission (EOT) message. As with BOT, this is a handshake between both endpoints. One endpoint must initiate EOT, which means the other endpoint will make a "surprise discovery" of it while expecting a BOT, a DAT or a PAY message. On the initiating side, the code might look like:

# initiate EOT
flow.send_eot()
timeout = 10000 # [ms]
eot = flow.recv_eot(timeout)
if not eot:
    print("EOT timeout, may have lost data")

On the other end:

# "surprise" EOT
dat = flow.get()
if not dat:
    print("got EOT")
    flow.send_eot()
    do_clean_shutdown()  # application-defined

Notes:

  • The initiator of the EOT should wait for a return EOT for as long as is "comfortable" for the application. This ensures that any queued DAT messages are processed.
  • The endpoint that got the "surprise" EOT may return an EOT immediately unless there is a reason to keep the other end "hanging" while clean shutdown occurs.
  • Flow methods can also raise exceptions.
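The EOT handshake, including the point that DAT queued ahead of the EOT is processed before the reply, can be simulated with two threads and a pair of standard-library queues. This is a toy model, not zio: plain strings stand in for messages, timeouts are in seconds rather than milliseconds, and `initiator` and `responder` are hypothetical functions whose comments note which flow call each step imitates.

```python
import queue
import threading

def initiator(outbox, inbox, timeout):
    outbox.put("EOT")                        # like flow.send_eot()
    try:
        reply = inbox.get(timeout=timeout)   # like flow.recv_eot(timeout)
    except queue.Empty:
        return "EOT timeout, may have lost data"
    return "clean" if reply == "EOT" else f"unexpected {reply}"

def responder(inbox, outbox, log):
    while True:
        msg = inbox.get()                    # like flow.get()
        if msg == "EOT":                     # the "surprise" EOT
            log.append("got EOT")
            outbox.put("EOT")                # like flow.send_eot()
            return
        log.append(f"processed {msg}")       # an ordinary DAT

a_to_b, b_to_a = queue.Queue(), queue.Queue()
log = []
a_to_b.put("dat0")
a_to_b.put("dat1")                           # DAT still queued when EOT is sent
worker = threading.Thread(target=responder, args=(a_to_b, b_to_a, log))
worker.start()
result = initiator(a_to_b, b_to_a, timeout=1.0)
worker.join()
print(result, log)
# clean ['processed dat0', 'processed dat1', 'got EOT']
```

Because the queue is first-in, first-out, the responder drains both DAT before it sees the EOT, which is why the initiator's wait for a return EOT gives queued data a chance to be processed.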