14 auto sport = snode.
port(
"recver", ZMQ_SERVER);
20 const char*
addr =
"ipc://testflow";
26 auto cport = cnode.
port(
"sender", ZMQ_CLIENT);
38 const int credits_in_play = 2;
41 {
"credit",credits_in_play},
42 {
"direction",
"extract"}};
60 std::string dir = fobj[
"direction"];
61 assert (dir ==
"extract");
62 fobj[
"direction"] =
"inject";
64 int credit = fobj[
"credit"];
65 zio::debug(
"sflow credit: {}, rid: {}, stype:{}",
83 assert (credit == credits_in_play);
85 assert (0 == sflow.
credit());
96 assert(0 == sflow.
credit());
99 assert(sflow.
credit() == 1);
bool recv_eot(Message &msg, int timeout=-1)
Receive an EOT.
An identified vertex in a ported, directed graph.
void online(const headerset_t &extra_headers={})
Bring the node online.
const char * addr
inproc hangs. no messages ever get received by server. tcp/ipc okay.
void set_verbose(bool verbose=true)
Set verbose for underlying Zyre and internal debug messages.
void send_eot(Message &msg)
send EOT.
routing_id_t routing_id() const
Return routing ID if we have one.
portptr_t port(const std::string &name, int stype)
Create a named port with the given socket type.
std::string label() const
bool put(Message &dat)
put a payload message into the flow
bool recv_bot(Message &bot, int timeout=-1)
receive a BOT
void set_label(const std::string &label)
def parse(rule_object, params)
int sock_type(const socket_t &sock)
Return the ZeroMQ socket type number for the socket.
void send_bot(Message &bot)
send a BOT
bool get(Message &dat, int timeout=-1)
get a payload message from the flow
int flush_pay()
send any accumulated credit as a PAY