ZIO
Python and C++ interface to ZeroMQ and Zyre
test_gdp.cpp
Go to the documentation of this file.
1 
#include "zio/domo/broker.hpp"
#include "zio/domo/client.hpp"
#include "zio/domo/worker.hpp"
#include "zio/logging.hpp"
#include "zio/actor.hpp"

#include <cstdlib>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
31 
32 using namespace zio::domo;
33 
34 void countdown_echo(zio::socket_t& link, std::string address, int socktype)
35 {
37  zio::socket_t sock(ctx, socktype);
38  Client client(sock, address);
39 
40  link.send(zio::message_t{}, zio::send_flags::none);
41 
42  int countdown = 4;
43 
44  while (countdown) {
45  --countdown;
46  std::stringstream ss;
47  if (countdown) {
48  ss << countdown << "...";
49  }
50  else {
51  ss << "blast off!";
52  }
53  zio::multipart_t mmsg(ss.str());
54  client.send("echo", mmsg);
55  mmsg.clear();
56  client.recv(mmsg);
57  if (mmsg.empty()) {
58  zio::error("countdown echo timeout");
59  break;
60  }
61  {
62  std::stringstream ss;
63  ss << "countdown echo [" << mmsg.size() << "]:";
64  while (mmsg.size()) {
65  ss << "\n\t" << mmsg.popstr();
66  }
67  zio::info(ss.str());
68  }
69  }
70  link.send(zio::message_t{}, zio::send_flags::none);
71  zio::message_t die;
72  auto res = link.recv(die);
73  zio::debug("countdown echo exiting");
74 }
75 
76 
77 void doit(int serverish, int clientish, int nclients, int nworkers)
78 {
79  std::stringstream ss;
80  ss<<"main doit("<<serverish<<","<<clientish<<","<<nworkers<<","<<nclients<<")";
81  zio::info(ss.str());
82 
84  std::string broker_address = "tcp://127.0.0.1:5555";
85 
86  std::vector<zio::zactor_t*> clients;
87  std::vector<std::pair<std::string, zio::zactor_t*> > actors;
88 
89 
90  zio::debug("main make broker actor");
91  actors.push_back({"broker",new zio::zactor_t(ctx, broker_actor, broker_address, serverish)});
92 
93  while (nworkers--) {
94  zio::debug("main make worker actor");
95  actors.push_back({"worker",new zio::zactor_t(ctx, echo_worker, broker_address, clientish)});
96  }
97 
98  while (nclients--) {
99  zio::debug("main make client actor");
100  auto client = new zio::zactor_t(ctx, countdown_echo, broker_address, clientish);
101  actors.push_back({"client",client});
102  clients.push_back(client);
103  }
104 
105  for (auto client : clients) {
106  zio::debug("main wait for client");
107  zio::message_t done;
108  auto res = client->link().recv(done);
109  }
110 
111  // terminate backwards
112  for (auto it = actors.rbegin(); it != actors.rend(); ++it) {
113  zio::debug("main terminate actor " + it->first);
114  zio::zactor_t* actor = it->second;
115  actor->link().send(zio::message_t{}, zio::send_flags::none);
116  delete actor;
117  }
118  zio::debug("main doit exiting");
119 }
120 
121 int main(int argc, char* argv[])
122 {
124 
125  std::string which = "server";
126  if (argc > 1) {
127  which = argv[1];
128  }
129 
130  int nclients = 1;
131  if (argc > 2) {
132  nclients = atoi(argv[2]);
133  }
134  int nworkers = 1;
135  if (argc > 3) {
136  nworkers = atoi(argv[3]);
137  }
138 
139  if (which == "server") {
140  doit(ZMQ_SERVER, ZMQ_CLIENT, nclients, nworkers);
141  }
142  else {
143  doit(ZMQ_ROUTER, ZMQ_DEALER, nclients, nworkers);
144  }
145  return 0;
146 }
147 
148 
void catch_signals()
Definition: util.cpp:200
int main(int argc, char *argv[])
Definition: test_gdp.cpp:121
size_t send(const void *buf_, size_t len_, int flags_=0)
Definition: zmq.hpp:1345
void echo_worker(zio::socket_t &link, std::string address, int socktype)
void doit(int serverish, int clientish, int nclients, int nworkers)
Definition: test_gdp.cpp:77
void broker_actor(zio::socket_t &link, std::string address, int socktype)
socket_ref link()
Definition: actor.hpp:99
void countdown_echo(zio::socket_t &link, std::string address, int socktype)
Definition: test_gdp.cpp:34
void client(std::string str)
void recv(zio::multipart_t &reply)
Definition: domo_client.cpp:54
void send(std::string service, zio::multipart_t &request)
Definition: domo_client.cpp:44