ZIO
Python and C++ interface to ZeroMQ and Zyre
zmq.hpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2016-2017 ZeroMQ community
3  Copyright (c) 2009-2011 250bpm s.r.o.
4  Copyright (c) 2011 Botond Ballo
5  Copyright (c) 2007-2009 iMatix Corporation
6 
7  Permission is hereby granted, free of charge, to any person obtaining a copy
8  of this software and associated documentation files (the "Software"), to
9  deal in the Software without restriction, including without limitation the
10  rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11  sell copies of the Software, and to permit persons to whom the Software is
12  furnished to do so, subject to the following conditions:
13 
14  The above copyright notice and this permission notice shall be included in
15  all copies or substantial portions of the Software.
16 
17  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23  IN THE SOFTWARE.
24 */
25 
26 #ifndef __ZMQ_HPP_INCLUDED__
27 #define __ZMQ_HPP_INCLUDED__
28 
29 #ifdef _WIN32
30 #ifndef NOMINMAX
31 #define NOMINMAX
32 #endif
33 #endif
34 
35 // macros defined if has a specific standard or greater
36 #if (defined(__cplusplus) && __cplusplus >= 201103L) \
37  || (defined(_MSC_VER) && _MSC_VER >= 1900)
38 #define ZMQ_CPP11
39 #endif
40 #if (defined(__cplusplus) && __cplusplus >= 201402L) \
41  || (defined(_HAS_CXX14) && _HAS_CXX14 == 1) \
42  || (defined(_HAS_CXX17) \
43  && _HAS_CXX17 \
44  == 1) // _HAS_CXX14 might not be defined when using C++17 on MSVC
45 #define ZMQ_CPP14
46 #endif
47 #if (defined(__cplusplus) && __cplusplus >= 201703L) \
48  || (defined(_HAS_CXX17) && _HAS_CXX17 == 1)
49 #define ZMQ_CPP17
50 #endif
51 
// ZMQ_DEPRECATED(msg) marks the declaration that follows as deprecated.
// The standard attribute is used when C++14 is available; otherwise the
// MSVC or GNU spelling is chosen.
#if defined(ZMQ_CPP14)
#define ZMQ_DEPRECATED(msg) [[deprecated(msg)]]
#elif defined(_MSC_VER)
#define ZMQ_DEPRECATED(msg) __declspec(deprecated(msg))
#elif defined(__GNUC__)
#define ZMQ_DEPRECATED(msg) __attribute__((deprecated(msg)))
#else
// No known spelling for this compiler: expand to nothing so that
// declarations annotated with ZMQ_DEPRECATED still compile.
#define ZMQ_DEPRECATED(msg)
#endif
59 
60 #if defined(ZMQ_CPP17)
61 #define ZMQ_NODISCARD [[nodiscard]]
62 #else
63 #define ZMQ_NODISCARD
64 #endif
65 
66 #if defined(ZMQ_CPP11)
67 #define ZMQ_NOTHROW noexcept
68 #define ZMQ_EXPLICIT explicit
69 #define ZMQ_OVERRIDE override
70 #define ZMQ_NULLPTR nullptr
71 #define ZMQ_CONSTEXPR_FN constexpr
72 #define ZMQ_CONSTEXPR_VAR constexpr
73 #define ZMQ_CPP11_DEPRECATED(msg) ZMQ_DEPRECATED(msg)
74 #else
75 #define ZMQ_NOTHROW throw()
76 #define ZMQ_EXPLICIT
77 #define ZMQ_OVERRIDE
78 #define ZMQ_NULLPTR 0
79 #define ZMQ_CONSTEXPR_FN
80 #define ZMQ_CONSTEXPR_VAR const
81 #define ZMQ_CPP11_DEPRECATED(msg)
82 #endif
83 
84 #include <zmq.h>
85 
86 #include <cassert>
87 #include <cstring>
88 
89 #include <algorithm>
90 #include <exception>
91 #include <iomanip>
92 #include <sstream>
93 #include <string>
94 #include <vector>
95 #ifdef ZMQ_CPP11
96 #include <array>
97 #include <chrono>
98 #include <tuple>
99 #include <memory>
100 #endif
101 #ifdef ZMQ_CPP17
102 #ifdef __has_include
103 #if __has_include(<optional>)
104 #include <optional>
105 #define ZMQ_HAS_OPTIONAL 1
106 #endif
107 #if __has_include(<string_view>)
108 #include <string_view>
109 #define ZMQ_HAS_STRING_VIEW 1
110 #endif
111 #endif
112 
113 #endif
114 
115 /* Version macros for compile-time API version detection */
116 #define CPPZMQ_VERSION_MAJOR 4
117 #define CPPZMQ_VERSION_MINOR 7
118 #define CPPZMQ_VERSION_PATCH 0
119 
120 #define CPPZMQ_VERSION \
121  ZMQ_MAKE_VERSION(CPPZMQ_VERSION_MAJOR, CPPZMQ_VERSION_MINOR, \
122  CPPZMQ_VERSION_PATCH)
123 
124 // Detect whether the compiler supports C++11 rvalue references.
125 #if (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) \
126  && defined(__GXX_EXPERIMENTAL_CXX0X__))
127 #define ZMQ_HAS_RVALUE_REFS
128 #define ZMQ_DELETED_FUNCTION = delete
129 #elif defined(__clang__)
130 #if __has_feature(cxx_rvalue_references)
131 #define ZMQ_HAS_RVALUE_REFS
132 #endif
133 
134 #if __has_feature(cxx_deleted_functions)
135 #define ZMQ_DELETED_FUNCTION = delete
136 #else
137 #define ZMQ_DELETED_FUNCTION
138 #endif
139 #elif defined(_MSC_VER) && (_MSC_VER >= 1900)
140 #define ZMQ_HAS_RVALUE_REFS
141 #define ZMQ_DELETED_FUNCTION = delete
142 #elif defined(_MSC_VER) && (_MSC_VER >= 1600)
143 #define ZMQ_HAS_RVALUE_REFS
144 #define ZMQ_DELETED_FUNCTION
145 #else
146 #define ZMQ_DELETED_FUNCTION
147 #endif
148 
149 #if defined(ZMQ_CPP11) && !defined(__llvm__) && !defined(__INTEL_COMPILER) \
150  && defined(__GNUC__) && __GNUC__ < 5
151 #define ZMQ_CPP11_PARTIAL
152 #elif defined(__GLIBCXX__) && __GLIBCXX__ < 20160805
153 //the date here is the last date of gcc 4.9.4, which
154 // effectively means libstdc++ from gcc 5.5 and higher won't trigger this branch
155 #define ZMQ_CPP11_PARTIAL
156 #endif
157 
158 #ifdef ZMQ_CPP11
159 #ifdef ZMQ_CPP11_PARTIAL
160 #define ZMQ_IS_TRIVIALLY_COPYABLE(T) __has_trivial_copy(T)
161 #else
162 #include <type_traits>
163 #define ZMQ_IS_TRIVIALLY_COPYABLE(T) std::is_trivially_copyable<T>::value
164 #endif
165 #endif
166 
167 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 3, 0)
168 #define ZMQ_NEW_MONITOR_EVENT_LAYOUT
169 #endif
170 
171 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0)
172 #define ZMQ_HAS_PROXY_STEERABLE
173 /* Socket event data */
/* Socket event record: a 16-bit event id and a 32-bit value. */
typedef struct
{
    // id of the event as bitfield
    uint16_t event;
    // value is either error code, fd or reconnect interval
    int32_t value;
} zmq_event_t;
179 #endif
180 
181 // Avoid using deprecated message receive function when possible
182 #if ZMQ_VERSION < ZMQ_MAKE_VERSION(3, 2, 0)
183 #define zmq_msg_recv(msg, socket, flags) zmq_recvmsg(socket, msg, flags)
184 #endif
185 
186 
187 // In order to prevent unused variable warnings when building in non-debug
188 // mode use this macro to make assertions.
189 #ifndef NDEBUG
190 #define ZMQ_ASSERT(expression) assert(expression)
191 #else
192 #define ZMQ_ASSERT(expression) (void) (expression)
193 #endif
194 
195 namespace zmq
196 {
197 #ifdef ZMQ_CPP11
namespace detail
{
// begin()/end() wrappers. The calls are unqualified so that std::begin and
// std::end (made visible by the using-declarations) as well as overloads
// found through argument-dependent lookup are considered.
namespace ranges
{
using std::begin;
using std::end;

template<class Rng>
auto begin(Rng &&rng) -> decltype(begin(std::forward<Rng>(rng)))
{
    return begin(std::forward<Rng>(rng));
}

template<class Rng> auto end(Rng &&rng) -> decltype(end(std::forward<Rng>(rng)))
{
    return end(std::forward<Rng>(rng));
}
} // namespace ranges

// Maps any type to void; used below for expression detection.
template<class T> using void_t = void;

// Value type of an iterator, taken from std::iterator_traits.
template<class Iter>
using iter_value_t = typename std::iterator_traits<Iter>::value_type;

// Type returned by ranges::begin() for an lvalue of (unreferenced) Range.
template<class Range>
using range_iter_t = decltype(
  ranges::begin(std::declval<typename std::remove_reference<Range>::type &>()));

// Value type of the iterator of Range.
template<class Range> using range_value_t = iter_value_t<range_iter_t<Range>>;

// Primary template: T is not a range.
template<class T, class = void> struct is_range : std::false_type
{
};

// Selected when `ranges::begin(r) == ranges::end(r)` is a valid expression
// for an lvalue r of T with references removed.
template<class T>
struct is_range<
  T,
  void_t<decltype(
    ranges::begin(std::declval<typename std::remove_reference<T>::type &>())
    == ranges::end(std::declval<typename std::remove_reference<T>::type &>()))>>
    : std::true_type
{
};

} // namespace detail
240 #endif
241 
242 typedef zmq_free_fn free_fn;
243 typedef zmq_pollitem_t pollitem_t;
244 
245 class error_t : public std::exception
246 {
247  public:
248  error_t() : errnum(zmq_errno()) {}
249  virtual const char *what() const ZMQ_NOTHROW ZMQ_OVERRIDE
250  {
251  return zmq_strerror(errnum);
252  }
253  int num() const { return errnum; }
254 
255  private:
256  int errnum;
257 };
258 
259 inline int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_ = -1)
260 {
261  int rc = zmq_poll(items_, static_cast<int>(nitems_), timeout_);
262  if (rc < 0)
263  throw error_t();
264  return rc;
265 }
266 
267 ZMQ_DEPRECATED("from 4.3.1, use poll taking non-const items")
268 inline int poll(zmq_pollitem_t const *items_, size_t nitems_, long timeout_ = -1)
269 {
270  return poll(const_cast<zmq_pollitem_t *>(items_), nitems_, timeout_);
271 }
272 
273 #ifdef ZMQ_CPP11
274 ZMQ_DEPRECATED("from 4.3.1, use poll taking non-const items")
275 inline int
276 poll(zmq_pollitem_t const *items, size_t nitems, std::chrono::milliseconds timeout)
277 {
278  return poll(const_cast<zmq_pollitem_t *>(items), nitems,
279  static_cast<long>(timeout.count()));
280 }
281 
282 ZMQ_DEPRECATED("from 4.3.1, use poll taking non-const items")
283 inline int poll(std::vector<zmq_pollitem_t> const &items,
284  std::chrono::milliseconds timeout)
285 {
286  return poll(const_cast<zmq_pollitem_t *>(items.data()), items.size(),
287  static_cast<long>(timeout.count()));
288 }
289 
290 ZMQ_DEPRECATED("from 4.3.1, use poll taking non-const items")
291 inline int poll(std::vector<zmq_pollitem_t> const &items, long timeout_ = -1)
292 {
293  return poll(const_cast<zmq_pollitem_t *>(items.data()), items.size(), timeout_);
294 }
295 
296 inline int
297 poll(zmq_pollitem_t *items, size_t nitems, std::chrono::milliseconds timeout)
298 {
299  return poll(items, nitems, static_cast<long>(timeout.count()));
300 }
301 
302 inline int poll(std::vector<zmq_pollitem_t> &items,
303  std::chrono::milliseconds timeout)
304 {
305  return poll(items.data(), items.size(), static_cast<long>(timeout.count()));
306 }
307 
308 inline int poll(std::vector<zmq_pollitem_t> &items, long timeout_ = -1)
309 {
310  return poll(items.data(), items.size(), timeout_);
311 }
312 #endif
313 
314 
315 inline void version(int *major_, int *minor_, int *patch_)
316 {
317  zmq_version(major_, minor_, patch_);
318 }
319 
320 #ifdef ZMQ_CPP11
321 inline std::tuple<int, int, int> version()
322 {
323  std::tuple<int, int, int> v;
324  zmq_version(&std::get<0>(v), &std::get<1>(v), &std::get<2>(v));
325  return v;
326 }
327 #endif
328 
330 {
331  public:
333  {
334  int rc = zmq_msg_init(&msg);
335  ZMQ_ASSERT(rc == 0);
336  }
337 
338  explicit message_t(size_t size_)
339  {
340  int rc = zmq_msg_init_size(&msg, size_);
341  if (rc != 0)
342  throw error_t();
343  }
344 
345  template<class ForwardIter> message_t(ForwardIter first, ForwardIter last)
346  {
347  typedef typename std::iterator_traits<ForwardIter>::value_type value_t;
348 
349  assert(std::distance(first, last) >= 0);
350  size_t const size_ =
351  static_cast<size_t>(std::distance(first, last)) * sizeof(value_t);
352  int const rc = zmq_msg_init_size(&msg, size_);
353  if (rc != 0)
354  throw error_t();
355  std::copy(first, last, data<value_t>());
356  }
357 
358  message_t(const void *data_, size_t size_)
359  {
360  int rc = zmq_msg_init_size(&msg, size_);
361  if (rc != 0)
362  throw error_t();
363  memcpy(data(), data_, size_);
364  }
365 
366  message_t(void *data_, size_t size_, free_fn *ffn_, void *hint_ = ZMQ_NULLPTR)
367  {
368  int rc = zmq_msg_init_data(&msg, data_, size_, ffn_, hint_);
369  if (rc != 0)
370  throw error_t();
371  }
372 
#if defined(ZMQ_CPP11) && !defined(ZMQ_CPP11_PARTIAL)
    // Range constructor: delegates to the iterator-pair constructor.
    // Participates in overload resolution only when Range satisfies
    // detail::is_range, its value type is trivially copyable, and Range is
    // not message_t itself.
    template<class Range,
             typename = typename std::enable_if<
               detail::is_range<Range>::value
               && ZMQ_IS_TRIVIALLY_COPYABLE(detail::range_value_t<Range>)
               && !std::is_same<Range, message_t>::value>::type>
    explicit message_t(const Range &rng) :
        message_t(detail::ranges::begin(rng), detail::ranges::end(rng))
    {
        // all work is done by the delegated constructor
    }
#endif
384 
#ifdef ZMQ_HAS_RVALUE_REFS
    // Move construction: copies rhs's zmq_msg_t member into this object and
    // then re-initialises rhs.msg with zmq_msg_init().
    message_t(message_t &&rhs) ZMQ_NOTHROW : msg(rhs.msg)
    {
        int init_rc = zmq_msg_init(&rhs.msg);
        ZMQ_ASSERT(init_rc == 0);
    }

    // Move assignment: exchanges the two zmq_msg_t members.
    message_t &operator=(message_t &&rhs) ZMQ_NOTHROW
    {
        std::swap(msg, rhs.msg);
        return *this;
    }
#endif
398 
400  {
401  int rc = zmq_msg_close(&msg);
402  ZMQ_ASSERT(rc == 0);
403  }
404 
405  void rebuild()
406  {
407  int rc = zmq_msg_close(&msg);
408  if (rc != 0)
409  throw error_t();
410  rc = zmq_msg_init(&msg);
411  ZMQ_ASSERT(rc == 0);
412  }
413 
414  void rebuild(size_t size_)
415  {
416  int rc = zmq_msg_close(&msg);
417  if (rc != 0)
418  throw error_t();
419  rc = zmq_msg_init_size(&msg, size_);
420  if (rc != 0)
421  throw error_t();
422  }
423 
424  void rebuild(const void *data_, size_t size_)
425  {
426  int rc = zmq_msg_close(&msg);
427  if (rc != 0)
428  throw error_t();
429  rc = zmq_msg_init_size(&msg, size_);
430  if (rc != 0)
431  throw error_t();
432  memcpy(data(), data_, size_);
433  }
434 
435  void rebuild(void *data_, size_t size_, free_fn *ffn_, void *hint_ = ZMQ_NULLPTR)
436  {
437  int rc = zmq_msg_close(&msg);
438  if (rc != 0)
439  throw error_t();
440  rc = zmq_msg_init_data(&msg, data_, size_, ffn_, hint_);
441  if (rc != 0)
442  throw error_t();
443  }
444 
445  ZMQ_DEPRECATED("from 4.3.1, use move taking non-const reference instead")
446  void move(message_t const *msg_)
447  {
448  int rc = zmq_msg_move(&msg, const_cast<zmq_msg_t *>(msg_->handle()));
449  if (rc != 0)
450  throw error_t();
451  }
452 
453  void move(message_t &msg_)
454  {
455  int rc = zmq_msg_move(&msg, msg_.handle());
456  if (rc != 0)
457  throw error_t();
458  }
459 
460  ZMQ_DEPRECATED("from 4.3.1, use copy taking non-const reference instead")
461  void copy(message_t const *msg_)
462  {
463  int rc = zmq_msg_copy(&msg, const_cast<zmq_msg_t *>(msg_->handle()));
464  if (rc != 0)
465  throw error_t();
466  }
467 
468  void copy(message_t &msg_)
469  {
470  int rc = zmq_msg_copy(&msg, msg_.handle());
471  if (rc != 0)
472  throw error_t();
473  }
474 
475  bool more() const ZMQ_NOTHROW
476  {
477  int rc = zmq_msg_more(const_cast<zmq_msg_t *>(&msg));
478  return rc != 0;
479  }
480 
481  void *data() ZMQ_NOTHROW { return zmq_msg_data(&msg); }
482 
483  const void *data() const ZMQ_NOTHROW
484  {
485  return zmq_msg_data(const_cast<zmq_msg_t *>(&msg));
486  }
487 
488  size_t size() const ZMQ_NOTHROW
489  {
490  return zmq_msg_size(const_cast<zmq_msg_t *>(&msg));
491  }
492 
493  ZMQ_NODISCARD bool empty() const ZMQ_NOTHROW { return size() == 0u; }
494 
495  template<typename T> T *data() ZMQ_NOTHROW { return static_cast<T *>(data()); }
496 
497  template<typename T> T const *data() const ZMQ_NOTHROW
498  {
499  return static_cast<T const *>(data());
500  }
501 
502  ZMQ_DEPRECATED("from 4.3.0, use operator== instead")
503  bool equal(const message_t *other) const ZMQ_NOTHROW { return *this == *other; }
504 
505  bool operator==(const message_t &other) const ZMQ_NOTHROW
506  {
507  const size_t my_size = size();
508  return my_size == other.size() && 0 == memcmp(data(), other.data(), my_size);
509  }
510 
511  bool operator!=(const message_t &other) const ZMQ_NOTHROW
512  {
513  return !(*this == other);
514  }
515 
516 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 2, 0)
517  int get(int property_)
518  {
519  int value = zmq_msg_get(&msg, property_);
520  if (value == -1)
521  throw error_t();
522  return value;
523  }
524 #endif
525 
526 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0)
527  const char *gets(const char *property_)
528  {
529  const char *value = zmq_msg_gets(&msg, property_);
530  if (value == ZMQ_NULLPTR)
531  throw error_t();
532  return value;
533  }
534 #endif
535 
536 #if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0)
537  uint32_t routing_id() const
538  {
539  return zmq_msg_routing_id(const_cast<zmq_msg_t *>(&msg));
540  }
541 
542  void set_routing_id(uint32_t routing_id)
543  {
544  int rc = zmq_msg_set_routing_id(&msg, routing_id);
545  if (rc != 0)
546  throw error_t();
547  }
548 
549  const char *group() const
550  {
551  return zmq_msg_group(const_cast<zmq_msg_t *>(&msg));
552  }
553 
554  void set_group(const char *group)
555  {
556  int rc = zmq_msg_set_group(&msg, group);
557  if (rc != 0)
558  throw error_t();
559  }
560 #endif
561 
562  // interpret message content as a string
563  std::string to_string() const
564  {
565  return std::string(static_cast<const char *>(data()), size());
566  }
567 #ifdef ZMQ_CPP17
568  // interpret message content as a string
569  std::string_view to_string_view() const noexcept
570  {
571  return std::string_view(static_cast<const char *>(data()), size());
572  }
573 #endif
574 
581  std::string str() const
582  {
583  // Partly mutuated from the same method in zmq::multipart_t
584  std::stringstream os;
585 
586  const unsigned char *msg_data = this->data<unsigned char>();
587  unsigned char byte;
588  size_t size = this->size();
589  int is_ascii[2] = {0, 0};
590 
591  os << "zmq::message_t [size " << std::dec << std::setw(3)
592  << std::setfill('0') << size << "] (";
593  // Totally arbitrary
594  if (size >= 1000) {
595  os << "... too big to print)";
596  } else {
597  while (size--) {
598  byte = *msg_data++;
599 
600  is_ascii[1] = (byte >= 32 && byte < 127);
601  if (is_ascii[1] != is_ascii[0])
602  os << " "; // Separate text/non text
603 
604  if (is_ascii[1]) {
605  os << byte;
606  } else {
607  os << std::hex << std::uppercase << std::setw(2)
608  << std::setfill('0') << static_cast<short>(byte);
609  }
610  is_ascii[0] = is_ascii[1];
611  }
612  os << ")";
613  }
614  return os.str();
615  }
616 
618  {
619  // this assumes zmq::msg_t from libzmq is trivially relocatable
620  std::swap(msg, other.msg);
621  }
622 
623  ZMQ_NODISCARD zmq_msg_t *handle() ZMQ_NOTHROW { return &msg; }
624  ZMQ_NODISCARD const zmq_msg_t *handle() const ZMQ_NOTHROW { return &msg; }
625 
626  private:
627  // The underlying message
628  zmq_msg_t msg;
629 
630  // Disable implicit message copying, so that users won't use shared
631  // messages (less efficient) without being aware of the fact.
633  void operator=(const message_t &) ZMQ_DELETED_FUNCTION;
634 };
635 
636 inline void swap(message_t &a, message_t &b) ZMQ_NOTHROW
637 {
638  a.swap(b);
639 }
640 
641 #ifdef ZMQ_CPP11
// Scoped enumeration of context options. Each enumerator takes the value of
// the libzmq macro of the same (upper-case) name and is only present when
// that macro is defined.
enum class ctxopt
{
#ifdef ZMQ_BLOCKY
    blocky = ZMQ_BLOCKY,
#endif

#ifdef ZMQ_IO_THREADS
    io_threads = ZMQ_IO_THREADS,
#endif

#ifdef ZMQ_THREAD_SCHED_POLICY
    thread_sched_policy = ZMQ_THREAD_SCHED_POLICY,
#endif

#ifdef ZMQ_THREAD_PRIORITY
    thread_priority = ZMQ_THREAD_PRIORITY,
#endif

#ifdef ZMQ_THREAD_AFFINITY_CPU_ADD
    thread_affinity_cpu_add = ZMQ_THREAD_AFFINITY_CPU_ADD,
#endif

#ifdef ZMQ_THREAD_AFFINITY_CPU_REMOVE
    thread_affinity_cpu_remove = ZMQ_THREAD_AFFINITY_CPU_REMOVE,
#endif

#ifdef ZMQ_THREAD_NAME_PREFIX
    thread_name_prefix = ZMQ_THREAD_NAME_PREFIX,
#endif

#ifdef ZMQ_MAX_MSGSZ
    max_msgsz = ZMQ_MAX_MSGSZ,
#endif

#ifdef ZMQ_ZERO_COPY_RECV
    zero_copy_recv = ZMQ_ZERO_COPY_RECV,
#endif

#ifdef ZMQ_MAX_SOCKETS
    max_sockets = ZMQ_MAX_SOCKETS,
#endif

#ifdef ZMQ_SOCKET_LIMIT
    socket_limit = ZMQ_SOCKET_LIMIT,
#endif

#ifdef ZMQ_IPV6
    ipv6 = ZMQ_IPV6,
#endif

#ifdef ZMQ_MSG_T_SIZE
    msg_t_size = ZMQ_MSG_T_SIZE
#endif
};
684 #endif
685 
687 {
688  public:
690  {
691  ptr = zmq_ctx_new();
692  if (ptr == ZMQ_NULLPTR)
693  throw error_t();
694  }
695 
696 
697  explicit context_t(int io_threads_, int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT)
698  {
699  ptr = zmq_ctx_new();
700  if (ptr == ZMQ_NULLPTR)
701  throw error_t();
702 
703  int rc = zmq_ctx_set(ptr, ZMQ_IO_THREADS, io_threads_);
704  ZMQ_ASSERT(rc == 0);
705 
706  rc = zmq_ctx_set(ptr, ZMQ_MAX_SOCKETS, max_sockets_);
707  ZMQ_ASSERT(rc == 0);
708  }
709 
#ifdef ZMQ_HAS_RVALUE_REFS
    // Move construction: takes rhs's pointer and leaves rhs holding null.
    context_t(context_t &&rhs) ZMQ_NOTHROW : ptr(rhs.ptr)
    {
        rhs.ptr = ZMQ_NULLPTR;
    }

    // Move assignment: calls close() on this object first, then exchanges
    // the two pointers.
    context_t &operator=(context_t &&rhs) ZMQ_NOTHROW
    {
        close();
        void *const incoming = rhs.ptr;
        rhs.ptr = ptr;
        ptr = incoming;
        return *this;
    }
#endif
719 
720  ~context_t() ZMQ_NOTHROW { close(); }
721 
722  ZMQ_CPP11_DEPRECATED("from 4.7.0, use set taking zmq::ctxopt instead")
723  int setctxopt(int option_, int optval_)
724  {
725  int rc = zmq_ctx_set(ptr, option_, optval_);
726  ZMQ_ASSERT(rc == 0);
727  return rc;
728  }
729 
730  ZMQ_CPP11_DEPRECATED("from 4.7.0, use get taking zmq::ctxopt instead")
731  int getctxopt(int option_) { return zmq_ctx_get(ptr, option_); }
732 
#ifdef ZMQ_CPP11
    // Sets a context option through zmq_ctx_set(); throws error_t when the
    // call returns -1.
    void set(ctxopt option, int optval)
    {
        const int option_id = static_cast<int>(option);
        if (zmq_ctx_set(ptr, option_id, optval) == -1)
            throw error_t();
    }

    // Reads a context option through zmq_ctx_get(); throws error_t when the
    // call returns -1.
    ZMQ_NODISCARD int get(ctxopt option)
    {
        const int option_id = static_cast<int>(option);
        const int result = zmq_ctx_get(ptr, option_id);
        // some options have a default value of -1
        // which is unfortunate, and may result in errors
        // that don't make sense
        if (result != -1)
            return result;
        throw error_t();
    }
#endif
752 
753  // Terminates context (see also shutdown()).
755  {
756  if (ptr == ZMQ_NULLPTR)
757  return;
758 
759  int rc;
760  do {
761  rc = zmq_ctx_destroy(ptr);
762  } while (rc == -1 && errno == EINTR);
763 
764  ZMQ_ASSERT(rc == 0);
765  ptr = ZMQ_NULLPTR;
766  }
767 
768  // Shutdown context in preparation for termination (close()).
769  // Causes all blocking socket operations and any further
770  // socket operations to return with ETERM.
772  {
773  if (ptr == ZMQ_NULLPTR)
774  return;
775  int rc = zmq_ctx_shutdown(ptr);
776  ZMQ_ASSERT(rc == 0);
777  }
778 
779  // Be careful with this, it's probably only useful for
780  // using the C api together with an existing C++ api.
781  // Normally you should never need to use this.
782  ZMQ_EXPLICIT operator void *() ZMQ_NOTHROW { return ptr; }
783 
784  ZMQ_EXPLICIT operator void const *() const ZMQ_NOTHROW { return ptr; }
785 
786  operator bool() const ZMQ_NOTHROW { return ptr != ZMQ_NULLPTR; }
787 
788  void swap(context_t &other) ZMQ_NOTHROW { std::swap(ptr, other.ptr); }
789 
790  private:
791  void *ptr;
792 
794  void operator=(const context_t &) ZMQ_DELETED_FUNCTION;
795 };
796 
797 inline void swap(context_t &a, context_t &b) ZMQ_NOTHROW
798 {
799  a.swap(b);
800 }
801 
802 #ifdef ZMQ_CPP11
803 
804 struct recv_buffer_size
805 {
806  size_t size; // number of bytes written to buffer
807  size_t untruncated_size; // untruncated message size in bytes
808 
809  ZMQ_NODISCARD bool truncated() const noexcept
810  {
811  return size != untruncated_size;
812  }
813 };
814 
815 #if defined(ZMQ_HAS_OPTIONAL) && (ZMQ_HAS_OPTIONAL > 0)
816 
817 using send_result_t = std::optional<size_t>;
818 using recv_result_t = std::optional<size_t>;
819 using recv_buffer_result_t = std::optional<recv_buffer_size>;
820 
821 #else
822 
namespace detail
{
// Minimal C++11 stand-in for std::optional, restricted to trivial types.
// Holds a value of type T together with a flag telling whether that value
// has been set.
template<class T> class trivial_optional
{
  public:
    static_assert(std::is_trivial<T>::value, "T must be trivial");
    using value_type = T;

    // Empty optional.
    trivial_optional() = default;
    // Engaged optional holding a copy of value.
    trivial_optional(T value) noexcept : _value(value), _has_value(true) {}

    // Unchecked access (only asserted): member access.
    const T *operator->() const noexcept
    {
        assert(_has_value);
        const T *const held = &_value;
        return held;
    }
    T *operator->() noexcept
    {
        assert(_has_value);
        T *const held = &_value;
        return held;
    }

    // Unchecked access (only asserted): dereference.
    const T &operator*() const noexcept
    {
        assert(_has_value);
        return _value;
    }
    T &operator*() noexcept
    {
        assert(_has_value);
        return _value;
    }

    // Checked access: throws std::exception when no value is held.
    T &value()
    {
        if (_has_value)
            return _value;
        throw std::exception();
    }
    const T &value() const
    {
        if (_has_value)
            return _value;
        throw std::exception();
    }

    // State queries.
    explicit operator bool() const noexcept { return _has_value; }
    bool has_value() const noexcept { return _has_value; }

  private:
    T _value{};
    bool _has_value{false};
};
} // namespace detail
879 
880 using send_result_t = detail::trivial_optional<size_t>;
881 using recv_result_t = detail::trivial_optional<size_t>;
882 using recv_buffer_result_t = detail::trivial_optional<recv_buffer_size>;
883 
884 #endif
885 
namespace detail
{
// Bitwise helpers for enumeration types. Each one converts its operands to
// the enumeration's underlying type, applies the operator and converts the
// result back to T.

// lhs | rhs
template<class T> constexpr T enum_bit_or(T lhs, T rhs) noexcept
{
    static_assert(std::is_enum<T>::value, "must be enum");
    using underlying_t = typename std::underlying_type<T>::type;
    return static_cast<T>(static_cast<underlying_t>(lhs)
                          | static_cast<underlying_t>(rhs));
}

// lhs & rhs
template<class T> constexpr T enum_bit_and(T lhs, T rhs) noexcept
{
    static_assert(std::is_enum<T>::value, "must be enum");
    using underlying_t = typename std::underlying_type<T>::type;
    return static_cast<T>(static_cast<underlying_t>(lhs)
                          & static_cast<underlying_t>(rhs));
}

// lhs ^ rhs
template<class T> constexpr T enum_bit_xor(T lhs, T rhs) noexcept
{
    static_assert(std::is_enum<T>::value, "must be enum");
    using underlying_t = typename std::underlying_type<T>::type;
    return static_cast<T>(static_cast<underlying_t>(lhs)
                          ^ static_cast<underlying_t>(rhs));
}

// ~operand
template<class T> constexpr T enum_bit_not(T operand) noexcept
{
    static_assert(std::is_enum<T>::value, "must be enum");
    using underlying_t = typename std::underlying_type<T>::type;
    return static_cast<T>(~static_cast<underlying_t>(operand));
}
} // namespace detail
913 
914 // partially satisfies named requirement BitmaskType
915 enum class send_flags : int
916 {
917  none = 0,
918  dontwait = ZMQ_DONTWAIT,
919  sndmore = ZMQ_SNDMORE
920 };
921 
922 constexpr send_flags operator|(send_flags a, send_flags b) noexcept
923 {
924  return detail::enum_bit_or(a, b);
925 }
926 constexpr send_flags operator&(send_flags a, send_flags b) noexcept
927 {
928  return detail::enum_bit_and(a, b);
929 }
930 constexpr send_flags operator^(send_flags a, send_flags b) noexcept
931 {
932  return detail::enum_bit_xor(a, b);
933 }
934 constexpr send_flags operator~(send_flags a) noexcept
935 {
936  return detail::enum_bit_not(a);
937 }
938 
939 // partially satisfies named requirement BitmaskType
940 enum class recv_flags : int
941 {
942  none = 0,
943  dontwait = ZMQ_DONTWAIT
944 };
945 
946 constexpr recv_flags operator|(recv_flags a, recv_flags b) noexcept
947 {
948  return detail::enum_bit_or(a, b);
949 }
950 constexpr recv_flags operator&(recv_flags a, recv_flags b) noexcept
951 {
952  return detail::enum_bit_and(a, b);
953 }
954 constexpr recv_flags operator^(recv_flags a, recv_flags b) noexcept
955 {
956  return detail::enum_bit_xor(a, b);
957 }
958 constexpr recv_flags operator~(recv_flags a) noexcept
959 {
960  return detail::enum_bit_not(a);
961 }
962 
963 
964 // mutable_buffer, const_buffer and buffer are based on
965 // the Networking TS specification, draft:
966 // http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/n4771.pdf
967 
// Non-owning (pointer, size) pair describing a writable memory region.
class mutable_buffer
{
  public:
    // Empty buffer: null pointer, size zero.
    constexpr mutable_buffer() noexcept : _data(nullptr), _size(0) {}

