Commit 580c5b28 authored by Constantin Rack's avatar Constantin Rack Committed by GitHub

Merge pull request #2364 from somdoron/master

problem: zmq_poll is slow because FD is being created on every call
parents 651f81e8 f694a2d9
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
zmq::socket_poller_t::socket_poller_t () : zmq::socket_poller_t::socket_poller_t () :
tag (0xCAFEBABE), tag (0xCAFEBABE),
signaler (NULL),
need_rebuild (true), need_rebuild (true),
use_signaler (false), use_signaler (false),
poll_size(0) poll_size(0)
...@@ -62,10 +63,15 @@ zmq::socket_poller_t::~socket_poller_t () ...@@ -62,10 +63,15 @@ zmq::socket_poller_t::~socket_poller_t ()
size_t thread_safe_size = sizeof(int); size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe) if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
it->socket->remove_signaler (&signaler); it->socket->remove_signaler (signaler);
} }
} }
if (signaler != NULL) {
delete signaler;
signaler = NULL;
}
#if defined ZMQ_POLL_BASED_ON_POLL #if defined ZMQ_POLL_BASED_ON_POLL
if (pollfds) { if (pollfds) {
free (pollfds); free (pollfds);
...@@ -95,7 +101,10 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e ...@@ -95,7 +101,10 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e
return -1; return -1;
if (thread_safe) { if (thread_safe) {
if (socket_->add_signaler (&signaler) == -1) if (signaler == NULL)
signaler = new signaler_t ();
if (socket_->add_signaler (signaler) == -1)
return -1; return -1;
} }
...@@ -193,7 +202,7 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_) ...@@ -193,7 +202,7 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_)
size_t thread_safe_size = sizeof(int); size_t thread_safe_size = sizeof(int);
if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe) if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
socket_->remove_signaler (&signaler); socket_->remove_signaler (signaler);
return 0; return 0;
} }
...@@ -264,7 +273,7 @@ int zmq::socket_poller_t::rebuild () ...@@ -264,7 +273,7 @@ int zmq::socket_poller_t::rebuild ()
if (use_signaler) { if (use_signaler) {
item_nbr = 1; item_nbr = 1;
pollfds[0].fd = signaler.get_fd(); pollfds[0].fd = signaler->get_fd();
pollfds[0].events = POLLIN; pollfds[0].events = POLLIN;
} }
...@@ -323,7 +332,7 @@ int zmq::socket_poller_t::rebuild () ...@@ -323,7 +332,7 @@ int zmq::socket_poller_t::rebuild ()
if (thread_safe && it->events) { if (thread_safe && it->events) {
use_signaler = true; use_signaler = true;
FD_SET (signaler.get_fd (), &pollset_in); FD_SET (signaler->get_fd (), &pollset_in);
poll_size = 1; poll_size = 1;
break; break;
} }
...@@ -436,7 +445,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -436,7 +445,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
// Receive the signal from pollfd // Receive the signal from pollfd
if (use_signaler && pollfds[0].revents & POLLIN) if (use_signaler && pollfds[0].revents & POLLIN)
signaler.recv (); signaler->recv ();
// Check for the events. // Check for the events.
int found = 0; int found = 0;
...@@ -596,8 +605,8 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev ...@@ -596,8 +605,8 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
break; break;
} }
if (use_signaler && FD_ISSET (signaler.get_fd (), &inset)) if (use_signaler && FD_ISSET (signaler->get_fd (), &inset))
signaler.recv (); signaler->recv ();
// Check for the events. // Check for the events.
int found = 0; int found = 0;
......
...@@ -87,7 +87,7 @@ namespace zmq ...@@ -87,7 +87,7 @@ namespace zmq
uint32_t tag; uint32_t tag;
// Signaler used for thread safe sockets polling // Signaler used for thread safe sockets polling
signaler_t signaler; signaler_t* signaler;
typedef struct item_t { typedef struct item_t {
socket_base_t *socket; socket_base_t *socket;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment