ZIO
Python and C++ interface to ZeroMQ and Zyre
test_flow.cpp
Go to the documentation of this file.
1 #include "zio/flow.hpp"
2 #include "zio/node.hpp"
3 #include "zio/main.hpp"
4 #include "zio/logging.hpp"
5 
6 using namespace std;
7 
8 int main()
9 {
10  zio::init_all();
11 
12  zio::Node snode("server", 1);
13  snode.set_verbose();
14  auto sport = snode.port("recver", ZMQ_SERVER);
15  // C/S can't do inproc!
17  // avoid testing with tcp as we tend to use the same ports in multiple tests
19  // goldilocks
20  const char* addr = "ipc://testflow";
21  sport->bind(addr);
22  snode.online();
23 
24  zio::Node cnode("client", 2);
25  cnode.set_verbose();
26  auto cport = cnode.port("sender", ZMQ_CLIENT);
27  cport->connect(addr);
28  cnode.online();
29 
30  zio::debug("create flows");
31 
32  // flow normally acts as a client but can act as a server
33  zio::flow::Flow sflow(sport);
34  zio::flow::Flow cflow(cport);
35 
36  zio::Message msg("FLOW");
37 
38  const int credits_in_play = 2;
39 
40  zio::json fobj = {{"flow","BOT"},
41  {"credit",credits_in_play},
42  {"direction","extract"}};
43  msg.set_label(fobj.dump());
44 
45  bool ok;
46 
47  zio::debug("cflow send BOT");
48  cflow.send_bot(msg);
49 
50  zio::debug("sflow recv BOT");
51  ok = sflow.recv_bot(msg);
52  assert(ok);
53  zio::debug("sflow recv'ed");
54  auto rid = msg.routing_id();
55  zio::debug("sflow msg.label: {}, rid: {}", msg.label().c_str(), rid);
56  assert(rid);
57 
58 
59  fobj = zio::json::parse(msg.label());
60  std::string dir = fobj["direction"];
61  assert (dir == "extract");
62  fobj["direction"] = "inject";
63  msg.set_label(fobj.dump());
64  int credit = fobj["credit"];
65  zio::debug("sflow credit: {}, rid: {}, stype:{}",
66  credit, rid, zio::sock_type(sport->socket()));
67 
68  assert (!sflow.is_sender());
69  zio::debug("sflow msg.label: {}", msg.label());
70  zio::debug("sflow send BOT");
71  assert(msg.routing_id() == rid);
72  sflow.send_bot(msg);
73  zio::debug("cflow recv BOT");
74  ok = cflow.recv_bot(msg);
75  assert(ok);
76 
77  assert (cflow.is_sender());
78 
79  // at this point deadlock could occur because this test is
80  // synchronous between both client and server. We must manually
81  // prime the pump. Internally the server's get() would do this.
82  credit = sflow.flush_pay();
83  assert (credit == credits_in_play);
84  assert (credits_in_play == sflow.total_credit());
85  assert (0 == sflow.credit());
86 
87  zio::debug("cflow send DAT");
88  zio::json lobj{{"flow","DAT"}};
89  msg.set_label(lobj.dump());
90  ok = cflow.put(msg);
91  assert(ok);
92  assert(cflow.total_credit() - cflow.credit() == 1);
93 
94  zio::debug("sflow recv DAT, credit {}/{}",
95  sflow.credit(), sflow.total_credit());
96  assert(0 == sflow.credit());
97  ok = sflow.get(msg);
98  assert(ok);
99  assert(sflow.credit() == 1);
100 
101  zio::debug("sflow send EOT");
102  sflow.send_eot(msg);
103  zio::debug("cflow send EOT");
104  ok = cflow.recv_eot(msg);
105  assert(ok);
106  cflow.send_eot(msg);
107  sflow.recv_eot(msg);
108  zio::debug("done");
109  return 0;
110 }
int main()
Definition: test_flow.cpp:8
bool recv_eot(Message &msg, int timeout=-1)
Receive an EOT.
Definition: flow.cpp:252
An identified vertex in a ported, directed graph.
Definition: node.hpp:30
void online(const headerset_t &extra_headers={})
Bring the node online.
Definition: node.cpp:59
const char * addr
inproc hangs. no messages ever get received by server. tcp/ipc okay.
Definition: test_tcs.cpp:16
void set_verbose(bool verbose=true)
Set verbose for underlying Zyre and internal debug messages.
Definition: node.cpp:96
void send_eot(Message &msg)
send EOT.
Definition: flow.cpp:238
routing_id_t routing_id() const
Return routing ID if we have one.
Definition: message.hpp:115
portptr_t port(const std::string &name, int stype)
Create a named port with the given socket type.
Definition: node.cpp:37
std::string label() const
Definition: message.cpp:102
bool is_sender() const
Definition: flow.hpp:108
bool put(Message &dat)
put a payload message into the flow
Definition: flow.cpp:153
int credit() const
Definition: flow.hpp:109
bool recv_bot(Message &bot, int timeout=-1)
receive a BOT
Definition: flow.cpp:59
int total_credit() const
Definition: flow.hpp:110
void set_label(const std::string &label)
Definition: message.cpp:75
void init_all()
Definition: main.cpp:22
def parse(rule_object, params)
Definition: rules.py:11
int sock_type(const socket_t &sock)
Return the ZeroMQ socket type number for the socket.
Definition: interned.cpp:6
void send_bot(Message &bot)
send a BOT
Definition: flow.cpp:39
bool get(Message &dat, int timeout=-1)
get a payload message from the flow
Definition: flow.cpp:212
a ZIO message
Definition: message.hpp:59
int flush_pay()
send any accumulated credit as a PAY
Definition: flow.cpp:193
nlohmann::json json
Definition: interned.hpp:9