3 Test flow broker with example handler
8 from zio.flow import objectify, Broker
10 from zio.flow.backend import spawner
11 from pyre.zactor import ZActor
13 server_address = "tcp://127.0.0.1:5679"
18 port = zio.Port("client", zmq.CLIENT, '')
19 port.connect(server_address)
23 msg = zio.Message(label=json.dumps(dict(direction='extract',credit=2)))
25 print ("client sent BOT:\n%s\n" % (msg,))
26 msg = cflow.recv_bot(1000)
28 print ("client got BOT:\n%s\n" % (msg,))
30 for count in range(10):
33 print ("client sent DAT")
35 print ("client sent EOT")
36 msg = cflow.recv_eot()
47 sport = zio.Port("server", zmq.SERVER);
48 sport.bind(server_address)
52 client = ZActor(ctx, client_actor)
54 factory = Factory(server_address)
55 backend = ZActor(ctx, spawner, factory)
56 broker = Broker(sport, backend.pipe)
58 for count in range(30):
59 print (f"main: poll [{count}]")
60 ok = broker.poll(1000)
64 print ("main: stopping")
68 if '__main__' == __name__:
A port holds a socket in the context of a node.
def client_actor(ctx, pipe, args)