ZIO
Python and C++ interface to ZeroMQ and Zyre
check_flowc.cpp
Go to the documentation of this file.
1 #include "zio/flow.hpp"
2 #include "zio/node.hpp"
3 #include <random>
4 
5 using namespace std;
6 
7 int main()
8 {
9  zsys_init();
10 
11  zio::Node cnode("client", 2);
12  cnode.set_verbose();
13  auto cport = cnode.port("sender", ZMQ_CLIENT);
14  cport->connect("testflows","recver");
15  cnode.online();
16 
17  zsys_debug("create flows");
18 
19  zio::flow::Flow cflow(cport);
20 
21  zio::Message msg("FLOW");
22 
23  const int credits_in_play = 2;
24 
25  zio::json fobj = {{"flow","BOT"},
26  {"credit",credits_in_play},
27  {"direction","extract"}};
28  msg.set_label(fobj.dump());
29 
30  bool ok;
31 
32  zsys_debug("cflow send BOT");
33  cflow.send_bot(msg);
34 
35  zsys_debug("cflow recv BOT");
36  ok = cflow.recv_bot(msg);
37  assert(ok);
38 
39  assert (cflow.is_sender());
40 
41 
42  std::default_random_engine rng;
43  std::uniform_real_distribution<double> maybe(0,1);
44 
45  bool I_quit = false;
46  while (true) {
47  zsys_debug ("cflow send DAT");
48  zio::json lobj{{"flow","DAT"}};
49  msg.set_label(lobj.dump());
50  ok = cflow.put(msg);
51  if (!ok) {
52  zsys_debug("false return, maybe EOT");
53  break;
54  }
55  zsys_debug("have credit %d/%d",
56  cflow.credit(),
57  cflow.total_credit() );
58 
59  //assert(cflow.total_credit() - cflow.credit() == 1);
60  if (maybe(rng) > 0.9) {
61  I_quit = true;
62  break;
63  }
64  }
65  if (I_quit) {
66  zsys_debug("cflow send EOT");
67  cflow.send_eot(msg);
68  ok = cflow.recv_eot(msg, -1);
69  assert(ok);
70  }
71  else {
72  zsys_debug("cflow recv EOT");
73  cflow.send_eot(msg);
74  }
75  return 0;
76 }
bool recv_eot(Message &msg, int timeout=-1)
Receive an EOT.
Definition: flow.cpp:252
An identified vertex in a ported, directed graph.
Definition: node.hpp:30
void online(const headerset_t &extra_headers={})
Bring the node online.
Definition: node.cpp:59
void set_verbose(bool verbose=true)
Set verbose for underlying Zyre and internal debug messages.
Definition: node.cpp:96
int main()
Definition: check_flowc.cpp:7
void send_eot(Message &msg)
send EOT.
Definition: flow.cpp:238
portptr_t port(const std::string &name, int stype)
Create a named port with the given socket type.
Definition: node.cpp:37
bool is_sender() const
Definition: flow.hpp:108
bool I_quit
Definition: test_ugly.py:173
bool put(Message &dat)
put a payload message into the flow
Definition: flow.cpp:153
int credit() const
Definition: flow.hpp:109
bool recv_bot(Message &bot, int timeout=-1)
receive a BOT
Definition: flow.cpp:59
int total_credit() const
Definition: flow.hpp:110
void set_label(const std::string &label)
Definition: message.cpp:75
void send_bot(Message &bot)
send a BOT
Definition: flow.cpp:39
a ZIO message
Definition: message.hpp:59
nlohmann::json json
Definition: interned.hpp:9