ZIO
Python and C++ interface to ZeroMQ and Zyre
factories.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 ZIO flow factories are used by the flow broker to service client flows.
4 '''
5 from .util import message_to_dict
6 from .. import rules
7 from pyre.zactor import ZActor
8 import logging
9 log = logging.getLogger("zio")
10 
def wash_one(t):
    '''Normalize a single actor specification.

    A tuple is returned unchanged.  Anything else is returned as the
    2-tuple (t, ()), i.e. paired with an empty tuple.
    '''
    if isinstance(t, tuple):
        return t
    return (t, ())
def wash(tt):
    '''Return a tuple holding the wash_one() result of each element of tt.'''
    return tuple(wash_one(entry) for entry in tt)
15 
16 
17 
class Ruleset:
    '''Use a ruleset to mate flow client to a file resource.

    The ruleset factory applies each rule in the set and uses the
    first one that evaluates True.  That rule governs the parameters
    for the file resource.

    The file resource is delegated to actor functions.  For writing,
    a single file actor is launched for each new file name and a
    client actor is launched for each accepted client.  For reading,
    a single actor is launched for each accepted client.

    '''

    def __init__(self, ctx, ruleset,
                 wactors=(), ractor=()):
        '''Create a Factory with a ruleset.

        Parameters
        ----------

        ctx : a zmq.Context

        ruleset: array of dictionary

            A ruleset is a sequence of rule specifications (see Rule
            Attributes below).

        wactors : 2-tuple of actor specifications for file writing

            See below.

        ractor : actor specification for file reading

            See below

        An actor specification is either a bare actor function or a
        2-tuple of (function, args) where args is a sequence of
        optional user data arguments which is unpacked into the call
        to the function.

        The head of the "wactors" 2-tuple is for the file actor which
        directly manages some file resource.  The tail is for the
        client actor which will connect to the file actor socket (and
        presumably some other socket).

        A file actor is called as:

            file_handler(ctx, pipe,
                         filename,
                         *user_data):

        The file actor function must adhere to a simple protocol on
        its actor pipe.  After issuing the usual actor ready signal
        and before entering its loop, the file actor shall return on
        its actor pipe as a string the address of a bound socket to
        which any subsequent client actors may connect.

        A client actor is called as:

            client_handler(ctx, pipe,
                           bot, rule_object, file_addr,
                           *user_data)

        The client actor has no protocol on its actor pipe other than
        the usual ready signal.

        The read actor is called like a client actor except that it
        is given the file name in place of a file actor address.

        Rule Attributes
        ---------------

        rule : string

            An S-expression in terms of attributes evaluating to a
            Boolean.  First rule returning True is used.  Here and
            below "attributes" refer to the combination of BOT message
            header values and any additional specified in the `attr`
            Rule Attribute (described below).

        rw : string

            Takes value "read" or "write" and determines if the flow
            is to be read from or written to file.

        filepat : f-string

            An f-string formatted against the attributes which
            resolves to the name of the file to be read or written.

        grouppat : f-string

            An f-string formatted against attributes to determine and
            to add a "hdfgroup" parameter to the BOT message prior to
            it being sent into a read or write handler.  The value
            will be used to define an HDF base group path into which
            messages from one stream will be mapped.  This path must
            be unique to the flow client.

        attr : dictionary

            An optional, extra dictionary of attributes combined with
            the dictionary of message attributes prior to applying to
            the rule or either pattern.

        '''
        self.ctx = ctx
        self.ruleset = ruleset
        self.wactors = wash(wactors)
        # Normalize like the writers so a bare function is accepted
        # and launch_read() can always unpack (function, args).
        self.ractor = wash_one(ractor)
        self.writers = {}       # file name -> file actor
        self.handlers = []      # every client/read actor launched

    def __call__(self, bot):
        '''
        Given a bot, launch a handler for the first rule that applies.

        Return True if a handler was launched, None if no rule
        accepted the bot.
        '''
        attr = message_to_dict(bot)

        cid = attr['cid']       # required
        log.info('factory Ruleset called %d, have %d rules' % (
            (cid, len(self.ruleset))))
        for maybe in self.ruleset:
            log.debug(f'check rule: "{maybe}"')
            parsed = rules.parse(maybe, **attr)
            log.debug(f'parsed: {parsed}')
            tf = rules.evaluate(parsed)
            if not tf:
                log.debug("rule does not apply")
                continue
            # message attributes take precedence over the rule's "attr"
            rattr = dict(maybe.get("attr", {}), **attr)
            filename = maybe["filepat"].format(**rattr)
            # Only the first letter matters.  A missing or empty "rw"
            # yields '' so that the rule is skipped below instead of
            # raising before the skip can be logged.
            rw = (maybe.get("rw") or "")[:1]
            log.debug(f'{filename} ({rw})')

            if rw == 'r':
                return self.launch_read(filename, bot, maybe)
            if rw == 'w':
                return self.launch_write(filename, bot, maybe)
            log.debug('rule has no "rw" attribute')
            continue
        return None

    def launch_read(self, filename, bot, rule):
        '''
        Launch a read actor for the bot and file name, return True.
        '''
        # fixme: for now assume file format allows for simultaneous
        # reading so file and client handlers are merged into one.
        ractor, rargs = self.ractor
        log.debug(f'launch_read: {ractor}, {rargs}')
        actor = ZActor(self.ctx, ractor,
                       bot, rule,
                       filename, *rargs)
        self.handlers.append(actor)
        return True

    def launch_write(self, filename, bot, rule):
        '''
        Launch a write client actor for the bot, return True.

        A file actor for the file name is launched first if one is
        not already running.  RuntimeError is raised if a new file
        actor reports an empty address.
        '''

        # launch file actor if needed
        try:
            wfile = self.writers[filename]
        except KeyError:
            wactor, wargs = self.wactors[0]
            # user data is unpacked, same as for the client actor
            wfile = ZActor(self.ctx, wactor, filename, *wargs)
            # the file actor reports the address it bound on its pipe
            waddr = wfile.pipe.recv_string()
            if not waddr:
                err = f"failed to bind any address for {filename}"
                log.error(err)
                raise RuntimeError(err)
            wfile.addr = waddr  # copascetic?
            log.debug(f"made writer actor for {filename}")
            self.writers[filename] = wfile

        # the client handler
        wactor, wargs = self.wactors[1]

        actor = ZActor(self.ctx, wactor,
                       bot, rule,
                       wfile.addr, *wargs)
        self.handlers.append(actor)
        return True

    def stop(self):
        '''
        Signal every handler and then every file writer actor.
        '''
        log.debug('stop %d handlers' % len(self.handlers))
        for handler in self.handlers:
            handler.pipe.signal()
        for filename, wactor in self.writers.items():
            log.debug(f'stop writer for {filename}')
            wactor.pipe.signal()
def __call__(self, bot)
Definition: factories.py:131
def message_to_dict(msg)
Definition: util.py:44
void append(Message &msg, message_t &&data, const std::vector< size_t > &shape, size_t word_size, const char *tn)
Definition: tens.cpp:34
def __init__(self, ctx, ruleset, wactors=(), ractor=())
Definition: factories.py:33
def launch_read(self, filename, bot, rule)
Definition: factories.py:160
def launch_write(self, filename, bot, rule)
Definition: factories.py:172