ZIO
Python and C++ interface to ZeroMQ and Zyre
flow.cpp
Go to the documentation of this file.
1 #include "zio/flow.hpp"
2 #include "zio/logging.hpp"
3 
5  : m_port(port)
6  , m_credit(0)
7  , m_total_credit(0)
8  , m_sender(true)
9  , m_rid(0)
10 {
11 }
// NOTE(review): the header line for this body (listing line 12) is missing
// from this listing.  Presumably this is the Flow destructor -- confirm
// against flow.hpp.  The body is intentionally empty: nothing is sent or
// released here.
13 {
14  // fixme: should we eot here?
15 }
16 
17 
18 bool zio::flow::Flow::parse_label(Message& msg, zio::json& lobj)
19 {
20  std::string label = msg.label();
21  if (label.empty()) {
22  return true;
23  }
24  zio::info("parse_label({})", label.c_str());
25  try {
26  lobj = zio::json::parse(label);
27  }
28  catch (zio::json::exception& e) {
29  zio::warn("[flow {}]: {}",
30  m_port->name().c_str(), e.what());
31  zio::warn("[flow {}]: {}",
32  m_port->name().c_str(), label.c_str());
33  return false;
34  }
35  return true;
36 }
37 
38 
40 {
41  if (m_send_seqno != -1) {
42  throw std::runtime_error("flow::send_bot() already called");
43  }
44 
45  zio::debug("[flow {}]: send_bot", m_port->name().c_str());
46  zio::json fobj;
47  if (!parse_label(bot, fobj)) {
48  throw std::runtime_error("bad message label for flow::send_bot()");
49  }
50  fobj["flow"] = "BOT";
51  bot.set_seqno(m_send_seqno = 0);
52  bot.set_label(fobj.dump());
53  bot.set_form("FLOW");
54  if (m_rid) { bot.set_routing_id(m_rid); }
55  m_port->send(bot);
56 }
57 
58 
60 {
61  zio::debug("[flow {}]: recv_bot", m_port->name().c_str());
62  bool ok = m_port->recv(bot, timeout);
63  if (!ok) {
64  zio::warn("[flow {}]: timeout receiving BOT",
65  m_port->name().c_str());
66  return false;
67  }
68  std::string label = bot.label();
69  if (bot.seqno()) {
70  zio::warn("[flow {}]: bad BOT seqno: {}, label: {}",
71  m_port->name().c_str(), bot.seqno(), label.c_str());
72  return false;
73  }
74 
75  zio::debug("[flow {}]: seqno: {}, label: {}",
76  m_port->name().c_str(), bot.seqno(), label.c_str());
77 
78  zio::json fobj;
79  if (!parse_label(bot, fobj)) {
80  zio::warn("bad message label for flow::recv_bot()");
81  return false;
82  }
83  std::string flowtype = fobj["flow"];
84  if (flowtype != "BOT") {
85  zio::warn("[flow {}]: did not get BOT, got {}",
86  m_port->name().c_str(), flowtype.c_str());
87  return false;
88  }
89  // here, fobj is from the point of view of the OTHER end
90  std::string dir = fobj["direction"];
91  if (dir == "extract") {
92  m_sender = false; // we are receiver
93  m_total_credit = fobj["credit"];
94  m_credit = m_total_credit;
95  }
96  else if (dir == "inject") {
97  m_sender = true;
98  m_total_credit = fobj["credit"];
99  m_credit = 0;
100  }
101  else {
102  zio::warn("[flow {}]: unknown direction: {}",
103  m_port->name().c_str(), dir.c_str());
104  return false;
105  }
106  m_recv_seqno = bot.seqno();
107  m_rid = bot.routing_id();
108  if (m_rid) {
109  zio::debug("[flow {}]: routing id: {}",
110  m_port->name().c_str(), m_rid);
111  }
112 
113  return true;
114 }
115 
116 
117 
119 {
120  zio::Message msg;
121  bool ok = m_port->recv(msg, timeout);
122  if (!ok) { // timeout
123  return 0;
124  }
125  if (msg.seqno() - m_recv_seqno != 1) {
126  zio::warn("[flow {}] slurp_pay: bad seqno: {}, last seqno: {}",
127  m_port->name(), msg.seqno(), m_recv_seqno);
128  return slurp_pay(timeout);
129  }
130 
131  zio::json fobj;
132  if (!parse_label(msg, fobj)) {
133  zio::warn("[flow {}] slurp_pay: bad flow object: {}",
134  m_port->name(), msg.label().c_str());
135  return slurp_pay(timeout);
136  }
137 
138  std::string flowtype = fobj["flow"];
139  if (flowtype == "PAY") {
140  int credit = fobj["credit"];
141  zio::debug("[flow {}] recv PAY {} credit (rid:{})",
142  m_port->name().c_str(), credit, m_rid);
143  ++m_recv_seqno;
144  return credit;
145  }
146  if (flowtype == "EOT") {
147  return -1;
148  }
149  return -2;
150 }
151 
152 
154 {
155  if (m_credit < m_total_credit) {
156  // quick try to get any PAY already sitting in buffers
157  int c = slurp_pay(0);
158  if (c < 0) {
159  return false;
160  }
161  m_credit += c;
162  }
163  if (m_credit == 0) {
164  // no credit, we really have to wait until we get some PAY
165  int c = slurp_pay(-1);
166  if (c < 0) {
167  return false;
168  }
169  m_credit = c;
170  }
171 
172  if (m_send_seqno < 0) {
173  zio::error("[flow {}] send DAT {}",
174  m_port->name().c_str(), m_send_seqno);
175  throw std::runtime_error("flow::put() must send BOT first");
176  }
177 
178  zio::json fobj;
179  if (!parse_label(dat, fobj)) {
180  throw std::runtime_error("bad message label for Flow::put()");
181  }
182 
183  fobj["flow"] = "DAT";
184  dat.set_label(fobj.dump());
185  dat.set_form("FLOW");
186  dat.set_seqno(++m_send_seqno);
187  if (m_rid) { dat.set_routing_id(m_rid); }
188  m_port->send(dat);
189  --m_credit;
190  return true;
191 }
192 
194 {
195  if (!m_credit) {
196  return 0;
197  }
198  Message msg("FLOW");
199  zio::json obj{{"flow","PAY"},{"credit",m_credit}};
200  msg.set_label(obj.dump());
201  msg.set_seqno(++m_send_seqno);
202  zio::debug("[flow {}] send PAY {}, credit:{} (rid:{})",
203  m_port->name().c_str(), m_send_seqno, m_credit, m_rid);
204  const int nsent = m_credit;
205  m_credit=0;
206  if (m_rid) { msg.set_routing_id(m_rid); }
207  m_port->send(msg);
208 
209  return nsent;
210 }
211 
213 {
214  flush_pay();
215  zio::debug("[flow {}] get with {} credit (rid:{})",
216  m_port->name().c_str(), m_credit, m_rid);
217 
218  bool ok = m_port->recv(dat, timeout);
219  if (!ok) { return false; }
220 
221  if (dat.seqno() - m_recv_seqno != 1) {
222  zio::warn("[flow {}] get: bad seqno: {}, last seqno: {}",
223  m_port->name(), dat.seqno(), m_recv_seqno);
224  return false;
225  }
226  zio::json fobj;
227  if (!parse_label(dat, fobj)) {
228  throw std::runtime_error("bad message label for Flow::get()");
229  }
230  if (fobj["flow"] != "DAT") {
231  return false;
232  }
233  ++m_recv_seqno;
234  ++m_credit;
235  return true;
236 }
237 
239 {
240  msg.set_form("FLOW");
241  zio::json fobj;
242  if (!parse_label(msg, fobj)) {
243  throw std::runtime_error("bad message label for Flow::send_eot()");
244  }
245  fobj["flow"] = "EOT";
246  msg.set_label(fobj.dump());
247  msg.set_seqno(++m_send_seqno);
248  if (m_rid) { msg.set_routing_id(m_rid); }
249  m_port->send(msg);
250 }
251 
253 {
254  while (true) {
255  bool ok = m_port->recv(msg, timeout);
256  if (!ok) { // timeout
257  return false;
258  }
259  if (msg.seqno() - m_recv_seqno != 1) {
260  zio::warn("[flow {}] recv_eot: bad seqno: {}, last seqno: {}",
261  m_port->name(), msg.seqno(), m_recv_seqno);
262  return false;
263  }
264  zio::json fobj;
265  if (!parse_label(msg, fobj)) {
266  zio::warn("bad message label for Flow::recv_eot()");
267  return false;
268  }
269  std::string flowtype = fobj["flow"];
270  ++m_recv_seqno;
271  if (flowtype == "EOT") {
272  return true;
273  }
274  zio::debug("[flow {}] want EOT got {} (rid:{})",
275  m_port->name().c_str(), flowtype.c_str(), m_rid);
276  }
277 }
278 
279 
280 
281 
bool recv_eot(Message &msg, int timeout=-1)
Receive an EOT.
Definition: flow.cpp:252
void set_seqno(int seqno)
Explicit set.
Definition: message.hpp:93
Flow(portptr_t port)
create a flow.
Definition: flow.cpp:4
void send_eot(Message &msg)
send EOT.
Definition: flow.cpp:238
routing_id_t routing_id() const
Return routing ID if we have one.
Definition: message.hpp:115
std::string label() const
Definition: message.cpp:102
seqno_t seqno() const
Definition: message.hpp:86
bool put(Message &dat)
put a payload message into the flow
Definition: flow.cpp:153
int credit() const
Definition: flow.hpp:109
void set_form(const std::string &form)
Definition: message.cpp:93
std::shared_ptr< Port > portptr_t
The context can't be copied and ports like to be shared.
Definition: port.hpp:148
bool recv_bot(Message &bot, int timeout=-1)
receive a BOT
Definition: flow.cpp:59
void set_label(const std::string &label)
Definition: message.cpp:75
int slurp_pay(int timeout)
recv any waiting PAY messages
Definition: flow.cpp:118
def parse(rule_object, params)
Definition: rules.py:11
void send_bot(Message &bot)
send a BOT
Definition: flow.cpp:39
bool get(Message &dat, int timeout=-1)
get a payload message from the flow
Definition: flow.cpp:212
a ZIO message
Definition: message.hpp:59
int flush_pay()
send any accumulated credit as a PAY
Definition: flow.cpp:193
void set_routing_id(routing_id_t rid)
Set routing ID.
Definition: message.hpp:118
nlohmann::json json
Definition: interned.hpp:9