    // Buffer over n bytes starting at p.
    constexpr mutable_buffer(void *p, size_t n) noexcept : _data(p), _size(n)
    {
#ifdef ZMQ_CPP14
        assert(p != nullptr || n == 0);
#endif
    }

    constexpr void *data() const noexcept { return _data; }
    constexpr size_t size() const noexcept { return _size; }

    // Advances the start of the buffer by n bytes, clamped to the current
    // size, and shrinks the size by the same amount.
    mutable_buffer &operator+=(size_t n) noexcept
    {
        const size_t advance = (n < _size) ? n : _size;
        _data = static_cast<char *>(_data) + advance;
        _size -= advance;
        return *this;
    }

  private:
    void *_data;
    size_t _size;
};
994 
995 inline mutable_buffer operator+(const mutable_buffer &mb, size_t n) noexcept
996 {
997  return mutable_buffer(static_cast<char *>(mb.data()) + (std::min)(n, mb.size()),
998  mb.size() - (std::min)(n, mb.size()));
999 }
1000 inline mutable_buffer operator+(size_t n, const mutable_buffer &mb) noexcept
1001 {
1002  return mb + n;
1003 }
1004 
1005 class const_buffer
1006 {
1007  public:
1008  constexpr const_buffer() noexcept : _data(nullptr), _size(0) {}
1009  constexpr const_buffer(const void *p, size_t n) noexcept : _data(p), _size(n)
1010  {
1011 #ifdef ZMQ_CPP14
1012  assert(p != nullptr || n == 0);
1013 #endif
1014  }
1015  constexpr const_buffer(const mutable_buffer &mb) noexcept :
1016  _data(mb.data()), _size(mb.size())
1017  {
1018  }
1019 
1020  constexpr const void *data() const noexcept { return _data; }
1021  constexpr size_t size() const noexcept { return _size; }
1022  const_buffer &operator+=(size_t n) noexcept
1023  {
1024  const auto shift = (std::min)(n, _size);
1025  _data = static_cast<const char *>(_data) + shift;
1026  _size -= shift;
1027  return *this;
1028  }
1029 
1030  private:
1031  const void *_data;
1032  size_t _size;
1033 };
1034 
1035 inline const_buffer operator+(const const_buffer &cb, size_t n) noexcept
1036 {
1037  return const_buffer(static_cast<const char *>(cb.data())
1038  + (std::min)(n, cb.size()),
1039  cb.size() - (std::min)(n, cb.size()));
1040 }
1041 inline const_buffer operator+(size_t n, const const_buffer &cb) noexcept
1042 {
1043  return cb + n;
1044 }
1045 
1046 // buffer creation
1047 
1048 constexpr mutable_buffer buffer(void *p, size_t n) noexcept
1049 {
1050  return mutable_buffer(p, n);
1051 }
1052 constexpr const_buffer buffer(const void *p, size_t n) noexcept
1053 {
1054  return const_buffer(p, n);
1055 }
1056 constexpr mutable_buffer buffer(const mutable_buffer &mb) noexcept
1057 {
1058  return mb;
1059 }
1060 inline mutable_buffer buffer(const mutable_buffer &mb, size_t n) noexcept
1061 {
1062  return mutable_buffer(mb.data(), (std::min)(mb.size(), n));
1063 }
1064 constexpr const_buffer buffer(const const_buffer &cb) noexcept
1065 {
1066  return cb;
1067 }
1068 inline const_buffer buffer(const const_buffer &cb, size_t n) noexcept
1069 {
1070  return const_buffer(cb.data(), (std::min)(cb.size(), n));
1071 }
1072 
1073 namespace detail
1074 {
1075 template<class T> struct is_buffer
1076 {
1077  static constexpr bool value =
1078  std::is_same<T, const_buffer>::value || std::is_same<T, mutable_buffer>::value;
1079 };
1080 
1081 template<class T> struct is_pod_like
1082 {
1083  // NOTE: The networking draft N4771 section 16.11 requires
1084  // T in the buffer functions below to be
1085  // trivially copyable OR standard layout.
1086  // Here we decide to be conservative and require both.
1087  static constexpr bool value =
1088  ZMQ_IS_TRIVIALLY_COPYABLE(T) && std::is_standard_layout<T>::value;
1089 };
1090 
// Element count of a container: whatever its size() member returns.
template<class C>
constexpr auto seq_size(const C &container) noexcept -> decltype(container.size())
{
    return container.size();
}

// Element count of a built-in array: its extent N.
template<class T, size_t N>
constexpr size_t seq_size(const T (&/*array*/)[N]) noexcept
{
    return N;
}
1100 
1101 template<class Seq>
1102 auto buffer_contiguous_sequence(Seq &&seq) noexcept
1103  -> decltype(buffer(std::addressof(*std::begin(seq)), size_t{}))
1104 {
1105  using T = typename std::remove_cv<
1107  static_assert(detail::is_pod_like<T>::value, "T must be POD");
1108 
1109  const auto size = seq_size(seq);
1110  return buffer(size != 0u ? std::addressof(*std::begin(seq)) : nullptr,
1111  size * sizeof(T));
1112 }
1113 template<class Seq>
1114 auto buffer_contiguous_sequence(Seq &&seq, size_t n_bytes) noexcept
1115  -> decltype(buffer_contiguous_sequence(seq))
1116 {
1117  using T = typename std::remove_cv<
1119  static_assert(detail::is_pod_like<T>::value, "T must be POD");
1120 
1121  const auto size = seq_size(seq);
1122  return buffer(size != 0u ? std::addressof(*std::begin(seq)) : nullptr,
1123  (std::min)(size * sizeof(T), n_bytes));
1124 }
1125 
1126 } // namespace detail
1127 
1128 // C array
1129 template<class T, size_t N> mutable_buffer buffer(T (&data)[N]) noexcept
1130 {
1131  return detail::buffer_contiguous_sequence(data);
1132 }
1133 template<class T, size_t N>
1134 mutable_buffer buffer(T (&data)[N], size_t n_bytes) noexcept
1135 {
1136  return detail::buffer_contiguous_sequence(data, n_bytes);
1137 }
1138 template<class T, size_t N> const_buffer buffer(const T (&data)[N]) noexcept
1139 {
1140  return detail::buffer_contiguous_sequence(data);
1141 }
1142 template<class T, size_t N>
1143 const_buffer buffer(const T (&data)[N], size_t n_bytes) noexcept
1144 {
1145  return detail::buffer_contiguous_sequence(data, n_bytes);
1146 }
1147 // std::array
1148 template<class T, size_t N> mutable_buffer buffer(std::array<T, N> &data) noexcept
1149 {
1150  return detail::buffer_contiguous_sequence(data);
1151 }
1152 template<class T, size_t N>
1153 mutable_buffer buffer(std::array<T, N> &data, size_t n_bytes) noexcept
1154 {
1155  return detail::buffer_contiguous_sequence(data, n_bytes);
1156 }
1157 template<class T, size_t N>
1158 const_buffer buffer(std::array<const T, N> &data) noexcept
1159 {
1160  return detail::buffer_contiguous_sequence(data);
1161 }
1162 template<class T, size_t N>
1163 const_buffer buffer(std::array<const T, N> &data, size_t n_bytes) noexcept
1164 {
1165  return detail::buffer_contiguous_sequence(data, n_bytes);
1166 }
1167 template<class T, size_t N>
1168 const_buffer buffer(const std::array<T, N> &data) noexcept
1169 {
1170  return detail::buffer_contiguous_sequence(data);
1171 }
1172 template<class T, size_t N>
1173 const_buffer buffer(const std::array<T, N> &data, size_t n_bytes) noexcept
1174 {
1175  return detail::buffer_contiguous_sequence(data, n_bytes);
1176 }
1177 // std::vector
1178 template<class T, class Allocator>
1179 mutable_buffer buffer(std::vector<T, Allocator> &data) noexcept
1180 {
1181  return detail::buffer_contiguous_sequence(data);
1182 }
1183 template<class T, class Allocator>
1184 mutable_buffer buffer(std::vector<T, Allocator> &data, size_t n_bytes) noexcept
1185 {
1186  return detail::buffer_contiguous_sequence(data, n_bytes);
1187 }
1188 template<class T, class Allocator>
1189 const_buffer buffer(const std::vector<T, Allocator> &data) noexcept
1190 {
1191  return detail::buffer_contiguous_sequence(data);
1192 }
1193 template<class T, class Allocator>
1194 const_buffer buffer(const std::vector<T, Allocator> &data, size_t n_bytes) noexcept
1195 {
1196  return detail::buffer_contiguous_sequence(data, n_bytes);
1197 }
1198 // std::basic_string
1199 template<class T, class Traits, class Allocator>
1200 mutable_buffer buffer(std::basic_string<T, Traits, Allocator> &data) noexcept
1201 {
1202  return detail::buffer_contiguous_sequence(data);
1203 }
1204 template<class T, class Traits, class Allocator>
1205 mutable_buffer buffer(std::basic_string<T, Traits, Allocator> &data,
1206  size_t n_bytes) noexcept
1207 {
1208  return detail::buffer_contiguous_sequence(data, n_bytes);
1209 }
1210 template<class T, class Traits, class Allocator>
1211 const_buffer buffer(const std::basic_string<T, Traits, Allocator> &data) noexcept
1212 {
1213  return detail::buffer_contiguous_sequence(data);
1214 }
1215 template<class T, class Traits, class Allocator>
1216 const_buffer buffer(const std::basic_string<T, Traits, Allocator> &data,
1217  size_t n_bytes) noexcept
1218 {
1219  return detail::buffer_contiguous_sequence(data, n_bytes);
1220 }
1221 
#if defined(ZMQ_HAS_STRING_VIEW) && (ZMQ_HAS_STRING_VIEW > 0)
// std::basic_string_view
// Read-only view over the characters referenced by a string_view.
template<class T, class Traits>
const_buffer buffer(std::basic_string_view<T, Traits> data) noexcept
{
    return detail::buffer_contiguous_sequence(data);
}

