20 std::string label = msg.
label();
24 zio::info(
"parse_label({})", label.c_str());
28 catch (zio::json::exception& e) {
29 zio::warn(
"[flow {}]: {}",
30 m_port->name().c_str(), e.what());
31 zio::warn(
"[flow {}]: {}",
32 m_port->name().c_str(), label.c_str());
41 if (m_send_seqno != -1) {
42 throw std::runtime_error(
"flow::send_bot() already called");
45 zio::debug(
"[flow {}]: send_bot", m_port->name().c_str());
47 if (!parse_label(bot, fobj)) {
48 throw std::runtime_error(
"bad message label for flow::send_bot()");
61 zio::debug(
"[flow {}]: recv_bot", m_port->name().c_str());
62 bool ok = m_port->recv(bot, timeout);
64 zio::warn(
"[flow {}]: timeout receiving BOT",
65 m_port->name().c_str());
68 std::string label = bot.
label();
70 zio::warn(
"[flow {}]: bad BOT seqno: {}, label: {}",
71 m_port->name().c_str(), bot.
seqno(), label.c_str());
76 m_port->name().c_str(), bot.
seqno(), label.c_str());
79 if (!parse_label(bot, fobj)) {
80 zio::warn(
"bad message label for flow::recv_bot()");
83 std::string flowtype = fobj[
"flow"];
84 if (flowtype !=
"BOT") {
85 zio::warn(
"[flow {}]: did not get BOT, got {}",
86 m_port->name().c_str(), flowtype.c_str());
90 std::string dir = fobj[
"direction"];
91 if (dir ==
"extract") {
93 m_total_credit = fobj[
"credit"];
94 m_credit = m_total_credit;
96 else if (dir ==
"inject") {
98 m_total_credit = fobj[
"credit"];
102 zio::warn(
"[flow {}]: unknown direction: {}",
103 m_port->name().c_str(), dir.c_str());
106 m_recv_seqno = bot.
seqno();
110 m_port->name().c_str(), m_rid);
121 bool ok = m_port->recv(msg, timeout);
125 if (msg.
seqno() - m_recv_seqno != 1) {
126 zio::warn(
"[flow {}] slurp_pay: bad seqno: {}, last seqno: {}",
127 m_port->name(), msg.
seqno(), m_recv_seqno);
132 if (!parse_label(msg, fobj)) {
133 zio::warn(
"[flow {}] slurp_pay: bad flow object: {}",
134 m_port->name(), msg.
label().c_str());
138 std::string flowtype = fobj[
"flow"];
139 if (flowtype ==
"PAY") {
140 int credit = fobj[
"credit"];
141 zio::debug(
"[flow {}] recv PAY {} credit (rid:{})",
142 m_port->name().c_str(),
credit, m_rid);
146 if (flowtype ==
"EOT") {
155 if (m_credit < m_total_credit) {
172 if (m_send_seqno < 0) {
174 m_port->name().c_str(), m_send_seqno);
175 throw std::runtime_error(
"flow::put() must send BOT first");
179 if (!parse_label(dat, fobj)) {
180 throw std::runtime_error(
"bad message label for Flow::put()");
183 fobj[
"flow"] =
"DAT";
199 zio::json obj{{
"flow",
"PAY"},{
"credit",m_credit}};
202 zio::debug(
"[flow {}] send PAY {}, credit:{} (rid:{})",
203 m_port->name().c_str(), m_send_seqno, m_credit, m_rid);
204 const int nsent = m_credit;
215 zio::debug(
"[flow {}] get with {} credit (rid:{})",
216 m_port->name().c_str(), m_credit, m_rid);
218 bool ok = m_port->recv(dat, timeout);
219 if (!ok) {
return false; }
221 if (dat.
seqno() - m_recv_seqno != 1) {
222 zio::warn(
"[flow {}] get: bad seqno: {}, last seqno: {}",
223 m_port->name(), dat.
seqno(), m_recv_seqno);
227 if (!parse_label(dat, fobj)) {
228 throw std::runtime_error(
"bad message label for Flow::get()");
230 if (fobj[
"flow"] !=
"DAT") {
242 if (!parse_label(msg, fobj)) {
243 throw std::runtime_error(
"bad message label for Flow::send_eot()");
245 fobj[
"flow"] =
"EOT";
255 bool ok = m_port->recv(msg, timeout);
259 if (msg.
seqno() - m_recv_seqno != 1) {
260 zio::warn(
"[flow {}] recv_eot: bad seqno: {}, last seqno: {}",
261 m_port->name(), msg.
seqno(), m_recv_seqno);
265 if (!parse_label(msg, fobj)) {
266 zio::warn(
"bad message label for Flow::recv_eot()");
269 std::string flowtype = fobj[
"flow"];
271 if (flowtype ==
"EOT") {
274 zio::debug(
"[flow {}] want EOT got {} (rid:{})",
275 m_port->name().c_str(), flowtype.c_str(), m_rid);
bool recv_eot(Message &msg, int timeout=-1)
Receive an EOT.
void set_seqno(int seqno)
Explicit set.
Flow(portptr_t port)
create a flow.
void send_eot(Message &msg)
send EOT.
routing_id_t routing_id() const
Return routing ID if we have one.
std::string label() const
bool put(Message &dat)
put a payload message into the flow
void set_form(const std::string &form)
std::shared_ptr< Port > portptr_t
The context can't be copied and ports like to be shared.
bool recv_bot(Message &bot, int timeout=-1)
receive a BOT
void set_label(const std::string &label)
int slurp_pay(int timeout)
recv any waiting PAY messages
def parse(rule_object, params)
void send_bot(Message &bot)
send a BOT
bool get(Message &dat, int timeout=-1)
get a payload message from the flow
int flush_pay()
send any accumulated credit as a PAY
void set_routing_id(routing_id_t rid)
Set routing ID.