ZIO
Python and C++ interface to ZeroMQ and Zyre
zmq_addon.hpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2016-2017 ZeroMQ community
3  Copyright (c) 2016 VOCA AS / Harald Nøkland
4 
5  Permission is hereby granted, free of charge, to any person obtaining a copy
6  of this software and associated documentation files (the "Software"), to
7  deal in the Software without restriction, including without limitation the
8  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9  sell copies of the Software, and to permit persons to whom the Software is
10  furnished to do so, subject to the following conditions:
11 
12  The above copyright notice and this permission notice shall be included in
13  all copies or substantial portions of the Software.
14 
15  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21  IN THE SOFTWARE.
22 */
23 
24 #ifndef __ZMQ_ADDON_HPP_INCLUDED__
25 #define __ZMQ_ADDON_HPP_INCLUDED__
26 
27 #include "zio/zmq.hpp"
28 
29 #include <deque>
30 #include <iomanip>
31 #include <sstream>
32 #include <stdexcept>
33 #ifdef ZMQ_CPP11
34 #include <limits>
35 #include <functional>
36 #include <unordered_map>
37 #endif
38 
39 namespace zmq
40 {
41 #ifdef ZMQ_CPP11
42 
43 namespace detail
44 {
45 template<bool CheckN, class OutputIt>
46 recv_result_t
47 recv_multipart_n(socket_ref s, OutputIt out, size_t n, recv_flags flags)
48 {
49  size_t msg_count = 0;
50  message_t msg;
51  while (true) {
52  if (CheckN) {
53  if (msg_count >= n)
54  throw std::runtime_error(
55  "Too many message parts in recv_multipart_n");
56  }
57  if (!s.recv(msg, flags)) {
58  // zmq ensures atomic delivery of messages
59  assert(msg_count == 0);
60  return {};
61  }
62  ++msg_count;
63  const bool more = msg.more();
64  *out++ = std::move(msg);
65  if (!more)
66  break;
67  }
68  return msg_count;
69 }
70 } // namespace detail
71 
72 /* Receive a multipart message.
73 
74  Writes the zmq::message_t objects to OutputIterator out.
75  The out iterator must handle an unspecified number of writes,
76  e.g. by using std::back_inserter.
77 
78  Returns: the number of messages received or nullopt (on EAGAIN).
79  Throws: if recv throws. Any exceptions thrown
80  by the out iterator will be propagated and the message
81  may have been only partially received with pending
82  message parts. It is adviced to close this socket in that event.
83 */
84 template<class OutputIt>
85 ZMQ_NODISCARD recv_result_t recv_multipart(socket_ref s,
86  OutputIt out,
87  recv_flags flags = recv_flags::none)
88 {
89  return detail::recv_multipart_n<false>(s, std::move(out), 0, flags);
90 }
91 
92 /* Receive a multipart message.
93 
94  Writes at most n zmq::message_t objects to OutputIterator out.
95  If the number of message parts of the incoming message exceeds n
96  then an exception will be thrown.
97 
98  Returns: the number of messages received or nullopt (on EAGAIN).
99  Throws: if recv throws. Throws std::runtime_error if the number
100  of message parts exceeds n (exactly n messages will have been written
101  to out). Any exceptions thrown
102  by the out iterator will be propagated and the message
103  may have been only partially received with pending
104  message parts. It is adviced to close this socket in that event.
105 */
106 template<class OutputIt>
107 ZMQ_NODISCARD recv_result_t recv_multipart_n(socket_ref s,
108  OutputIt out,
109  size_t n,
110  recv_flags flags = recv_flags::none)
111 {
112  return detail::recv_multipart_n<true>(s, std::move(out), n, flags);
113 }
114 
115 /* Send a multipart message.
116 
117  The range must be a ForwardRange of zmq::message_t,
118  zmq::const_buffer or zmq::mutable_buffer.
119  The flags may be zmq::send_flags::sndmore if there are
120  more message parts to be sent after the call to this function.
121 
122  Returns: the number of messages sent (exactly msgs.size()) or nullopt (on EAGAIN).
123  Throws: if send throws. Any exceptions thrown
124  by the msgs range will be propagated and the message
125  may have been only partially sent. It is adviced to close this socket in that event.
126 */
127 template<class Range
128 #ifndef ZMQ_CPP11_PARTIAL
129  ,
130  typename = typename std::enable_if<
131  detail::is_range<Range>::value
132  && (std::is_same<detail::range_value_t<Range>, message_t>::value
133  || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
134 #endif
135  >
136 send_result_t
137 send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none)
138 {
139  using std::begin;
140  using std::end;
141  auto it = begin(msgs);
142  const auto end_it = end(msgs);
143  size_t msg_count = 0;
144  while (it != end_it) {
145  const auto next = std::next(it);
146  const auto msg_flags =
147  flags | (next == end_it ? send_flags::none : send_flags::sndmore);
148  if (!s.send(*it, msg_flags)) {
149  // zmq ensures atomic delivery of messages
150  assert(it == begin(msgs));
151  return {};
152  }
153  ++msg_count;
154  it = next;
155  }
156  return msg_count;
157 }
158 
159 /* Encode a multipart message.
160 
161  The range must be a ForwardRange of zmq::message_t. A
162  zmq::multipart_t or STL container may be passed for encoding.
163 
164  Returns: a zmq::message_t holding the encoded multipart data.
165 
166  Throws: std::range_error is thrown if the size of any single part
167  can not fit in an unsigned 32 bit integer.
168 
169  The encoding is compatible with that used by the CZMQ function
170  zmsg_encode(). Each part consists of a size followed by the data.
171  These are placed contiguously into the output message. A part of
172  size less than 255 bytes will have a single byte size value.
173  Larger parts will have a five byte size value with the first byte
174  set to 0xFF and the remaining four bytes holding the size of the
175  part's data.
176 */
177 template<class Range
178 #ifndef ZMQ_CPP11_PARTIAL
179  ,
180  typename = typename std::enable_if<
181  detail::is_range<Range>::value
182  && (std::is_same<detail::range_value_t<Range>, message_t>::value
183  || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
184 #endif
185  >
186 message_t encode(const Range &parts)
187 {
188  size_t mmsg_size = 0;
189 
190  // First pass check sizes
191  for (const auto &part : parts) {
192  size_t part_size = part.size();
193  if (part_size > std::numeric_limits<std::uint32_t>::max()) {
194  // Size value must fit into uint32_t.
195  throw std::range_error("Invalid size, message part too large");
196  }
197  size_t count_size = 5;
198  if (part_size < std::numeric_limits<std::uint8_t>::max()) {
199  count_size = 1;
200  }
201  mmsg_size += part_size + count_size;
202  }
203 
204  message_t encoded(mmsg_size);
205  unsigned char *buf = encoded.data<unsigned char>();
206  for (const auto &part : parts) {
207  uint32_t part_size = part.size();
208  const unsigned char *part_data =
209  static_cast<const unsigned char *>(part.data());
210 
211  // small part
212  if (part_size < std::numeric_limits<std::uint8_t>::max()) {
213  *buf++ = (unsigned char) part_size;
214  memcpy(buf, part_data, part_size);
215  buf += part_size;
216  continue;
217  }
218 
219  // big part
220  *buf++ = std::numeric_limits<uint8_t>::max();
221  *buf++ = (part_size >> 24) & std::numeric_limits<std::uint8_t>::max();
222  *buf++ = (part_size >> 16) & std::numeric_limits<std::uint8_t>::max();
223  *buf++ = (part_size >> 8) & std::numeric_limits<std::uint8_t>::max();
224  *buf++ = part_size & std::numeric_limits<std::uint8_t>::max();
225  memcpy(buf, part_data, part_size);
226  buf += part_size;
227  }
228  return encoded;
229 }
230 
231 /* Decode an encoded message to multiple parts.
232 
233  The given output iterator must be a ForwardIterator to a container
234  holding zmq::message_t such as a zmq::multipart_t or various STL
235  containers.
236 
237  Returns the ForwardIterator advanced once past the last decoded
238  part.
239 
240  Throws: a std::out_of_range is thrown if the encoded part sizes
241  lead to exceeding the message data bounds.
242 
243  The decoding assumes the message is encoded in the manner
244  performed by zmq::encode().
245  */
246 template<class OutputIt> OutputIt decode(const message_t &encoded, OutputIt out)
247 {
248  const unsigned char *source = encoded.data<unsigned char>();
249  const unsigned char *const limit = source + encoded.size();
250 
251  while (source < limit) {
252  size_t part_size = *source++;
253  if (part_size == std::numeric_limits<std::uint8_t>::max()) {
254  if (source > limit - 4) {
255  throw std::out_of_range(
256  "Malformed encoding, overflow in reading size");
257  }
258  part_size = ((uint32_t) source[0] << 24) + ((uint32_t) source[1] << 16)
259  + ((uint32_t) source[2] << 8) + (uint32_t) source[3];
260  source += 4;
261  }
262 
263  if (source > limit - part_size) {
264  throw std::out_of_range("Malformed encoding, overflow in reading part");
265  }
266  *out = message_t(source, part_size);
267  ++out;
268  source += part_size;
269  }
270  return out;
271 }
272 
273 #endif
274 
275 
276 #ifdef ZMQ_HAS_RVALUE_REFS
277 
278 /*
279  This class handles multipart messaging. It is the C++ equivalent of zmsg.h,
280  which is part of CZMQ (the high-level C binding). Furthermore, it is a major
281  improvement compared to zmsg.hpp, which is part of the examples in the ØMQ
282  Guide. Unnecessary copying is avoided by using move semantics to efficiently
283  add/remove parts.
284 */
285 class multipart_t
286 {
287  private:
288  std::deque<message_t> m_parts;
289 
290  public:
291  typedef std::deque<message_t>::value_type value_type;
292 
293  typedef std::deque<message_t>::iterator iterator;
294  typedef std::deque<message_t>::const_iterator const_iterator;
295 
296  typedef std::deque<message_t>::reverse_iterator reverse_iterator;
297  typedef std::deque<message_t>::const_reverse_iterator const_reverse_iterator;
298 
299  // Default constructor
300  multipart_t() {}
301 
302  // Construct from socket receive
303  multipart_t(socket_t &socket) { recv(socket); }
304 
305  // Construct from memory block
306  multipart_t(const void *src, size_t size) { addmem(src, size); }
307 
308  // Construct from string
309  multipart_t(const std::string &string) { addstr(string); }
310 
311  // Construct from message part
312  multipart_t(message_t &&message) { add(std::move(message)); }
313 
314  // Move constructor
315  multipart_t(multipart_t &&other) { m_parts = std::move(other.m_parts); }
316 
317  // Move assignment operator
318  multipart_t &operator=(multipart_t &&other)
319  {
320  m_parts = std::move(other.m_parts);
321  return *this;
322  }
323 
324  // Destructor
325  virtual ~multipart_t() { clear(); }
326 
327  message_t &operator[](size_t n) { return m_parts[n]; }
328 
329  const message_t &operator[](size_t n) const { return m_parts[n]; }
330 
331  message_t &at(size_t n) { return m_parts.at(n); }
332 
333  const message_t &at(size_t n) const { return m_parts.at(n); }
334 
335  iterator begin() { return m_parts.begin(); }
336 
337  const_iterator begin() const { return m_parts.begin(); }
338 
339  const_iterator cbegin() const { return m_parts.cbegin(); }
340 
341  reverse_iterator rbegin() { return m_parts.rbegin(); }
342 
343  const_reverse_iterator rbegin() const { return m_parts.rbegin(); }
344 
345  iterator end() { return m_parts.end(); }
346 
347  const_iterator end() const { return m_parts.end(); }
348 
349  const_iterator cend() const { return m_parts.cend(); }
350 
351  reverse_iterator rend() { return m_parts.rend(); }
352 
353  const_reverse_iterator rend() const { return m_parts.rend(); }
354 
355  // Delete all parts
356  void clear() { m_parts.clear(); }
357 
358  // Get number of parts
359  size_t size() const { return m_parts.size(); }
360 
361  // Check if number of parts is zero
362  bool empty() const { return m_parts.empty(); }
363 
364  // Receive multipart message from socket
365  bool recv(socket_t &socket, int flags = 0)
366  {
367  clear();
368  bool more = true;
369  while (more) {
370  message_t message;
371 #ifdef ZMQ_CPP11
372  if (!socket.recv(message, static_cast<recv_flags>(flags)))
373  return false;
374 #else
375  if (!socket.recv(&message, flags))
376  return false;
377 #endif
378  more = message.more();
379  add(std::move(message));
380  }
381  return true;
382  }
383 
384  // Send multipart message to socket
385  bool send(socket_t &socket, int flags = 0)
386  {
387  flags &= ~(ZMQ_SNDMORE);
388  bool more = size() > 0;
389  while (more) {
390  message_t message = pop();
391  more = size() > 0;
392 #ifdef ZMQ_CPP11
393  if (!socket.send(message, static_cast<send_flags>(
394  (more ? ZMQ_SNDMORE : 0) | flags)))
395  return false;
396 #else
397  if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
398  return false;
399 #endif
400  }
401  clear();
402  return true;
403  }
404 
405  // Concatenate other multipart to front
406  void prepend(multipart_t &&other)
407  {
408  while (!other.empty())
409  push(other.remove());
410  }
411 
412  // Concatenate other multipart to back
413  void append(multipart_t &&other)
414  {
415  while (!other.empty())
416  add(other.pop());
417  }
418 
419  // Push memory block to front
420  void pushmem(const void *src, size_t size)
421  {
422  m_parts.push_front(message_t(src, size));
423  }
424 
425  // Push memory block to back
426  void addmem(const void *src, size_t size)
427  {
428  m_parts.push_back(message_t(src, size));
429  }
430 
431  // Push string to front
432  void pushstr(const std::string &string)
433  {
434  m_parts.push_front(message_t(string.data(), string.size()));
435  }
436 
437  // Push string to back
438  void addstr(const std::string &string)
439  {
440  m_parts.push_back(message_t(string.data(), string.size()));
441  }
442 
443  // Push type (fixed-size) to front
444  template<typename T> void pushtyp(const T &type)
445  {
446  static_assert(!std::is_same<T, std::string>::value,
447  "Use pushstr() instead of pushtyp<std::string>()");
448  m_parts.push_front(message_t(&type, sizeof(type)));
449  }
450 
451  // Push type (fixed-size) to back
452  template<typename T> void addtyp(const T &type)
453  {
454  static_assert(!std::is_same<T, std::string>::value,
455  "Use addstr() instead of addtyp<std::string>()");
456  m_parts.push_back(message_t(&type, sizeof(type)));
457  }
458 
459  // Push message part to front
460  void push(message_t &&message) { m_parts.push_front(std::move(message)); }
461 
462  // Push message part to back
463  void add(message_t &&message) { m_parts.push_back(std::move(message)); }
464 
465  // Alias to allow std::back_inserter()
466  void push_back(message_t &&message) { m_parts.push_back(std::move(message)); }
467 
468  // Pop string from front
469  std::string popstr()
470  {
471  std::string string(m_parts.front().data<char>(), m_parts.front().size());
472  m_parts.pop_front();
473  return string;
474  }
475 
476  // Pop type (fixed-size) from front
477  template<typename T> T poptyp()
478  {
479  static_assert(!std::is_same<T, std::string>::value,
480  "Use popstr() instead of poptyp<std::string>()");
481  if (sizeof(T) != m_parts.front().size())
482  throw std::runtime_error(
483  "Invalid type, size does not match the message size");
484  T type = *m_parts.front().data<T>();
485  m_parts.pop_front();
486  return type;
487  }
488 
489  // Pop message part from front
490  message_t pop()
491  {
492  message_t message = std::move(m_parts.front());
493  m_parts.pop_front();
494  return message;
495  }
496 
497  // Pop message part from back
498  message_t remove()
499  {
500  message_t message = std::move(m_parts.back());
501  m_parts.pop_back();
502  return message;
503  }
504 
505  // get message part from front
506  const message_t &front() { return m_parts.front(); }
507 
508  // get message part from back
509  const message_t &back() { return m_parts.back(); }
510 
511  // Get pointer to a specific message part
512  const message_t *peek(size_t index) const { return &m_parts[index]; }
513 
514  // Get a string copy of a specific message part
515  std::string peekstr(size_t index) const
516  {
517  std::string string(m_parts[index].data<char>(), m_parts[index].size());
518  return string;
519  }
520 
521  // Peek type (fixed-size) from front
522  template<typename T> T peektyp(size_t index) const
523  {
524  static_assert(!std::is_same<T, std::string>::value,
525  "Use peekstr() instead of peektyp<std::string>()");
526  if (sizeof(T) != m_parts[index].size())
527  throw std::runtime_error(
528  "Invalid type, size does not match the message size");
529  T type = *m_parts[index].data<T>();
530  return type;
531  }
532 
533  // Create multipart from type (fixed-size)
534  template<typename T> static multipart_t create(const T &type)
535  {
536  multipart_t multipart;
537  multipart.addtyp(type);
538  return multipart;
539  }
540 
541  // Copy multipart
542  multipart_t clone() const
543  {
544  multipart_t multipart;
545  for (size_t i = 0; i < size(); i++)
546  multipart.addmem(m_parts[i].data(), m_parts[i].size());
547  return multipart;
548  }
549 
550  // Dump content to string
551  std::string str() const
552  {
553  std::stringstream ss;
554  for (size_t i = 0; i < m_parts.size(); i++) {
555  const unsigned char *data = m_parts[i].data<unsigned char>();
556  size_t size = m_parts[i].size();
557 
558  // Dump the message as text or binary
559  bool isText = true;
560  for (size_t j = 0; j < size; j++) {
561  if (data[j] < 32 || data[j] > 127) {
562  isText = false;
563  break;
564  }
565  }
566  ss << "\n[" << std::dec << std::setw(3) << std::setfill('0') << size
567  << "] ";
568  if (size >= 1000) {
569  ss << "... (to big to print)";
570  continue;
571  }
572  for (size_t j = 0; j < size; j++) {
573  if (isText)
574  ss << static_cast<char>(data[j]);
575  else
576  ss << std::hex << std::setw(2) << std::setfill('0')
577  << static_cast<short>(data[j]);
578  }
579  }
580  return ss.str();
581  }
582 
583  // Check if equal to other multipart
584  bool equal(const multipart_t *other) const
585  {
586  if (size() != other->size())
587  return false;
588  for (size_t i = 0; i < size(); i++)
589  if (*peek(i) != *other->peek(i))
590  return false;
591  return true;
592  }
593 
594 #ifdef ZMQ_CPP11
595 
596  // Return single part message_t encoded from this multipart_t.
597  message_t encode() const { return zmq::encode(*this); }
598 
599  // Decode encoded message into multiple parts and append to self.
600  void decode_append(const message_t &encoded)
601  {
602  zmq::decode(encoded, std::back_inserter(*this));
603  }
604 
605  // Return a new multipart_t containing the decoded message_t.
606  static multipart_t decode(const message_t &encoded)
607  {
608  multipart_t tmp;
609  zmq::decode(encoded, std::back_inserter(tmp));
610  return tmp;
611  }
612 
613 #endif
614 
615  private:
616  // Disable implicit copying (moving is more efficient)
617  multipart_t(const multipart_t &other) ZMQ_DELETED_FUNCTION;
618  void operator=(const multipart_t &other) ZMQ_DELETED_FUNCTION;
619 }; // class multipart_t
620 
621 inline std::ostream &operator<<(std::ostream &os, const multipart_t &msg)
622 {
623  return os << msg.str();
624 }
625 
626 #endif // ZMQ_HAS_RVALUE_REFS
627 
628 #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
629 class active_poller_t
630 {
631  public:
632  active_poller_t() = default;
633  ~active_poller_t() = default;
634 
635  active_poller_t(const active_poller_t &) = delete;
636  active_poller_t &operator=(const active_poller_t &) = delete;
637 
638  active_poller_t(active_poller_t &&src) = default;
639  active_poller_t &operator=(active_poller_t &&src) = default;
640 
641  using handler_type = std::function<void(event_flags)>;
642 
643  void add(zmq::socket_ref socket, event_flags events, handler_type handler)
644  {
645  auto it = decltype(handlers)::iterator{};
646  auto inserted = bool{};
647  std::tie(it, inserted) = handlers.emplace(
648  socket, std::make_shared<handler_type>(std::move(handler)));
649  try {
650  base_poller.add(socket, events,
651  inserted && *(it->second) ? it->second.get() : nullptr);
652  need_rebuild |= inserted;
653  }
654  catch (const zmq::error_t &) {
655  // rollback
656  if (inserted) {
657  handlers.erase(socket);
658  }
659  throw;
660  }
661  }
662 
663  void remove(zmq::socket_ref socket)
664  {
665  base_poller.remove(socket);
666  handlers.erase(socket);
667  need_rebuild = true;
668  }
669 
670  void modify(zmq::socket_ref socket, event_flags events)
671  {
672  base_poller.modify(socket, events);
673  }
674 
675  size_t wait(std::chrono::milliseconds timeout)
676  {
677  if (need_rebuild) {
678  poller_events.resize(handlers.size());
679  poller_handlers.clear();
680  poller_handlers.reserve(handlers.size());
681  for (const auto &handler : handlers) {
682  poller_handlers.push_back(handler.second);
683  }
684  need_rebuild = false;
685  }
686  const auto count = base_poller.wait_all(poller_events, timeout);
687  std::for_each(poller_events.begin(),
688  poller_events.begin() + static_cast<ptrdiff_t>(count),
689  [](decltype(base_poller)::event_type &event) {
690  if (event.user_data != nullptr)
691  (*event.user_data)(event.events);
692  });
693  return count;
694  }
695 
696  ZMQ_NODISCARD bool empty() const noexcept { return handlers.empty(); }
697 
698  size_t size() const noexcept { return handlers.size(); }
699 
700  private:
701  bool need_rebuild{false};
702 
703  poller_t<handler_type> base_poller{};
704  std::unordered_map<socket_ref, std::shared_ptr<handler_type>> handlers{};
705  std::vector<decltype(base_poller)::event_type> poller_events{};
706  std::vector<std::shared_ptr<handler_type>> poller_handlers{};
707 }; // class active_poller_t
708 #endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
709 
710 
711 } // namespace zmq
712 
713 #endif // __ZMQ_ADDON_HPP_INCLUDED__
#define ZMQ_DELETED_FUNCTION
Definition: zmq.hpp:146
const zio::message_t & at(const Message &msg, size_t index)
Definition: tens.hpp:58
#define ZMQ_NODISCARD
Definition: zmq.hpp:63
def handler(ctx, pipe, bot, rule_object, filename, broker_addr, rargs)
Definition: reader.py:85
std::ostream & operator<<(std::ostream &os, const message_t &msg)
Definition: zmq.hpp:2172
void append(Message &msg, message_t &&data, const std::vector< size_t > &shape, size_t word_size, const char *tn)
Definition: tens.cpp:34
size_t size() const ZMQ_NOTHROW
Definition: zmq.hpp:488
Definition: zmq.hpp:195