3 ZIO flow factories are used by the flow broker to service client flows. 5 from .util
import message_to_dict
7 from pyre.zactor
import ZActor
9 log = logging.getLogger(
"zio")
12 return t
if isinstance(t,tuple)
else (t,())
14 return tuple(map(wash_one, tt))
19 '''Use a ruleset to mate flow client to a file resource. 21 The ruleset factory will apply each rule in the set and used the 22 first one that evaluates True. This will govern the parameters 23 for the file resource. 25 The file resource is delegated to a pair of actor functions for 26 reading and another for writing. A single reader/writer actor is 27 launched to handle each new file and a reader or writer handler is 28 used to handle each new client. 33 wactors=(), ractor=()):
34 '''Create a Factory with a ruleset. 41 ruleset: array of dictionary 43 A ruleset is a sequence of rule specifications (see Rule 46 wactors : 2-tuple of 2-tuples specifing file writing actors 50 ractor : 2-tuple specifying file reading actors and any args 54 The "*actors" args are 2-tuples specifying actor functions and 55 optional user data arguments for writing and reading. 57 The head of the 2-tuple is for the file actor which directly 58 manages some file resource. The tail is for the client actor 59 which will connect to the file actor socket (and presumably 60 some other socket). If either element of this 2-tuple is 61 itself a tuple then the head is treated as the actor function 62 and any remainder is passed to the function as user data 65 A file actor is called as: 67 file_handler(ctx, pipe, 71 The file actor function must adhere to a simple protocol on 72 its actor pipe. After issuing the usual actor ready signal 73 and before entering its loop, the file actor shall return on 74 its actor pipe as a string the address of a bound socket to 75 which any subsequent client actors may connect. 77 A client actor is called as: 79 client_handler(ctx, pipe, 80 bot, rule_object, file_addr, 83 The client actor has no protocol on its actor pipe other than 84 the usual ready signal. 91 An S-expression in terms of attributes evaluating to a 92 Boolean. First rule returning True is used. Here and 93 below "attributes" refer to the combination of BOT message 94 header values and any additional specified in the `attr` 95 Rule Attribute (described below). 99 Takes value "read" or "write" and determines if the flow 100 is to be read from or written to file. 104 An f-string formatted against the attributes which 105 resolves to the name of the file to be read or written. 109 An f-string formatted against attributes to determine and 110 to add a "hdfgroup" parameter to the BOT message prior to 111 it being sent into a read or write handler. The value 112 will be used to define an HDF base group path into which 113 messages from one stream will be mapped. This path must 114 be unique to the flow client. 118 An optional, extra dictionary of attributes updated on the 119 dictionary of message attributes prior to applying to the 120 rule or either patterns. 133 Given a bot, return a live actor or None if bot is rejected. 138 log.info(
'factory Ruleset called %d, have %d rules' % (
141 log.debug(f
'check rule: "{maybe}"')
142 parsed = rules.parse(maybe, **attr)
143 log.debug (f
'parsed: {parsed}')
144 tf = rules.evaluate(parsed)
146 log.debug (
"rule does not apply")
148 rattr = dict(maybe.get(
"attr",{}), **attr)
149 filename = maybe[
"filepat"].
format(**rattr)
151 log.debug(f
'{filename} ({rw})')
157 log.debug(
'rule has no "rw" attribute')
163 ractor, rargs = self.
ractor 164 log.debug(f
'launch_read: {ractor}, {rargs}')
165 actor = ZActor(self.
ctx, ractor,
178 wactor, wargs = self.
wactors[0]
179 wfile = ZActor(self.
ctx, wactor, filename, wargs)
180 waddr = wfile.pipe.recv_string()
182 err = f
"failed to bind any {self.addrpat} for {filename}" 184 raise RuntimeError(err)
186 log.debug(f
"made writer actor for {filename}")
190 wactor, wargs = self.
wactors[1]
192 actor = ZActor(self.
ctx, wactor,
199 log.debug(
'stop %d handlers' % len(self.
handlers))
201 handler.pipe.signal()
202 for filename, wactor
in self.
writers.items():
203 log.debug(f
'stop writer for {filename}')
void append(Message &msg, message_t &&data, const std::vector< size_t > &shape, size_t word_size, const char *tn)
def __init__(self, ctx, ruleset, wactors=(), ractor=())
def launch_read(self, filename, bot, rule)
def launch_write(self, filename, bot, rule)