ZIO Tutorial: Flow Broker

ZIO provides zio.flow.Broker, with which flow services can be built easily. The broker design is described in detail in the flow broker writeup. Read that first, as this tutorial covers only how to use the Python implementation.

1 Running a flow broker

A flow broker brings together a zio.Port and a "factory". The port should be prepared and online, and it should use a SERVER socket. The factory provides the application-specific handling of flows and is called whenever the port receives a BOT flow message.

import zmq
import zio.flow

node = zio.Node("mybroker")
port = node.port("myservice", zmq.SERVER)
port.bind()
node.online()

factory = MyFactory()

broker = zio.flow.Broker(port, factory)

while True:
    try:
        broker.poll(1000) # [ms]
    except TimeoutError:
        node.peer.drain()
        print(node.peer.peers)
    except Exception as e:
        print(f'error {e}')
        break
broker.stop()

In this example we make a node with a single port on a SERVER socket, bind that port, and take the node online. The port is given to the broker along with our factory (see below). The main loop simply polls the broker to perform activity. A timeout implies that no flow to or from the broker has occurred for a while (1 s), so we notify the user by printing the currently known peers. On any other error we leave the loop and stop the broker.

In nominal running, each poll handles at most one message sent to the broker by a client or a handler. Thus this loop may spin quickly if the flow is heavy. Interrupting this loop will hang all flows.

2 Flow broker protocol

The broker is a man-in-the-middle between clients and any handlers of client messages. Likewise, handlers mediate between a flow and some server-side resource (e.g., a file for reading or writing flow data). The application provides the factory object, which must adhere to a flow broker protocol.

The factory object must be callable with a single argument, that of a flow BOT message. While this call is active the broker will hang and so the call should return as quickly as possible. The return value of this call shall be True if the BOT message will be handled by the factory or its delegates. Otherwise the broker will immediately send EOT to the initiating client.
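The calling convention can be sketched without ZIO at all. In the minimal sketch below, StubBot is a hypothetical stand-in for a BOT message that carries only a JSON label, and spawn_handler is a placeholder for whatever non-blocking mechanism the application uses to start a handler. Neither name is part of ZIO.

```python
import json


class StubBot:
    """Hypothetical stand-in for a flow BOT message (not a ZIO class)."""

    def __init__(self, **flow_object):
        # The flow object is stored as JSON in the label attribute.
        self.label = json.dumps(flow_object)


class InjectOnlyFactory:
    """Accepts BOT messages with direction "inject" and rejects the rest."""

    def __init__(self, spawn_handler):
        # spawn_handler must return quickly; it only starts the handler.
        self.spawn_handler = spawn_handler

    def __call__(self, bot):
        fobj = json.loads(bot.label)
        if fobj.get("direction") != "inject":
            return False  # not handled: the broker would send EOT
        self.spawn_handler(bot)
        return True  # handled by the factory or its delegate


spawned = []
factory = InjectOnlyFactory(spawned.append)
accepted = factory(StubBot(direction="inject"))
rejected = factory(StubBot(direction="extract"))
```

Here spawned.append merely records the BOT. A real factory would start a concurrent handler at that point so that the call returns immediately.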

After a successful call to the factory, the initiating client does not receive an immediate reply. Instead the factory (or its delegate) will, based on the content of the given BOT message and prior knowledge of the broker's SERVER socket, send a modified BOT message to the broker. This message must retain the cid attribute of the flow object stored as JSON in the label header attribute of the BOT message. Through this cid value and the internal ID number associated with receiving the handler's BOT the broker can associate all future messages between client and handler.
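The requirement to retain the cid can be illustrated with plain JSON handling. The helper below is a hypothetical sketch, not a ZIO function: it assumes the label is a JSON object that already carries a cid, and the "note" field and the cid value 42 are made up for the example.

```python
import json


def handler_bot_label(client_label, **changes):
    """Return a JSON label for the handler's BOT that keeps the original cid."""
    fobj = json.loads(client_label)
    if "cid" not in fobj:
        raise KeyError("BOT label carries no cid")
    if "cid" in changes:
        raise ValueError("the cid must not be changed")
    fobj.update(changes)  # apply the handler's modifications
    return json.dumps(fobj)


# Label as received by the factory (values are illustrative only).
client_label = json.dumps({"cid": 42, "direction": "inject"})
handler_label = handler_bot_label(client_label, note="handled")
```

Refusing to overwrite cid keeps the one attribute the broker relies on to pair the handler's BOT with the client's flow.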

3 An example flow broker factory

The provided test_flowbroker.py includes a simple factory that creates a handler for each BOT with direction "inject". The "handling" simply dumps the flow. The test can be run as:

$ python python/tests/test_flowbroker.py

This prints various debug log messages and should end cleanly without hanging. The test factory is created with the broker's address, which it merely passes to each handler. Here is the factory in full:

class Factory:
    def __init__(self, address):
        self.handlers = list()
        self.address = address  # broker
        self.ctx = zmq.Context()

    def __call__(self, bot):
        fobj = json.loads(bot.label) 
        if fobj['direction'] == 'extract': # my direction.
            return                         # only handle inject
        actor = ZActor(self.ctx, dumper, bot, self.address)
        self.handlers.append(actor)
        return True

    def stop(self):
        for actor in self.handlers:
            log.debug("factory stopping handler")
            actor.pipe.signal()
            del(actor)

Remember that when the broker calls the factory, the broker will hang until the call returns. For that reason, the factory must not try to handle messages directly but must "spawn" a concurrent handler. This is done using PyZMQ's implementation of ZeroMQ's "actor" pattern (e.g., as found in the similarly named CZMQ zactor_t type). The dumper is defined as:

def dumper(ctx, pipe, bot, address):
    ....

The first two arguments are provided by ZActor, while the remainder are whatever is passed to the ZActor constructor after the dumper argument. This tutorial will not go into the details of the implementation of dumper, but to sketch the design, it:

  1. Creates its own flow object with a bare port.
  2. Participates in the BOT handshake.
  3. Explicitly calls flush_pay(), because it will poll on its socket, as described further in the flow tutorial.
  4. Enters a loop, using the poller to know when a flow message is available.
  5. Quits the loop if the factory sends a signal or if a timeout occurs on flow.get().

Note, this termination condition is just for this test. Depending on the nature of a broker factory application, other termination conditions are likely needed.
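The argument passing and the two termination conditions can be imitated with the standard library alone. The sketch below is a rough analogy, not the test's dumper: ToyActor is a hypothetical thread-based stand-in for ZActor, a threading.Event plays the role of the pipe signal, and a queue.Queue replaces the flow and its poller.

```python
import queue
import threading


class ToyActor:
    """Hypothetical stand-in for an actor (this is not the real ZActor)."""

    def __init__(self, ctx, func, *args):
        self.pipe = threading.Event()  # the owner "signals" by setting it
        self.result = None
        # The actor supplies ctx and pipe; remaining args pass through.
        self.thread = threading.Thread(target=self._run, args=(func, ctx, args))
        self.thread.start()

    def _run(self, func, ctx, args):
        self.result = func(ctx, self.pipe, *args)


def toy_dumper(ctx, pipe, inbox, timeout):
    """Collect messages until signalled or until none arrives in time."""
    seen = []
    while not pipe.is_set():
        try:
            msg = inbox.get(timeout=timeout)
        except queue.Empty:
            return seen, "timeout"  # no message within the timeout
        seen.append(msg)
    return seen, "signal"  # the owner asked the handler to stop


inbox = queue.Queue()
for msg in ("DAT 1", "DAT 2"):
    inbox.put(msg)

# inbox and 0.05 follow the function argument, so they reach toy_dumper.
actor = ToyActor(None, toy_dumper, inbox, 0.05)
actor.thread.join()

# Calling the function directly with an already-set signal stops at once.
stopped = threading.Event()
stopped.set()
signalled = toy_dumper(None, stopped, queue.Queue(), 0.05)
```

As in the test, a handler built this way stops either when its owner signals it or when the flow goes quiet. A production handler would likely need further conditions, such as reacting to an EOT message.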