ZIO
Python and C++ interface to ZeroMQ and Zyre
message.cpp
Go to the documentation of this file.
1 #include "zio/message.hpp"
2 #include "zio/exceptions.hpp"
3 #include <czmq.h>
4 #include <sstream>
5 #include <chrono>
6 
8 {
9  const char* level_names[] = {
10  "undefined",
11  "trace","verbose","debug","info","summary",
12  "warning","error","fatal",
13  0
14  };
15  return level_names[lvl];
16 }
17 
18 
19 
20 std::string zio::PrefixHeader::dumps() const
21 {
22  std::stringstream ss;
23  char num = '0' + level;
24  ss << "ZIO" << num << form;
25  if (label.size() > 0) {
26  ss << label;
27  }
28  return ss.str();
29 }
30 
31 bool zio::PrefixHeader::loads(const std::string& p)
32 {
33  const size_t siz = p.size();
34  if (siz < 8) return false;
35  if (p.substr(0,3) != "ZIO") return false;
36  level = static_cast<zio::level::MessageLevel>(p[3]-'0');
37  form = p.substr(4,4);
38  label = p.substr(8);
39  return true;
40 }
41 
// NOTE(review): the signature line of this constructor is not visible here;
// presumably this is the default constructor -- confirm against message.hpp.
// The visible part only initializes the routing id m_rid to 0.
43  : m_rid(0)
44 {
45 }
46 
47 zio::Message::Message(const header_t h, multipart_t&& pl)
48  : m_header(h)
49  , m_payload(std::move(pl))
50  , m_rid(0)
51 {
52 }
53 
// NOTE(review): the signature line of this constructor is not visible here;
// it evidently takes a header named `h` -- confirm against message.hpp.
// Stores the given header and initializes the routing id m_rid to 0.
55  : m_header(h)
56  , m_rid(0)
57 {
58 }
59 
// NOTE(review): the signature line of this constructor is not visible here;
// it evidently takes a level `lvl` and a string `form` -- confirm the
// parameter order and types against message.hpp.
// The header is brace-initialized with `lvl`, a placeholder form and an
// empty string, plus three zeros (presumably the coord header fields --
// TODO confirm). The routing id m_rid starts at 0.
61  : m_header({{lvl," ",""},{0,0,0}})
62  , m_rid(0)
63 {
// The placeholder form set above is replaced through set_form().
64  set_form(form);
65 }
66 
68 {
69  return m_header.prefix.level;
70 }
72 {
73  m_header.prefix.level = level;
74 }
75 void zio::Message::set_label(const std::string& label)
76 {
77  m_header.prefix.label = label;
78 }
79 
81 {
82  return json::parse(label());
83 }
85 {
86  set_label(lobj.dump());
87 }
88 
89 
90 std::string zio::Message::form() const {
91  return m_header.prefix.form;
92 }
93 void zio::Message::set_form(const std::string& form)
94 {
95  m_header.prefix.form = " ";
96  const size_t maxsiz = 4;
97  const size_t siz = std::min(form.size(), maxsiz);
98  for (size_t ind=0; ind<siz; ++ind) {
99  m_header.prefix.form[ind] = form[ind];
100  }
101 }
102 std::string zio::Message::label() const {
103  return m_header.prefix.label;
104 }
105 
107 {
108  if (gran == 0) {
109  auto tmp = std::chrono::system_clock::now().time_since_epoch();
110  gran = std::chrono::duration_cast<std::chrono::microseconds>(tmp).count();
111  }
112  m_header.coord.granule = gran;
113  if (origin) {
114  m_header.coord.origin = origin;
115  }
116 }
117 
119 {
120  auto mmsg = toparts();
121  auto spmsg = mmsg.encode();
122  if (m_rid>0) {
123  spmsg.set_routing_id(m_rid);
124  }
125  return spmsg;
126 }
127 
129 {
130  fromparts(zio::multipart_t::decode(data));
131  m_rid = data.routing_id();
132 }
133 
134 zio::multipart_t zio::Message::toparts() const
135 {
136  zio::multipart_t mpmsg;
137 
138  std::string p = m_header.prefix.dumps();
139  mpmsg.addmem(p.data(), p.size());
140  mpmsg.addtyp(m_header.coord);
141  for (const auto& spmsg: m_payload) {
142  mpmsg.addmem(spmsg.data(), spmsg.size());
143  }
144  return mpmsg;
145 }
146 
147 void zio::Message::fromparts(const zio::multipart_t& mpmsg)
148 {
149  m_rid = 0;
150  const size_t nparts = mpmsg.size();
151 
152  const auto& m0 = mpmsg[0];
153  std::string p(static_cast<const char*>(m0.data()), m0.size());
154  bool ok = m_header.prefix.loads(p);
155  if (!ok) {
156  throw std::runtime_error("failed to parse prefix from parts");
157  }
158 
159  const auto& m1 = mpmsg[1];
160  m_header.coord = *m1.data<zio::CoordHeader>();
161 
162  m_payload.clear();
163  for (size_t ind=2; ind<nparts; ++ind) {
164  const auto& m = mpmsg[ind];
165  m_payload.addmem(m.data(), m.size());
166  }
167 }
std::string form() const
Definition: message.cpp:90
def now()
Definition: test_ugly.py:10
level::MessageLevel level() const
Definition: message.cpp:67
std::string label() const
Definition: message.cpp:102
std::string label
Definition: message.hpp:29
CoordHeader coord
Definition: message.hpp:47
zio::json label_object() const
Helper, when label holds a JSON object.
Definition: message.cpp:80
origin_t origin
Definition: message.hpp:41
message_t encode() const
Definition: message.cpp:118
void set_level(level::MessageLevel level)
Definition: message.cpp:71
multipart_t toparts() const
Serialize self to multipart.
Definition: message.cpp:134
origin_t origin() const
Definition: message.hpp:84
void set_form(const std::string &form)
Definition: message.cpp:93
void set_coord(origin_t origin=0, granule_t gran=0)
Definition: message.cpp:106
PrefixHeader prefix
Definition: message.hpp:46
granule_t granule
Definition: message.hpp:42
uint64_t granule_t
Definition: message.hpp:37
level::MessageLevel level
Definition: message.hpp:27
void decode(const message_t &dat)
Definition: message.cpp:128
void set_label(const std::string &label)
Definition: message.cpp:75
std::string dumps() const
Definition: message.cpp:20
uint64_t origin_t
Definition: message.hpp:36
const char * name(MessageLevel lvl)
Definition: message.cpp:7
def parse(rule_object, params)
Definition: rules.py:11
std::string form
Definition: message.hpp:28
void set_label_object(const zio::json &lobj)
Definition: message.cpp:84
nlohmann::json json
Definition: interned.hpp:9
void fromparts(const multipart_t &allparts)
Set self from multipart. Nullifies routing ID.
bool loads(const std::string &s)
Definition: message.cpp:31