// Read-only view over a string_view's characters, at most n_bytes.
template<class T, class Traits>
const_buffer buffer(std::basic_string_view<T, Traits> data,
                    size_t n_bytes) noexcept
{
    return detail::buffer_contiguous_sequence(data, n_bytes);
}
#endif
1235 
1236 // Buffer for a string literal (null terminated)
1237 // where the buffer size excludes the terminating character.
1238 // Equivalent to zmq::buffer(std::string_view("...")).
1239 template<class Char, size_t N>
1240 constexpr const_buffer str_buffer(const Char (&data)[N]) noexcept
1241 {
1242  static_assert(detail::is_pod_like<Char>::value, "Char must be POD");
1243 #ifdef ZMQ_CPP14
1244  assert(data[N - 1] == Char{0});
1245 #endif
1246  return const_buffer(static_cast<const Char *>(data), (N - 1) * sizeof(Char));
1247 }
1248 
1249 namespace literals
1250 {
1251 constexpr const_buffer operator"" _zbuf(const char *str, size_t len) noexcept
1252 {
1253  return const_buffer(str, len * sizeof(char));
1254 }
1255 constexpr const_buffer operator"" _zbuf(const wchar_t *str, size_t len) noexcept
1256 {
1257  return const_buffer(str, len * sizeof(wchar_t));
1258 }
1259 constexpr const_buffer operator"" _zbuf(const char16_t *str, size_t len) noexcept
1260 {
1261  return const_buffer(str, len * sizeof(char16_t));
1262 }
1263 constexpr const_buffer operator"" _zbuf(const char32_t *str, size_t len) noexcept
1264 {
1265  return const_buffer(str, len * sizeof(char32_t));
1266 }
1267 }
1268 
1269 #endif // ZMQ_CPP11
1270 
1271 namespace detail
1272 {
1274 {
1275  public:
1277  ZMQ_EXPLICIT socket_base(void *handle) ZMQ_NOTHROW : _handle(handle) {}
1278 
1279  template<typename T> void setsockopt(int option_, T const &optval)
1280  {
1281  setsockopt(option_, &optval, sizeof(T));
1282  }
1283 
1284  void setsockopt(int option_, const void *optval_, size_t optvallen_)
1285  {
1286  int rc = zmq_setsockopt(_handle, option_, optval_, optvallen_);
1287  if (rc != 0)
1288  throw error_t();
1289  }
1290 
1291  void getsockopt(int option_, void *optval_, size_t *optvallen_) const
1292  {
1293  int rc = zmq_getsockopt(_handle, option_, optval_, optvallen_);
1294  if (rc != 0)
1295  throw error_t();
1296  }
1297 
1298  template<typename T> T getsockopt(int option_) const
1299  {
1300  T optval;
1301  size_t optlen = sizeof(T);
1302  getsockopt(option_, &optval, &optlen);
1303  return optval;
1304  }
1305 
1306  void bind(std::string const &addr) { bind(addr.c_str()); }
1307 
1308  void bind(const char *addr_)
1309  {
1310  int rc = zmq_bind(_handle, addr_);
1311  if (rc != 0)
1312  throw error_t();
1313  }
1314 
1315  void unbind(std::string const &addr) { unbind(addr.c_str()); }
1316 
1317  void unbind(const char *addr_)
1318  {
1319  int rc = zmq_unbind(_handle, addr_);
1320  if (rc != 0)
1321  throw error_t();
1322  }
1323 
1324  void connect(std::string const &addr) { connect(addr.c_str()); }
1325 
1326  void connect(const char *addr_)
1327  {
1328  int rc = zmq_connect(_handle, addr_);
1329  if (rc != 0)
1330  throw error_t();
1331  }
1332 
1333  void disconnect(std::string const &addr) { disconnect(addr.c_str()); }
1334 
1335  void disconnect(const char *addr_)
1336  {
1337  int rc = zmq_disconnect(_handle, addr_);
1338  if (rc != 0)
1339  throw error_t();
1340  }
1341 
1342  bool connected() const ZMQ_NOTHROW { return (_handle != ZMQ_NULLPTR); }
1343 
1344  ZMQ_CPP11_DEPRECATED("from 4.3.1, use send taking a const_buffer and send_flags")
1345  size_t send(const void *buf_, size_t len_, int flags_ = 0)
1346  {
1347  int nbytes = zmq_send(_handle, buf_, len_, flags_);
1348  if (nbytes >= 0)
1349  return static_cast<size_t>(nbytes);
1350  if (zmq_errno() == EAGAIN)
1351  return 0;
1352  throw error_t();
1353  }
1354 
1355  ZMQ_CPP11_DEPRECATED("from 4.3.1, use send taking message_t and send_flags")
1356  bool send(message_t &msg_,
1357  int flags_ = 0) // default until removed
1358  {
1359  int nbytes = zmq_msg_send(msg_.handle(), _handle, flags_);
1360  if (nbytes >= 0)
1361  return true;
1362  if (zmq_errno() == EAGAIN)
1363  return false;
1364  throw error_t();
1365  }
1366 
1367  template<typename T>
1369  "from 4.4.1, use send taking message_t or buffer (for contiguous "
1370  "ranges), and send_flags")
1371  bool send(T first, T last, int flags_ = 0)
1372  {
1373  zmq::message_t msg(first, last);
1374  int nbytes = zmq_msg_send(msg.handle(), _handle, flags_);
1375  if (nbytes >= 0)
1376  return true;
1377  if (zmq_errno() == EAGAIN)
1378  return false;
1379  throw error_t();
1380  }
1381 
1382 #ifdef ZMQ_HAS_RVALUE_REFS
1383  ZMQ_CPP11_DEPRECATED("from 4.3.1, use send taking message_t and send_flags")
1384  bool send(message_t &&msg_,
1385  int flags_ = 0) // default until removed
1386  {
1387 #ifdef ZMQ_CPP11
1388  return send(msg_, static_cast<send_flags>(flags_)).has_value();
1389 #else
1390  return send(msg_, flags_);
1391 #endif
1392  }
1393 #endif
1394 
1395 #ifdef ZMQ_CPP11
1396  send_result_t send(const_buffer buf, send_flags flags = send_flags::none)
1397  {
1398  const int nbytes =
1399  zmq_send(_handle, buf.data(), buf.size(), static_cast<int>(flags));
1400  if (nbytes >= 0)
1401  return static_cast<size_t>(nbytes);
1402  if (zmq_errno() == EAGAIN)
1403  return {};
1404  throw error_t();
1405  }
1406 
1407  send_result_t send(message_t &msg, send_flags flags)
1408  {
1409  int nbytes = zmq_msg_send(msg.handle(), _handle, static_cast<int>(flags));
1410  if (nbytes >= 0)
1411  return static_cast<size_t>(nbytes);
1412  if (zmq_errno() == EAGAIN)
1413  return {};
1414  throw error_t();
1415  }
1416 
1417  send_result_t send(message_t &&msg, send_flags flags)
1418  {
1419  return send(msg, flags);
1420  }
1421 #endif
1422 
1424  "from 4.3.1, use recv taking a mutable_buffer and recv_flags")
1425  size_t recv(void *buf_, size_t len_, int flags_ = 0)
1426  {
1427  int nbytes = zmq_recv(_handle, buf_, len_, flags_);
1428  if (nbytes >= 0)
1429  return static_cast<size_t>(nbytes);
1430  if (zmq_errno() == EAGAIN)
1431  return 0;
1432  throw error_t();
1433  }
1434 
1436  "from 4.3.1, use recv taking a reference to message_t and recv_flags")
1437  bool recv(message_t *msg_, int flags_ = 0)
1438  {
1439  int nbytes = zmq_msg_recv(msg_->handle(), _handle, flags_);
1440  if (nbytes >= 0)
1441  return true;
1442  if (zmq_errno() == EAGAIN)
1443  return false;
1444  throw error_t();
1445  }
1446 
1447 #ifdef ZMQ_CPP11
1449  recv_buffer_result_t recv(mutable_buffer buf,
1450  recv_flags flags = recv_flags::none)
1451  {
1452  const int nbytes =
1453  zmq_recv(_handle, buf.data(), buf.size(), static_cast<int>(flags));
1454  if (nbytes >= 0) {
1455  return recv_buffer_size{
1456  (std::min)(static_cast<size_t>(nbytes), buf.size()),
1457  static_cast<size_t>(nbytes)};
1458  }
1459  if (zmq_errno() == EAGAIN)
1460  return {};
1461  throw error_t();
1462  }
1463 
1465  recv_result_t recv(message_t &msg, recv_flags flags = recv_flags::none)
1466  {
1467  const int nbytes =
1468  zmq_msg_recv(msg.handle(), _handle, static_cast<int>(flags));
1469  if (nbytes >= 0) {
1470  assert(msg.size() == static_cast<size_t>(nbytes));
1471  return static_cast<size_t>(nbytes);
1472  }
1473  if (zmq_errno() == EAGAIN)
1474  return {};
1475  throw error_t();
1476  }
1477 #endif
1478 
1479 #if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0)
1480  void join(const char *group)
1481  {
1482  int rc = zmq_join(_handle, group);
1483  if (rc != 0)
1484  throw error_t();
1485  }
1486 
1487  void leave(const char *group)
1488  {
1489  int rc = zmq_leave(_handle, group);
1490  if (rc != 0)
1491  throw error_t();
1492  }
1493 #endif
1494 
1495  ZMQ_NODISCARD void *handle() ZMQ_NOTHROW { return _handle; }
1496  ZMQ_NODISCARD const void *handle() const ZMQ_NOTHROW { return _handle; }
1497 
1498  ZMQ_EXPLICIT operator bool() const ZMQ_NOTHROW { return _handle != ZMQ_NULLPTR; }
1499  // note: non-const operator bool can be removed once
1500  // operator void* is removed from socket_t
1501  ZMQ_EXPLICIT operator bool() ZMQ_NOTHROW { return _handle != ZMQ_NULLPTR; }
1502 
1503  protected:
1504  void *_handle;
1505 };
1506 } // namespace detail
1507 
1508 #ifdef ZMQ_CPP11
1509 enum class socket_type : int
1510 {
1511  req = ZMQ_REQ,
1512  rep = ZMQ_REP,
1513  dealer = ZMQ_DEALER,
1514  router = ZMQ_ROUTER,
1515  pub = ZMQ_PUB,
1516  sub = ZMQ_SUB,
1517  xpub = ZMQ_XPUB,
1518  xsub = ZMQ_XSUB,
1519  push = ZMQ_PUSH,
1520  pull = ZMQ_PULL,
1521 #if defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 0)
1522  server = ZMQ_SERVER,
1523  client = ZMQ_CLIENT,
1524  radio = ZMQ_RADIO,
1525  dish = ZMQ_DISH,
1526 #endif
1527 #if ZMQ_VERSION_MAJOR >= 4
1528  stream = ZMQ_STREAM,
1529 #endif
1530  pair = ZMQ_PAIR
1531 };
1532 #endif
1533 
1535 {
1536  struct _private
1537  {
1538  }; // disabling use other than with from_handle
1540 };
1541 
1544 
1545 // A non-owning nullable reference to a socket.
1546 // The reference is invalidated on socket close or destruction.
1548 {
1549  public:
1550  socket_ref() ZMQ_NOTHROW : detail::socket_base() {}
1551 #ifdef ZMQ_CPP11
1552  socket_ref(std::nullptr_t) ZMQ_NOTHROW : detail::socket_base() {}
1553 #endif
1554  socket_ref(from_handle_t /*fh*/, void *handle) ZMQ_NOTHROW
1555  : detail::socket_base(handle)
1556  {
1557  }
1558 };
1559 
1560 #ifdef ZMQ_CPP11
1561 inline bool operator==(socket_ref sr, std::nullptr_t /*p*/) ZMQ_NOTHROW
1562 {
1563  return sr.handle() == nullptr;
1564 }
1565 inline bool operator==(std::nullptr_t /*p*/, socket_ref sr) ZMQ_NOTHROW
1566 {
1567  return sr.handle() == nullptr;
1568 }
1569 inline bool operator!=(socket_ref sr, std::nullptr_t /*p*/) ZMQ_NOTHROW
1570 {
1571  return !(sr == nullptr);
1572 }
1573 inline bool operator!=(std::nullptr_t /*p*/, socket_ref sr) ZMQ_NOTHROW
1574 {
1575  return !(sr == nullptr);
1576 }
1577 #endif
1578 
1580 {
1581  return std::equal_to<void *>()(a.handle(), b.handle());
1582 }
1584 {
1585  return !(a == b);
1586 }
1588 {
1589  return std::less<void *>()(a.handle(), b.handle());
1590 }
1592 {
1593  return b < a;
1594 }
1596 {
1597  return !(a > b);
1598 }
1600 {
1601  return !(a < b);
1602 }
1603 
1604 } // namespace zmq
1605 
1606 #ifdef ZMQ_CPP11
1607 namespace std
1608 {
1609 template<> struct hash<zmq::socket_ref>
1610 {
1611  size_t operator()(zmq::socket_ref sr) const ZMQ_NOTHROW
1612  {
1613  return hash<void *>()(sr.handle());
1614  }
1615 };
1616 } // namespace std
1617 #endif
1618 
1619 namespace zmq
1620 {
1622 {
1623  friend class monitor_t;
1624 
1625  public:
1626  socket_t() ZMQ_NOTHROW : detail::socket_base(ZMQ_NULLPTR), ctxptr(ZMQ_NULLPTR) {}
1627 
1628  socket_t(context_t &context_, int type_) :
1629  detail::socket_base(zmq_socket(static_cast<void *>(context_), type_)),
1630  ctxptr(static_cast<void *>(context_))
1631  {
1632  if (_handle == ZMQ_NULLPTR)
1633  throw error_t();
1634  }
1635 
1636 #ifdef ZMQ_CPP11
1637  socket_t(context_t &context_, socket_type type_) :
1638  socket_t(context_, static_cast<int>(type_))
1639  {
1640  }
1641 #endif
1642 
1643 #ifdef ZMQ_HAS_RVALUE_REFS
1644  socket_t(socket_t &&rhs) ZMQ_NOTHROW : detail::socket_base(rhs._handle),
1645  ctxptr(rhs.ctxptr)
1646  {
1647  rhs._handle = ZMQ_NULLPTR;
1648  rhs.ctxptr = ZMQ_NULLPTR;
1649  }
1650  socket_t &operator=(socket_t &&rhs) ZMQ_NOTHROW
1651  {
1652  close();
1653  std::swap(_handle, rhs._handle);
1654  return *this;
1655  }
1656 #endif
1657 
1658  ~socket_t() ZMQ_NOTHROW { close(); }
1659 
1660  operator void *() ZMQ_NOTHROW { return _handle; }
1661 
1662  operator void const *() const ZMQ_NOTHROW { return _handle; }
1663 
1665  {
1666  if (_handle == ZMQ_NULLPTR)
1667  // already closed
1668  return;
1669  int rc = zmq_close(_handle);
1670  ZMQ_ASSERT(rc == 0);
1671  _handle = ZMQ_NULLPTR;
1672  }
1673 
1675  {
1676  std::swap(_handle, other._handle);
1677  std::swap(ctxptr, other.ctxptr);
1678  }
1679 
1680  operator socket_ref() ZMQ_NOTHROW { return socket_ref(from_handle, _handle); }
1681 
1682  private:
1683  void *ctxptr;
1684 
1686  void operator=(const socket_t &) ZMQ_DELETED_FUNCTION;
1687 
1688  // used by monitor_t
1689  socket_t(void *context_, int type_) :
1690  detail::socket_base(zmq_socket(context_, type_)), ctxptr(context_)
1691  {
1692  if (_handle == ZMQ_NULLPTR)
1693  throw error_t();
1694  }
1695 };
1696 
1697 inline void swap(socket_t &a, socket_t &b) ZMQ_NOTHROW
1698 {
1699  a.swap(b);
1700 }
1701 
1702 ZMQ_DEPRECATED("from 4.3.1, use proxy taking socket_t objects")
1703 inline void proxy(void *frontend, void *backend, void *capture)
1704 {
1705  int rc = zmq_proxy(frontend, backend, capture);
1706  if (rc != 0)
1707  throw error_t();
1708 }
1709 
1710 inline void
1711 proxy(socket_ref frontend, socket_ref backend, socket_ref capture = socket_ref())
1712 {
1713  int rc = zmq_proxy(frontend.handle(), backend.handle(), capture.handle());
1714  if (rc != 0)
1715  throw error_t();
1716 }
1717 
1718 #ifdef ZMQ_HAS_PROXY_STEERABLE
1719 ZMQ_DEPRECATED("from 4.3.1, use proxy_steerable taking socket_t objects")
1720 inline void
1721 proxy_steerable(void *frontend, void *backend, void *capture, void *control)
1722 {
1723  int rc = zmq_proxy_steerable(frontend, backend, capture, control);
1724  if (rc != 0)
1725  throw error_t();
1726 }
1727 
1728 inline void proxy_steerable(socket_ref frontend,
1729  socket_ref backend,
1730  socket_ref capture,
1731  socket_ref control)
1732 {
1733  int rc = zmq_proxy_steerable(frontend.handle(), backend.handle(),
1734  capture.handle(), control.handle());
1735  if (rc != 0)
1736  throw error_t();
1737 }
1738 #endif
1739 
1741 {
1742  public:
1743  monitor_t() : _socket(), _monitor_socket() {}
1744 
1745  virtual ~monitor_t() { close(); }
1746 
1747 #ifdef ZMQ_HAS_RVALUE_REFS
1748  monitor_t(monitor_t &&rhs) ZMQ_NOTHROW : _socket(), _monitor_socket()
1749  {
1750  std::swap(_socket, rhs._socket);
1751  std::swap(_monitor_socket, rhs._monitor_socket);
1752  }
1753 
1754  monitor_t &operator=(monitor_t &&rhs) ZMQ_NOTHROW
1755  {
1756  close();
1757  _socket = socket_ref();
1758  std::swap(_socket, rhs._socket);
1759  std::swap(_monitor_socket, rhs._monitor_socket);
1760  return *this;
1761  }
1762 #endif
1763 
1764 
1765  void
1766  monitor(socket_t &socket, std::string const &addr, int events = ZMQ_EVENT_ALL)
1767  {
1768  monitor(socket, addr.c_str(), events);
1769  }
1770 
1771  void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
1772  {
1773  init(socket, addr_, events);
1774  while (true) {
1775  check_event(-1);
1776  }
1777  }
1778 
1779  void init(socket_t &socket, std::string const &addr, int events = ZMQ_EVENT_ALL)
1780  {
1781  init(socket, addr.c_str(), events);
1782  }
1783 
1784  void init(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
1785  {
1786  int rc = zmq_socket_monitor(socket.handle(), addr_, events);
1787  if (rc != 0)
1788  throw error_t();
1789 
1790  _socket = socket;
1791  _monitor_socket = socket_t(socket.ctxptr, ZMQ_PAIR);
1792  _monitor_socket.connect(addr_);
1793 
1794  on_monitor_started();
1795  }
1796 
1797  bool check_event(int timeout = 0)
1798  {
1799  assert(_monitor_socket);
1800 
1801  zmq_msg_t eventMsg;
1802  zmq_msg_init(&eventMsg);
1803 
1804  zmq::pollitem_t items[] = {
1805  {_monitor_socket.handle(), 0, ZMQ_POLLIN, 0},
1806  };
1807 
1808  zmq::poll(&items[0], 1, timeout);
1809 
1810  if (items[0].revents & ZMQ_POLLIN) {
1811  int rc = zmq_msg_recv(&eventMsg, _monitor_socket.handle(), 0);
1812  if (rc == -1 && zmq_errno() == ETERM)
1813  return false;
1814  assert(rc != -1);
1815 
1816  } else {
1817  zmq_msg_close(&eventMsg);
1818  return false;
1819  }
1820 
1821 #if ZMQ_VERSION_MAJOR >= 4
1822  const char *data = static_cast<const char *>(zmq_msg_data(&eventMsg));
1823  zmq_event_t msgEvent;
1824  memcpy(&msgEvent.event, data, sizeof(uint16_t));
1825  data += sizeof(uint16_t);
1826  memcpy(&msgEvent.value, data, sizeof(int32_t));
1827  zmq_event_t *event = &msgEvent;
1828 #else
1829  zmq_event_t *event = static_cast<zmq_event_t *>(zmq_msg_data(&eventMsg));
1830 #endif
1831 
1832 #ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
1833  zmq_msg_t addrMsg;
1834  zmq_msg_init(&addrMsg);
1835  int rc = zmq_msg_recv(&addrMsg, _monitor_socket.handle(), 0);
1836  if (rc == -1 && zmq_errno() == ETERM) {
1837  zmq_msg_close(&eventMsg);
1838  return false;
1839  }
1840 
1841  assert(rc != -1);
1842  const char *str = static_cast<const char *>(zmq_msg_data(&addrMsg));
1843  std::string address(str, str + zmq_msg_size(&addrMsg));
1844  zmq_msg_close(&addrMsg);
1845 #else
1846  // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
1847  std::string address = event->data.connected.addr;
1848 #endif
1849 
1850 #ifdef ZMQ_EVENT_MONITOR_STOPPED
1851  if (event->event == ZMQ_EVENT_MONITOR_STOPPED) {
1852  zmq_msg_close(&eventMsg);
1853  return false;
1854  }
1855 
1856 #endif
1857 
1858  switch (event->event) {
1859  case ZMQ_EVENT_CONNECTED:
1860  on_event_connected(*event, address.c_str());
1861  break;
1862  case ZMQ_EVENT_CONNECT_DELAYED:
1863  on_event_connect_delayed(*event, address.c_str());
1864  break;
1865  case ZMQ_EVENT_CONNECT_RETRIED:
1866  on_event_connect_retried(*event, address.c_str());
1867  break;
1868  case ZMQ_EVENT_LISTENING:
1869  on_event_listening(*event, address.c_str());
1870  break;
1871  case ZMQ_EVENT_BIND_FAILED:
1872  on_event_bind_failed(*event, address.c_str());
1873  break;
1874  case ZMQ_EVENT_ACCEPTED:
1875  on_event_accepted(*event, address.c_str());
1876  break;
1877  case ZMQ_EVENT_ACCEPT_FAILED:
1878  on_event_accept_failed(*event, address.c_str());
1879  break;
1880  case ZMQ_EVENT_CLOSED:
1881  on_event_closed(*event, address.c_str());
1882  break;
1883  case ZMQ_EVENT_CLOSE_FAILED:
1884  on_event_close_failed(*event, address.c_str());
1885  break;
1886  case ZMQ_EVENT_DISCONNECTED:
1887  on_event_disconnected(*event, address.c_str());
1888  break;
1889 #ifdef ZMQ_BUILD_DRAFT_API
1890 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)
1891  case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
1892  on_event_handshake_failed_no_detail(*event, address.c_str());
1893  break;
1894  case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
1895  on_event_handshake_failed_protocol(*event, address.c_str());
1896  break;
1897  case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
1898  on_event_handshake_failed_auth(*event, address.c_str());
1899  break;
1900  case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
1901  on_event_handshake_succeeded(*event, address.c_str());
1902  break;
1903 #elif ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
1904  case ZMQ_EVENT_HANDSHAKE_FAILED:
1905  on_event_handshake_failed(*event, address.c_str());
1906  break;
1907  case ZMQ_EVENT_HANDSHAKE_SUCCEED:
1908  on_event_handshake_succeed(*event, address.c_str());
1909  break;
1910 #endif
1911 #endif
1912  default:
1913  on_event_unknown(*event, address.c_str());
1914  break;
1915  }
1916  zmq_msg_close(&eventMsg);
1917 
1918  return true;
1919  }
1920 
1921 #ifdef ZMQ_EVENT_MONITOR_STOPPED
1922  void abort()
1923  {
1924  if (_socket)
1925  zmq_socket_monitor(_socket.handle(), ZMQ_NULLPTR, 0);
1926 
1927  _socket = socket_ref();
1928  }
1929 #endif
1930  virtual void on_monitor_started() {}
1931  virtual void on_event_connected(const zmq_event_t &event_, const char *addr_)
1932  {
1933  (void) event_;
1934  (void) addr_;
1935  }
1936  virtual void on_event_connect_delayed(const zmq_event_t &event_,
1937  const char *addr_)
1938  {
1939  (void) event_;
1940  (void) addr_;
1941  }
1942  virtual void on_event_connect_retried(const zmq_event_t &event_,
1943  const char *addr_)
1944  {
1945  (void) event_;
1946  (void) addr_;
1947  }
1948  virtual void on_event_listening(const zmq_event_t &event_, const char *addr_)
1949  {
1950  (void) event_;
1951  (void) addr_;
1952  }
1953  virtual void on_event_bind_failed(const zmq_event_t &event_, const char *addr_)
1954  {
1955  (void) event_;
1956  (void) addr_;
1957  }
1958  virtual void on_event_accepted(const zmq_event_t &event_, const char *addr_)
1959  {
1960  (void) event_;
1961  (void) addr_;
1962  }
1963  virtual void on_event_accept_failed(const zmq_event_t &event_, const char *addr_)
1964  {
1965  (void) event_;
1966  (void) addr_;
1967  }
1968  virtual void on_event_closed(const zmq_event_t &event_, const char *addr_)
1969  {
1970  (void) event_;
1971  (void) addr_;
1972  }
1973  virtual void on_event_close_failed(const zmq_event_t &event_, const char *addr_)
1974  {
1975  (void) event_;
1976  (void) addr_;
1977  }
1978  virtual void on_event_disconnected(const zmq_event_t &event_, const char *addr_)
1979  {
1980  (void) event_;
1981  (void) addr_;
1982  }
1983 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)
1985  const char *addr_)
1986  {
1987  (void) event_;
1988  (void) addr_;
1989  }
1991  const char *addr_)
1992  {
1993  (void) event_;
1994  (void) addr_;
1995  }
1996  virtual void on_event_handshake_failed_auth(const zmq_event_t &event_,
1997  const char *addr_)
1998  {
1999  (void) event_;
2000  (void) addr_;
2001  }
2002  virtual void on_event_handshake_succeeded(const zmq_event_t &event_,
2003  const char *addr_)
2004  {
2005  (void) event_;
2006  (void) addr_;
2007  }
2008 #elif ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
2009  virtual void on_event_handshake_failed(const zmq_event_t &event_,
2010  const char *addr_)
2011  {
2012  (void) event_;
2013  (void) addr_;
2014  }
2015  virtual void on_event_handshake_succeed(const zmq_event_t &event_,
2016  const char *addr_)
2017  {
2018  (void) event_;
2019  (void) addr_;
2020  }
2021 #endif
2022  virtual void on_event_unknown(const zmq_event_t &event_, const char *addr_)
2023  {
2024  (void) event_;
2025  (void) addr_;
2026  }
2027 
2028  private:
2030  void operator=(const monitor_t &) ZMQ_DELETED_FUNCTION;
2031 
2032  socket_ref _socket;
2033  socket_t _monitor_socket;
2034 
2035  void close() ZMQ_NOTHROW
2036  {
2037  if (_socket)
2038  zmq_socket_monitor(_socket.handle(), ZMQ_NULLPTR, 0);
2039  _monitor_socket.close();
2040  }
2041 };
2042 
2043 #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
2044 
2045 // polling events
2046 enum class event_flags : short
2047 {
2048  none = 0,
2049  pollin = ZMQ_POLLIN,
2050  pollout = ZMQ_POLLOUT,
2051  pollerr = ZMQ_POLLERR,
2052  pollpri = ZMQ_POLLPRI
2053 };
2054 
2055 constexpr event_flags operator|(event_flags a, event_flags b) noexcept
2056 {
2057  return detail::enum_bit_or(a, b);
2058 }
2059 constexpr event_flags operator&(event_flags a, event_flags b) noexcept
2060 {
2061  return detail::enum_bit_and(a, b);
2062 }
2063 constexpr event_flags operator^(event_flags a, event_flags b) noexcept
2064 {
2065  return detail::enum_bit_xor(a, b);
2066 }
2067 constexpr event_flags operator~(event_flags a) noexcept
2068 {
2069  return detail::enum_bit_not(a);
2070 }
2071 
2072 struct no_user_data;
2073 
2074 // layout compatible with zmq_poller_event_t
2075 template<class T = no_user_data> struct poller_event
2076 {
2077  socket_ref socket;
2078 #ifdef _WIN32
2079  SOCKET fd;
2080 #else
2081  int fd;
2082 #endif
2083  T *user_data;
2084  event_flags events;
2085 };
2086 
2087 template<typename T = no_user_data> class poller_t
2088 {
2089  public:
2090  using event_type = poller_event<T>;
2091 
2092  poller_t() : poller_ptr(zmq_poller_new())
2093  {
2094  if (!poller_ptr)
2095  throw error_t();
2096  }
2097 
2098  template<
2099  typename Dummy = void,
2100  typename =
2101  typename std::enable_if<!std::is_same<T, no_user_data>::value, Dummy>::type>
2102  void add(zmq::socket_ref socket, event_flags events, T *user_data)
2103  {
2104  add_impl(socket, events, user_data);
2105  }
2106 
2107  void add(zmq::socket_ref socket, event_flags events)
2108  {
2109  add_impl(socket, events, nullptr);
2110  }
2111 
2112  void remove(zmq::socket_ref socket)
2113  {
2114  if (0 != zmq_poller_remove(poller_ptr.get(), socket.handle())) {
2115  throw error_t();
2116  }
2117  }
2118 
2119  void modify(zmq::socket_ref socket, event_flags events)
2120  {
2121  if (0
2122  != zmq_poller_modify(poller_ptr.get(), socket.handle(),
2123  static_cast<short>(events))) {
2124  throw error_t();
2125  }
2126  }
2127 
2128  size_t wait_all(std::vector<event_type> &poller_events,
2129  const std::chrono::milliseconds timeout)
2130  {
2131  int rc = zmq_poller_wait_all(
2132  poller_ptr.get(),
2133  reinterpret_cast<zmq_poller_event_t *>(poller_events.data()),
2134  static_cast<int>(poller_events.size()),
2135  static_cast<long>(timeout.count()));
2136  if (rc > 0)
2137  return static_cast<size_t>(rc);
2138 
2139 #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)
2140  if (zmq_errno() == EAGAIN)
2141 #else
2142  if (zmq_errno() == ETIMEDOUT)
2143 #endif
2144  return 0;
2145 
2146  throw error_t();
2147  }
2148 
2149  private:
2150  struct destroy_poller_t
2151  {
2152  void operator()(void *ptr) noexcept
2153  {
2154  int rc = zmq_poller_destroy(&ptr);
2155  ZMQ_ASSERT(rc == 0);
2156  }
2157  };
2158 
2159  std::unique_ptr<void, destroy_poller_t> poller_ptr;
2160 
2161  void add_impl(zmq::socket_ref socket, event_flags events, T *user_data)
2162  {
2163  if (0
2164  != zmq_poller_add(poller_ptr.get(), socket.handle(), user_data,
2165  static_cast<short>(events))) {
2166  throw error_t();
2167  }
2168  }
2169 };
2170 #endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
2171 
2172 inline std::ostream &operator<<(std::ostream &os, const message_t &msg)
2173 {
2174  return os << msg.str();
2175 }
2176 
2177 } // namespace zmq
2178 
2179 #endif // __ZMQ_HPP_INCLUDED__
void monitor(socket_t &socket, const char *addr_, int events=ZMQ_EVENT_ALL)
Definition: zmq.hpp:1771
void swap(context_t &other) ZMQ_NOTHROW
Definition: zmq.hpp:788
void setsockopt(int option_, const void *optval_, size_t optvallen_)
Definition: zmq.hpp:1284
context_t(int io_threads_, int max_sockets_=ZMQ_MAX_SOCKETS_DFLT)
Definition: zmq.hpp:697
bool operator>=(socket_ref a, socket_ref b) ZMQ_NOTHROW
Definition: zmq.hpp:1599
ZMQ_CONSTEXPR_FN ZMQ_EXPLICIT from_handle_t(_private) ZMQ_NOTHROW
Definition: zmq.hpp:1539
virtual const char * what() const ZMQ_NOTHROW ZMQ_OVERRIDE
Definition: zmq.hpp:249
ZMQ_NODISCARD const zmq_msg_t * handle() const ZMQ_NOTHROW
Definition: zmq.hpp:624
#define ZMQ_DELETED_FUNCTION
Definition: zmq.hpp:146
bool operator<=(socket_ref a, socket_ref b) ZMQ_NOTHROW
Definition: zmq.hpp:1595
void monitor(socket_t &socket, std::string const &addr, int events=ZMQ_EVENT_ALL)
Definition: zmq.hpp:1766
virtual void on_event_handshake_failed_protocol(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1990
const char * addr
inproc hangs: no messages are ever received by the server; tcp/ipc are okay.
Definition: test_tcs.cpp:16
zmq_pollitem_t pollitem_t
Definition: zmq.hpp:243
virtual void on_event_connect_delayed(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1936
#define ZMQ_CONSTEXPR_FN
Definition: zmq.hpp:79
void close() ZMQ_NOTHROW
Definition: zmq.hpp:754
void setsockopt(int option_, T const &optval)
Definition: zmq.hpp:1279
bool connected() const ZMQ_NOTHROW
Definition: zmq.hpp:1342
#define ZMQ_NOTHROW
Definition: zmq.hpp:75
const char * gets(const char *property_)
Definition: zmq.hpp:527
void swap(socket_t &other) ZMQ_NOTHROW
Definition: zmq.hpp:1674
void bind(const char *addr_)
Definition: zmq.hpp:1308
void server()
T * data() ZMQ_NOTHROW
Definition: zmq.hpp:495
virtual void on_event_closed(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1968
virtual void on_event_disconnected(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1978
virtual void on_event_accepted(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1958
void rebuild(const void *data_, size_t size_)
Definition: zmq.hpp:424
message_t(const void *data_, size_t size_)
Definition: zmq.hpp:358
ZMQ_NODISCARD bool empty() const ZMQ_NOTHROW
Definition: zmq.hpp:493
void unbind(std::string const &addr)
Definition: zmq.hpp:1315
void disconnect(const char *addr_)
Definition: zmq.hpp:1335
void close() ZMQ_NOTHROW
Definition: zmq.hpp:1664
#define ZMQ_CONSTEXPR_VAR
Definition: zmq.hpp:80
std::string to_string() const
Definition: zmq.hpp:563
bool operator<(socket_ref a, socket_ref b) ZMQ_NOTHROW
Definition: zmq.hpp:1587
virtual void on_event_handshake_succeeded(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:2002
virtual ~monitor_t()
Definition: zmq.hpp:1745
ZMQ_NODISCARD zmq_msg_t * handle() ZMQ_NOTHROW
Definition: zmq.hpp:623
T const * data() const ZMQ_NOTHROW
Definition: zmq.hpp:497
~message_t() ZMQ_NOTHROW
Definition: zmq.hpp:399
ZMQ_EXPLICIT socket_base(void *handle) ZMQ_NOTHROW
Definition: zmq.hpp:1277
void proxy_steerable(void *frontend, void *backend, void *capture, void *control)
Definition: zmq.hpp:1721
void * data() ZMQ_NOTHROW
Definition: zmq.hpp:481
socket_ref() ZMQ_NOTHROW
Definition: zmq.hpp:1550
zmq_free_fn free_fn
Definition: zmq.hpp:242
int poll(zmq_pollitem_t *items_, size_t nitems_, long timeout_=-1)
Definition: zmq.hpp:259
#define ZMQ_NODISCARD
Definition: zmq.hpp:63
void proxy(void *frontend, void *backend, void *capture)
Definition: zmq.hpp:1703
virtual void on_event_handshake_failed_auth(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1996
int num() const
Definition: zmq.hpp:253
void rebuild()
Definition: zmq.hpp:405
void move(message_t &msg_)
Definition: zmq.hpp:453
socket_t() ZMQ_NOTHROW
Definition: zmq.hpp:1626
const void * data() const ZMQ_NOTHROW
Definition: zmq.hpp:483
void copy(message_t &msg_)
Definition: zmq.hpp:468
void init(socket_t &socket, const char *addr_, int events=ZMQ_EVENT_ALL)
Definition: zmq.hpp:1784
#define ZMQ_OVERRIDE
Definition: zmq.hpp:77
bool check_event(int timeout=0)
Definition: zmq.hpp:1797
message_t(ForwardIter first, ForwardIter last)
Definition: zmq.hpp:345
virtual void on_event_close_failed(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1973
bool operator!=(socket_ref a, socket_ref b) ZMQ_NOTHROW
Definition: zmq.hpp:1583
virtual void on_event_handshake_failed_no_detail(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1984
std::ostream & operator<<(std::ostream &os, const message_t &msg)
Definition: zmq.hpp:2172
#define ZMQ_ASSERT(expression)
Definition: zmq.hpp:190
message_t(void *data_, size_t size_, free_fn *ffn_, void *hint_=ZMQ_NULLPTR)
Definition: zmq.hpp:366
socket_base() ZMQ_NOTHROW
Definition: zmq.hpp:1276
~socket_t() ZMQ_NOTHROW
Definition: zmq.hpp:1658
T getsockopt(int option_) const
Definition: zmq.hpp:1298
uint16_t event
Definition: zmq.hpp:176
void getsockopt(int option_, void *optval_, size_t *optvallen_) const
Definition: zmq.hpp:1291
virtual void on_event_unknown(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:2022
socket_t(context_t &context_, int type_)
Definition: zmq.hpp:1628
void rebuild(size_t size_)
Definition: zmq.hpp:414
std::string str() const
Definition: zmq.hpp:581
message_t(size_t size_)
Definition: zmq.hpp:338
bool operator>(socket_ref a, socket_ref b) ZMQ_NOTHROW
Definition: zmq.hpp:1591
bool operator==(const message_t &other) const ZMQ_NOTHROW
Definition: zmq.hpp:505
virtual void on_event_listening(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1948
void client(std::string str)
void version(int *major_, int *minor_, int *patch_)
Definition: zmq.hpp:315
virtual void on_event_connected(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1931
void connect(std::string const &addr)
Definition: zmq.hpp:1324
socket_ref(from_handle_t, void *handle) ZMQ_NOTHROW
Definition: zmq.hpp:1554
bool operator!=(const message_t &other) const ZMQ_NOTHROW
Definition: zmq.hpp:511
void connect(const char *addr_)
Definition: zmq.hpp:1326
void swap(message_t &a, message_t &b) ZMQ_NOTHROW
Definition: zmq.hpp:636
void shutdown() ZMQ_NOTHROW
Definition: zmq.hpp:771
size_t size() const ZMQ_NOTHROW
Definition: zmq.hpp:488
void bind(std::string const &addr)
Definition: zmq.hpp:1306
void rebuild(void *data_, size_t size_, free_fn *ffn_, void *hint_=ZMQ_NULLPTR)
Definition: zmq.hpp:435
virtual void on_monitor_started()
Definition: zmq.hpp:1930
virtual void on_event_bind_failed(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1953
void init(socket_t &socket, std::string const &addr, int events=ZMQ_EVENT_ALL)
Definition: zmq.hpp:1779
virtual void on_event_accept_failed(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1963
message_t() ZMQ_NOTHROW
Definition: zmq.hpp:332
void disconnect(std::string const &addr)
Definition: zmq.hpp:1333
bool operator==(socket_ref a, socket_ref b) ZMQ_NOTHROW
Definition: zmq.hpp:1579
~context_t() ZMQ_NOTHROW
Definition: zmq.hpp:720
void swap(socket_t &a, socket_t &b) ZMQ_NOTHROW
Definition: zmq.hpp:1697
Definition: zmq.hpp:195
int32_t value
Definition: zmq.hpp:177
void swap(message_t &other) ZMQ_NOTHROW
Definition: zmq.hpp:617
#define ZMQ_CPP11_DEPRECATED(msg)
Definition: zmq.hpp:81
#define ZMQ_EXPLICIT
Definition: zmq.hpp:76
void unbind(const char *addr_)
Definition: zmq.hpp:1317
#define ZMQ_NULLPTR
Definition: zmq.hpp:78
virtual void on_event_connect_retried(const zmq_event_t &event_, const char *addr_)
Definition: zmq.hpp:1942
ZMQ_CONSTEXPR_VAR from_handle_t from_handle
Definition: zmq.hpp:1542
bool more() const ZMQ_NOTHROW
Definition: zmq.hpp:475