ZIO Tutorial: Flow Broker
ZIO provides zio.flow.Broker with which flow services can be easily built. The broker design is described in detail in the flow broker writeup, and it is suggested to read through that before using this tutorial to understand how to use the Python implementation.
1 Running a flow broker
A flow broker brings together a zio.Port and a "factory". The port should be prepared and online, and it must use a SERVER socket. The factory provides the application-specific handling of flows; the broker calls it whenever the port receives a BOT flow message.
    import zmq
    import zio.flow

    node = zio.Node("mybroker")
    port = node.port("myservice", zmq.SERVER)  # the broker needs a SERVER socket
    port.bind()
    node.online()

    factory = MyFactory()  # application-provided, see below
    broker = zio.flow.Broker(port, factory)

    while True:
        try:
            broker.poll(1000)  # [ms]
        except TimeoutError:
            # no flow activity for 1 s: report the currently known peers
            node.peer.drain()
            print(node.peer.peers)
        except Exception as e:
            print(f'error {e}')
            break
    broker.stop()
In this example we make a node with a single port on a SERVER socket, bind that port, and take the node online. The port is given to the broker along with our factory (see below). The main loop simply polls the broker to perform activity. If the poll times out, we notify the user, since a timeout implies that no flow to or from the broker has occurred for a while (1 s). On any other error we simply exit the loop and stop the broker.
In nominal running, each poll handles at most one message sent to the broker by a client or a handler. Thus this loop may spin quickly if the flow is heavy. Interrupting this loop will hang all flows.
2 Flow broker protocol
The broker is a man-in-the-middle between clients and any handlers of client messages. Likewise, handlers mediate between a flow and some server-side resource (eg, a file for reading or writing flow data). The application provides the factory object, which must adhere to a flow broker protocol.
The factory object must be callable with a single argument, that of a flow BOT message. While this call is active the broker will hang and so the call should return as quickly as possible. The return value of this call shall be True if the BOT message will be handled by the factory or its delegates. Otherwise the broker will immediately send EOT to the initiating client.
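The calling convention described above can be illustrated with a minimal, self-contained sketch. It does not use ZIO itself: FakeBot is a hypothetical stand-in that models only the label attribute of a BOT message, and InjectOnlyFactory is an illustrative name, not part of the library.

```python
import json


class FakeBot:
    """Hypothetical stand-in for a flow BOT message; only `label` is modeled."""

    def __init__(self, flow_object):
        self.label = json.dumps(flow_object)


class InjectOnlyFactory:
    """Accepts BOTs with direction "inject" and declines everything else."""

    def __init__(self):
        self.accepted = []  # BOTs handed off for handling

    def __call__(self, bot):
        fobj = json.loads(bot.label)
        if fobj.get("direction") != "inject":
            return False  # not handled: the broker would send EOT to the client
        # Return quickly: only record the BOT here. Real work belongs in a
        # concurrent handler, not inside this call.
        self.accepted.append(bot)
        return True


factory = InjectOnlyFactory()
print(factory(FakeBot({"direction": "inject"})))   # True
print(factory(FakeBot({"direction": "extract"})))  # False
```

The only points taken from the protocol are the single BOT argument, the quick return, and the True return value meaning "will be handled".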
After a successful call to the factory, the initiating client does not receive an immediate reply. Instead the factory (or its delegate) will, based on the content of the given BOT message and prior knowledge of the broker's SERVER socket, send a modified BOT message to the broker. This message must retain the cid attribute of the flow object stored as JSON in the label header attribute of the BOT message. Through this cid value and the internal ID number associated with receiving the handler's BOT the broker can associate all future messages between client and handler.
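As a sketch of the cid requirement, the following plain-JSON helper shows one way a handler might derive the label for its own BOT from the client's label while keeping cid intact. The helper name handler_bot_label, the cid value 42, and the credit field are hypothetical illustrations; which attributes a real handler modifies depends on the application.

```python
import json


def handler_bot_label(client_label, **changes):
    """Return a label for the handler's BOT that keeps the client's cid."""
    fobj = json.loads(client_label)
    cid = fobj["cid"]      # must survive unchanged
    fobj.update(changes)   # hypothetical modifications made by the handler
    fobj["cid"] = cid      # guard against an accidental overwrite
    return json.dumps(fobj)


# Made-up client label; only "direction" and "cid" are named in this tutorial.
client_label = json.dumps({"direction": "inject", "cid": 42})
new_label = handler_bot_label(client_label, credit=5)
print(json.loads(new_label)["cid"])  # 42
```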
3 An example flow broker factory
The provided test_flowbroker.py includes a simple factory that will create a handler for each BOT with direction "inject". The "handling" is simply to dump it. The test can be run simply as:
$ python python/tests/test_flowbroker.py
This will dump various debug logging and will end cleanly and not hang. The test factory is created with the broker's address which it merely passes to each handler. Here is the factory in full:
    import json
    import zmq
    # ZActor, dumper and log are provided elsewhere in test_flowbroker.py

    class Factory:
        def __init__(self, address):
            self.handlers = list()
            self.address = address      # broker
            self.ctx = zmq.Context()

        def __call__(self, bot):
            fobj = json.loads(bot.label)
            if fobj['direction'] == 'extract':  # my direction.
                return                          # only handle inject; falsy, so the broker sends EOT
            actor = ZActor(self.ctx, dumper, bot, self.address)
            self.handlers.append(actor)
            return True

        def stop(self):
            for actor in self.handlers:
                log.debug("factory stopping handler")
                actor.pipe.signal()
                del(actor)
Remember that when the broker calls the factory the broker will hang until the call returns. For that reason, the factory must not try to handle messages directly but must "spawn" a concurrent handler. This is done using PyZMQ's implementation of ZeroMQ's "actor" pattern (eg, as found in the similarly named CZMQ zactor_t type). The dumper is defined as:
    def dumper(ctx, pipe, bot, address):
        ....
The first two arguments are provided by ZActor while the remainder are whatever is passed to the ZActor constructor after the dumper argument.
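The argument-passing convention can be illustrated with a toy stand-in built on the standard library. MiniActor is hypothetical and is not the real ZActor: it uses a thread and a queue where the real actor uses ZeroMQ sockets, and the address string is made up. It only shows that the actor supplies the first two arguments and forwards the rest.

```python
import queue
import threading


class MiniActor:
    """Toy stand-in (not the real ZActor): runs fn(ctx, pipe, *args) in a thread."""

    def __init__(self, ctx, fn, *args):
        self.pipe = queue.Queue()  # stands in for the actor's pipe
        self.thread = threading.Thread(target=fn, args=(ctx, self.pipe) + args)
        self.thread.start()


results = []


def dumper(ctx, pipe, bot, address):
    # ctx and pipe come from the actor; bot and address come from the caller.
    results.append((ctx, bot, address))


actor = MiniActor("ctx", dumper, "bot-message", "tcp://127.0.0.1:5678")
actor.thread.join()
print(results)  # [('ctx', 'bot-message', 'tcp://127.0.0.1:5678')]
```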
This tutorial will not go into detail of the implementation of dumper, but to sketch the design, it:
- creates its own flow object with a bare port
- participates in the BOT handshake
- explicitly calls flush_pay(), because it will poll on its socket, as described more in the flow tutorial
- enters a loop, using the poller to know when a flow message is available
- quits the loop if the factory sends a signal or if a timeout occurs on flow.get()
Note, this termination condition is just for this test. Depending on the nature of a broker factory application, other termination conditions are likely needed.
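The termination logic of such a handler loop (stop on a signal, or stop when a read times out) can be sketched without ZIO. In this hypothetical stand-in, a queue.Queue plays the role of the flow, a threading.Event plays the role of the factory's signal, and the function name drain and the 0.1 s timeout are illustrative choices.

```python
import queue
import threading


def drain(messages, stop, timeout=0.1):
    """Collect messages until stop is set or none arrives within timeout."""
    seen = []
    while not stop.is_set():
        try:
            seen.append(messages.get(timeout=timeout))
        except queue.Empty:
            break  # timeout: treat silence as the end of the flow
    return seen


messages = queue.Queue()
for payload in ("a", "b", "c"):
    messages.put(payload)
print(drain(messages, threading.Event()))  # ['a', 'b', 'c']
```

A long-running service would likely replace the timeout exit with a condition suited to the application, such as an explicit end-of-transmission message.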