3 ZIO brokers bring together ZIO clients. 8 from collections
import namedtuple
11 from .util
import objectify, switch_direction
13 log = logging.getLogger(
"zio")
17 '''Create a flow broker. 26 This broker routes messages between a pair of clients: a 27 remote client and its handler. It does this by adding a flow 28 initiation protocol to ZIO flow protocol. 30 On receipt of a BOT, the broker passes it to the factory which 31 should return True if the BOT was accepted else None. The 32 factory call should return as promptly as possible as the 35 If factory returns True then it is expected that the factory 36 has started some other client which contacs the broker with 37 the BOT. The factory or the new client may modify the BOT as 38 per flow protorocl but must leave intact the `cid` attribute 39 placed in the flow object by the broker. 41 Note that the flow handler protocol does not communicate the 42 location of the broker's SERVER socket. It is up to handler 43 implementations to locate the the server. 51 def poll(self, timeout=None):
53 Poll for at most one message from the SERVER port 58 msg = self.
server.recv(timeout)
60 log.error (f
'broker poll timeout with {timeout}')
61 raise TimeoutError(f
'broker poll timeout with {timeout}')
62 log.debug (f
'broker recv {msg}')
65 orid = self.
other.get(rid,
None)
68 log.debug (f
"broker route c2h {rid} -> {orid}:\n{msg}\n")
70 log.debug (f
"broker route h2c {rid} -> {orid}:\n{msg}\n")
78 ftype = fobj.get(
"flow",
None)
80 raise TypeError(f
'message not type FLOW')
82 raise TypeError(f
'flow message not type BOT')
85 cid = fobj.get(
'cid',
None)
91 log.debug(f
'broker route from handler {rid} <--> {cid}')
93 label_for_client = json.dumps(fobj)
96 msg.label = json.dumps(fobj)
98 log.debug (f
"broker route c2h {cid} -> {rid}:\n{msg}\n")
101 msg.label = label_for_client
103 log.debug (f
"broker route h2c {rid} -> {cid}:\n{msg}\n")
108 log.debug(f
'broker route from client {rid}')
112 msg.label = json.dumps(fobj)
120 msg.label = json.dumps(fobj)
123 raise RuntimeError(
'broker factory rejects ' + str(msg.label))
def __init__(self, server, factory)
def switch_direction(fobj)
def poll(self, timeout=None)