Unverified Commit 9fbd125b authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #2917 from ZMQers/thread-safe-simplification

Problem: code duplication and unnecessary nesting around ZMQ_THREAD_SAFE querying
parents cb9ccfa1 147fe9ed
...@@ -31,6 +31,17 @@ ...@@ -31,6 +31,17 @@
#include "socket_poller.hpp" #include "socket_poller.hpp"
#include "err.hpp" #include "err.hpp"
static bool is_thread_safe (zmq::socket_base_t &socket)
{
int thread_safe;
size_t thread_safe_size = sizeof (int);
int rc =
socket.getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size);
zmq_assert (rc == 0);
return thread_safe;
}
zmq::socket_poller_t::socket_poller_t () : zmq::socket_poller_t::socket_poller_t () :
tag (0xCAFEBABE), tag (0xCAFEBABE),
signaler (NULL), signaler (NULL),
...@@ -67,15 +78,10 @@ zmq::socket_poller_t::~socket_poller_t () ...@@ -67,15 +78,10 @@ zmq::socket_poller_t::~socket_poller_t ()
tag = 0xdeadbeef; tag = 0xdeadbeef;
for (items_t::iterator it = items.begin (); it != items.end (); ++it) { for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->socket && it->socket->check_tag ()) { // TODO shouldn't this zmq_assert (it->socket->check_tag ()) instead?
int thread_safe; if (it->socket && it->socket->check_tag ()
size_t thread_safe_size = sizeof (int); && is_thread_safe (*it->socket)) {
it->socket->remove_signaler (signaler);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size)
== 0
&& thread_safe)
it->socket->remove_signaler (signaler);
} }
} }
...@@ -108,14 +114,7 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, ...@@ -108,14 +114,7 @@ int zmq::socket_poller_t::add (socket_base_t *socket_,
} }
} }
int thread_safe; if (is_thread_safe (*socket_)) {
size_t thread_safe_size = sizeof (int);
int rc =
socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size);
zmq_assert (rc == 0);
if (thread_safe) {
if (signaler == NULL) { if (signaler == NULL) {
signaler = new (std::nothrow) signaler_t (); signaler = new (std::nothrow) signaler_t ();
if (!signaler) { if (!signaler) {
...@@ -233,13 +232,9 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_) ...@@ -233,13 +232,9 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_)
items.erase (it); items.erase (it);
need_rebuild = true; need_rebuild = true;
int thread_safe; if (is_thread_safe (*socket_)) {
size_t thread_safe_size = sizeof (int);
if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size)
== 0
&& thread_safe)
socket_->remove_signaler (signaler); socket_->remove_signaler (signaler);
}
return 0; return 0;
} }
...@@ -279,21 +274,11 @@ void zmq::socket_poller_t::rebuild () ...@@ -279,21 +274,11 @@ void zmq::socket_poller_t::rebuild ()
for (items_t::iterator it = items.begin (); it != items.end (); ++it) { for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->events) { if (it->events) {
if (it->socket) { if (it->socket && is_thread_safe (*it->socket)) {
int thread_safe; if (!use_signaler) {
size_t thread_safe_size = sizeof (int); use_signaler = true;
int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size);
zmq_assert (rc == 0);
if (thread_safe) {
if (!use_signaler) {
use_signaler = true;
poll_size++;
}
} else
poll_size++; poll_size++;
}
} else } else
poll_size++; poll_size++;
} }
...@@ -316,17 +301,10 @@ void zmq::socket_poller_t::rebuild () ...@@ -316,17 +301,10 @@ void zmq::socket_poller_t::rebuild ()
for (items_t::iterator it = items.begin (); it != items.end (); ++it) { for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->events) { if (it->events) {
if (it->socket) { if (it->socket) {
int thread_safe; if (!is_thread_safe (*it->socket)) {
size_t thread_safe_size = sizeof (int);
int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size);
zmq_assert (rc == 0);
if (!thread_safe) {
size_t fd_size = sizeof (zmq::fd_t); size_t fd_size = sizeof (zmq::fd_t);
rc = it->socket->getsockopt (ZMQ_FD, &pollfds[item_nbr].fd, int rc = it->socket->getsockopt (
&fd_size); ZMQ_FD, &pollfds[item_nbr].fd, &fd_size);
zmq_assert (rc == 0); zmq_assert (rc == 0);
pollfds[item_nbr].events = POLLIN; pollfds[item_nbr].events = POLLIN;
...@@ -359,20 +337,11 @@ void zmq::socket_poller_t::rebuild () ...@@ -359,20 +337,11 @@ void zmq::socket_poller_t::rebuild ()
use_signaler = false; use_signaler = false;
for (items_t::iterator it = items.begin (); it != items.end (); ++it) { for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->socket) { if (it->socket && is_thread_safe (*it->socket) && it->events) {
int thread_safe; use_signaler = true;
size_t thread_safe_size = sizeof (int); FD_SET (signaler->get_fd (), &pollset_in);
poll_size = 1;
int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, break;
&thread_safe_size);
zmq_assert (rc == 0);
if (thread_safe && it->events) {
use_signaler = true;
FD_SET (signaler->get_fd (), &pollset_in);
poll_size = 1;
break;
}
} }
} }
...@@ -384,17 +353,11 @@ void zmq::socket_poller_t::rebuild () ...@@ -384,17 +353,11 @@ void zmq::socket_poller_t::rebuild ()
// If the poll item is a 0MQ socket we are interested in input on the // If the poll item is a 0MQ socket we are interested in input on the
// notification file descriptor retrieved by the ZMQ_FD socket option. // notification file descriptor retrieved by the ZMQ_FD socket option.
if (it->socket) { if (it->socket) {
int thread_safe; if (!is_thread_safe (*it->socket)) {
size_t thread_safe_size = sizeof (int);
int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size);
zmq_assert (rc == 0);
if (!thread_safe) {
zmq::fd_t notify_fd; zmq::fd_t notify_fd;
size_t fd_size = sizeof (zmq::fd_t); size_t fd_size = sizeof (zmq::fd_t);
rc = it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size); int rc =
it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size);
zmq_assert (rc == 0); zmq_assert (rc == 0);
FD_SET (notify_fd, &pollset_in); FD_SET (notify_fd, &pollset_in);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment