3 logging.basicConfig(level=logging.DEBUG,
4 format=
'%(asctime)s.%(msecs)03d %(levelname)s\t%(message)s',
5 datefmt=
'%Y-%m-%d %H:%M:%S')
6 log = logging.getLogger(
"zio")
12 from zio.flow import objectify, Broker
14 from pyre.zactor
import ZActor
19 'An actor function talking to a broker on given address' 22 port =
zio.Port(
"client", zmq.CLIENT,
'')
25 log.debug (
"made flow")
29 label=json.dumps(dict(direction=
'extract',credit=2)))
31 log.debug (f
'client sent BOT: {msg}')
32 msg = cflow.recv_bot(1000)
34 log.debug (f
'client got BOT: {msg}')
37 log.debug (f
'client sent {msg}')
40 log.debug (f
'client sent {eot}')
41 eot = cflow.recv_eot()
43 log.debug (f
'client done with {eot}')
49 A dump handler which may be used as an actor talking to a broker's botport. 54 Our initiating BOT message 56 A ZeroMQ address string for a bound broker SERVER socket 59 poller.register(pipe, zmq.POLLIN)
62 port =
zio.Port(
"client", zmq.CLIENT,
'')
66 poller.register(flow.port.sock, zmq.POLLIN)
68 log.debug (f
'dumper: send {bot}')
71 bot = flow.recv_bot(1000)
83 for sock,_
in poller.poll():
86 log.debug (
"dumper: pipe hit")
89 log.debug (
"dumper: got STOP")
91 log.debug (
"dumper: got signal")
97 log.debug (f
'dumper: sock hit: {msg}')
99 log.debug (
"dumper: null message from get, sending EOT")
101 poller.unregister(sock)
105 log.debug(
"dumper: taking port offline")
108 log.debug(
"dumper: waiting for quit")
110 log.debug(
"dumper: done")
120 fobj = json.loads(bot.label)
121 if fobj[
'direction'] ==
'extract':
123 actor = ZActor(self.
ctx, dumper, bot, self.
address)
129 log.debug(
"factory stopping handler")
137 server = node.port(
"server", zmq.SERVER)
138 server_address =
"tcp://127.0.0.1:5678" 139 server.bind(server_address)
142 client = ZActor(ctx, client_actor, server_address)
144 factory =
Factory(server_address)
145 broker = Broker(server, factory)
147 for count
in range(10):
148 log.debug (f
"main: poll [{count}]")
151 except TimeoutError
as te:
156 log.debug (f
"main: stop broker")
158 log.debug (f
"main: node offline")
160 log.debug (f
"main: stop client")
164 if '__main__' == __name__:
An identified vertex in a ported, directed graph.
def __init__(self, address)
A port holds a socket in the context of a node.
def client_actor(ctx, pipe, address)
def dumper(ctx, pipe, bot, address)
void append(Message &msg, message_t &&data, const std::vector< size_t > &shape, size_t word_size, const char *tn)