3 Support for reading HDF5 files. 10 from ..util
import message_to_dict
11 from zio
import Port, Message
15 log = logging.getLogger(
"zio")
18 '''Read ZIO TENS messages from HDF5 20 This is the inverse of @ref writer.TensWriter. See that class for 35 'Read and return a message' 39 seq = self.
group.get(gn)
41 log.error(f
'TensReader: failed to get {gn} from {self.group}')
44 tens = seq.get(
"tensors")
46 attrs = dict(tens.attrs)
48 origin = attrs.pop(
"origin"),
49 granule = attrs.pop(
"granule"),
50 seqno = attrs.pop(
"seqno"))
51 for k,v
in attrs.items():
52 if type(v) == numpy.int64:
55 umd = seq.get(
"metadata")
57 attrs[
"metadata"] = dict(umd.attrs)
58 msg.label_object = attrs
60 partnums = [int(p)
for p
in tens.keys()]
62 maxpart = max(partnums)
64 payload = [
None]*nparts
66 for part, ds
in tens.items():
72 log.debug(f
'TENS part {part}/{nparts} {dtype} {type(ds.dtype)} {ds.shape}')
75 dtype = ds.dtype.kind,
76 word = ds.dtype.alignment,
79 payload[part] = ds[:].tostring()
85 def handler(ctx, pipe, bot, rule_object, filename, broker_addr, *rargs):
87 log.debug(f
'actor: reader "{filename}"')
88 fp = h5py.File(filename,
'r') 91 rattr = dict(rule_object.get("attr",{}), **mattr)
92 base_path = rule_object.get(
"grouppat",
"/").
format(**rattr)
93 log.debug(f
'reader(msg, "{base_path}", "{broker_addr}")')
97 sock = ctx.socket(CLIENT)
98 port =
Port(
"read-handler", sock)
99 port.connect(broker_addr)
102 log.debug (f
'reader({base_path}) send BOT to {broker_addr}')
104 sg = fp.get(base_path)
106 log.error(f
'reader failed to get {base_path} from {filename}')
111 bot = flow.recv_bot()
112 log.debug (f
'reader({base_path}) got response:\n{bot}')
117 log.debug(f
'reader: {msg}')
def handler(ctx, pipe, bot, rule_object, filename, broker_addr, rargs)
A port holds a socket in the context of a node.
def __init__(self, group)