ZIO
Python and C++ interface to ZeroMQ and Zyre
example_handler.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 Example of handlers spawned from backend.spawner
4 '''
5 import zmq
6 import json
7 from ..port import Port
8 from ..message import Message
9 from .proto import Flow
10 from .util import objectify
11 from pyre.zactor import ZActor
12 
def handshake(pipe, flow, bot):
    '''
    Perform the start-up handshake shared by the example actors.

    The steps, in order:

    1. ``pipe.signal()`` is called on the actor pipe.
    2. ``bot`` is sent with ``flow.send_bot()``.
    3. A reply is received with ``flow.recv_bot()``.

    Returns whatever ``flow.recv_bot()`` returned.  The original code
    rebound the local name ``bot`` to the reply, which had no effect
    on the caller; returning it makes the reply available while
    callers that ignore the return value are unaffected.
    '''
    pipe.signal()
    flow.send_bot(bot)
    return flow.recv_bot()
17 
18 
def dump_actor(ctx, pipe, flow, bot, *args):
    '''
    Dump flow messages

    After the handshake and ``flow.flush_pay()``, every message
    obtained from ``flow.get()`` is printed.  A ``None`` from
    ``flow.get()`` is treated as end of transmission: ``flow.send_eot()``
    is called and the actor returns.

    ``ctx`` and ``*args`` are accepted but not used here.
    '''
    print("spawn dumper")
    handshake(pipe, flow, bot)
    flow.flush_pay()

    # NOTE(review): a trailing ``pipe.signal()`` / ``return`` used to follow
    # this loop but could never run, as the loop only exits via the
    # ``return`` below.  It has been dropped; presumably ZActor signals
    # the pipe itself once the actor function returns -- confirm.
    while True:                 # fixme, check pipe
        msg = flow.get()
        if msg is None:
            flow.send_eot()
            print("dumper sees EOT")
            return
        print(msg)
36 
def gen_actor(ctx, pipe, flow, bot, *args):
    '''
    Generate flow messages

    After the handshake and ``flow.slurp_pay(0)``, a default-constructed
    ``Message`` is passed to ``flow.put()`` over and over.  A falsy
    result from ``flow.put()`` is treated as end of transmission and
    the actor returns.

    ``ctx`` and ``*args`` are accepted but not used here.
    '''
    print("spawn genner")
    handshake(pipe, flow, bot)
    flow.slurp_pay(0)

    # NOTE(review): a trailing ``pipe.signal()`` / ``return`` used to follow
    # this loop but could never run, as the loop only exits via the
    # ``return`` below.  It has been dropped; presumably ZActor signals
    # the pipe itself once the actor function returns -- confirm.
    while True:                 # fixme, check pipe
        ok = flow.put(Message())
        if not ok:
            print("genner sees EOT")
            return
52 
53 
class Factory:
    '''
    Callable which maps a BOT to a freshly spawned handler actor.

    The flow direction named in the BOT selects which example actor
    function is run.
    '''

    def __init__(self, server_address):
        '''
        Record the address handlers connect to and create the zmq
        context handed to every spawned actor.
        '''
        self.ctx = zmq.Context()
        self.server_address = server_address

    def __call__(self, bot):
        '''
        Broker calls this to dispatch a BOT.

        Broker waits for return so don't dally.

        An "inject" direction spawns dump_actor, "extract" spawns
        gen_actor.  Any other direction spawns nothing and None is
        returned.
        '''
        direction = objectify(bot)["direction"]

        if direction == "inject":
            actor_func = dump_actor
        elif direction == "extract":
            actor_func = gen_actor
        else:
            return None
        return self.spawn(actor_func, bot)

    def spawn(self, actor_func, bot):
        '''
        Spawn actor function.

        Function must take a flow and the BOT.

        A new "handler" CLIENT port is connected to the server address
        and brought online, wrapped in a Flow and handed, along with
        the BOT, to a ZActor running actor_func.  The ZActor is
        returned.
        '''
        handler_port = Port("handler", zmq.CLIENT, '')
        handler_port.connect(self.server_address)
        handler_port.online(None)
        return ZActor(self.ctx, actor_func, Flow(handler_port), bot)
87 
def dump_actor(ctx, pipe, flow, bot, *args)
def __init__(self, server_address)
def gen_actor(ctx, pipe, flow, bot, *args)
def objectify(morl)
Definition: util.py:5
def handshake(pipe, flow, bot)
def spawn(self, actor_func, bot)