ZIO
Python and C++ interface to ZeroMQ and Zyre
reader.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 '''
3 Support for reading HDF5 files.
4 '''
5 
6 import json
7 import h5py
8 import numpy
9 from zmq import CLIENT
10 from ..util import message_to_dict
11 from zio import Port, Message
12 from zio.flow import Flow
13 
14 import logging
15 log = logging.getLogger("zio")
16 
class TensReader:
    '''Read ZIO TENS messages from HDF5

    This is the inverse of @ref writer.TensWriter.  See that class for
    details.

    Each call to read() looks up the next sequence-number subgroup of
    the group given at construction and turns its "tensors" (and
    optional "metadata") children into one FLOW message.
    '''

    # needs to match TensWriter
    seqno_interp = "%04d"
    part_interp = "%02d"

    def __init__(self, group):
        '''Create a reader over an HDF5 group.

        group is the object whose children are the per-seqno
        subgroups; it must be truthy.
        '''
        assert(group)
        self.group = group
        # Sequence number of the last subgroup read; 0 means none yet.
        self.seqno = 0

    @staticmethod
    def _tensor_part(part, ds):
        '''Return (metadata dict, raw bytes) for one tensor dataset.

        The metadata starts from the dataset's own attributes and is
        then overridden with the shape, dtype kind, dtype alignment
        and part number taken from the dataset itself.
        '''
        # fixme there are more TENS attr which might be needed if
        # the file wasn't written by writer.TensWriter!
        md = dict(ds.attrs)
        md.update(dict(
            shape=ds.shape,
            dtype=ds.dtype.kind,
            # NOTE(review): alignment is used as the word size; this
            # presumably equals itemsize for simple numeric types -
            # confirm against the writer.
            word=ds.dtype.alignment,
            part=part))
        # tobytes() replaces the deprecated tostring() alias, which
        # newer NumPy releases no longer provide.
        return md, ds[:].tobytes()

    def read(self):
        '''Read and return a message

        Advances the internal sequence number on every call.  Returns
        None, after logging an error, when the subgroup for the next
        sequence number is missing or has no "tensors" child.
        Otherwise returns a Message whose label object holds the
        remaining "tensors" attributes (plus "metadata" when present)
        and whose payload is a list of raw byte strings indexed by
        part number.  Part numbers that are absent in the file leave
        a None entry in the payload.
        '''
        # start at 1 because writer doesn't save anything for BOT
        self.seqno += 1         # fixme: should iterate seqnos over subgroups?
        gn = self.seqno_interp % self.seqno
        seq = self.group.get(gn)
        if not seq:
            log.error(f'TensReader: failed to get {gn} from {self.group}')
            return

        tens = seq.get("tensors")
        if tens is None:
            # A subgroup without tensors can not be turned into a
            # message; report it the same way as a missing subgroup.
            log.error(f'TensReader: no "tensors" in {gn} from {self.group}')
            return

        attrs = dict(tens.attrs)
        msg = Message(form='FLOW',
                      origin=attrs.pop("origin"),
                      granule=attrs.pop("granule"),
                      seqno=attrs.pop("seqno"))
        # Replace NumPy integer scalars of any width with plain ints.
        attrs = {k: int(v) if isinstance(v, numpy.integer) else v
                 for k, v in attrs.items()}

        umd = seq.get("metadata")
        if umd:
            attrs["metadata"] = dict(umd.attrs)
        msg.label_object = attrs

        # Dataset names are the part numbers; size the payload so it
        # can be filled by index.  No parts gives an empty payload.
        partnums = [int(p) for p in tens.keys()]
        nparts = max(partnums, default=-1) + 1
        payload = [None]*nparts
        # NOTE(review): this per-part metadata is collected but never
        # attached to the message - confirm whether it belongs in the
        # label object.
        tensors = list()
        for name, ds in tens.items():
            part = int(name)
            dtype = str(ds.dtype)
            log.debug(f'TENS part {part}/{nparts} {dtype} {type(ds.dtype)} {ds.shape}')
            md, raw = self._tensor_part(part, ds)
            tensors.append(md)
            payload[part] = raw

        msg.payload = payload
        return msg
83 
84 
def handler(ctx, pipe, bot, rule_object, filename, broker_addr, *rargs):
    '''Actor body: replay one HDF5 group as a flow of messages.

    Opens filename read-only, forms the group path by formatting the
    rule's "grouppat" (default "/") with the rule's "attr" entries
    merged with the fields of the bot message, connects a CLIENT
    socket to broker_addr, exchanges BOT, then sends every message
    the TensReader yields followed by an EOT exchange.

    ctx         -- object providing socket(); used to make the CLIENT socket
    pipe        -- actor pipe; signalled once after the file is opened
                   and the group path is known
    bot         -- the BOT message; logged, used for pattern formatting
                   and sent to the broker
    rule_object -- mapping which may hold "attr" and "grouppat"
    filename    -- path of the HDF5 file to read
    broker_addr -- address the port connects to
    rargs       -- extra positional arguments forwarded to TensReader;
                   TensReader.__init__ accepts only the group, so any
                   extra argument raises TypeError

    Returns None.  If the group path is not found in the file an
    error is logged and the function returns before any BOT is sent.
    '''

    log.debug(f'actor: reader "{filename}"')
    # The reader pulls datasets from the file on each read(), so the
    # file must stay open for the whole exchange.  The context manager
    # closes it on every exit path, including the early return below
    # and any exception.
    with h5py.File(filename, 'r') as fp:

        mattr = message_to_dict(bot)
        rattr = dict(rule_object.get("attr", {}), **mattr)
        base_path = rule_object.get("grouppat", "/").format(**rattr)
        log.debug(f'reader(msg, "{base_path}", "{broker_addr}")')
        log.debug(bot)
        pipe.signal()

        sock = ctx.socket(CLIENT)
        port = Port("read-handler", sock)
        port.connect(broker_addr)
        port.online(None)
        flow = Flow(port)
        log.debug (f'reader({base_path}) send BOT to {broker_addr}')

        sg = fp.get(base_path)
        if not sg:
            log.error(f'reader failed to get {base_path} from {filename}')
            return
        fr = TensReader(sg, *rargs)

        flow.send_bot(bot)          # this introduces us to the server
        bot = flow.recv_bot()
        log.debug (f'reader({base_path}) got response:\n{bot}')
        flow.slurp_pay()

        # Forward messages until the reader runs dry or a put fails.
        while True:
            msg = fr.read()
            log.debug(f'reader: {msg}')
            if not msg:
                break
            ok = flow.put(msg)
            if not ok:
                break
        flow.send_eot()
        flow.recv_eot()
125 
ZIO data flow.
Definition: flow.hpp:17
def message_to_dict(msg)
Definition: util.py:44
def handler(ctx, pipe, bot, rule_object, filename, broker_addr, rargs)
Definition: reader.py:85
A port holds a socket in the context of a node.
Definition: port.hpp:27
a ZIO message
Definition: message.hpp:59
def __init__(self, group)
Definition: reader.py:29