Commit 7668b246 authored by Martin Sustrik's avatar Martin Sustrik

ZMQ_POLL option forces fd_signaler to be used in app_thread

parent 495a2228
...@@ -31,6 +31,8 @@ ...@@ -31,6 +31,8 @@
#include "app_thread.hpp" #include "app_thread.hpp"
#include "dispatcher.hpp" #include "dispatcher.hpp"
#include "fd_signaler.hpp"
#include "ypollset.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "config.hpp" #include "config.hpp"
...@@ -52,16 +54,26 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_, ...@@ -52,16 +54,26 @@ zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
associated (false), associated (false),
last_processing_time (0) last_processing_time (0)
{ {
if (flags_ & ZMQ_POLL) {
signaler = new fd_signaler_t;
zmq_assert (signaler);
}
else {
signaler = new ypollset_t;
zmq_assert (signaler);
}
} }
zmq::app_thread_t::~app_thread_t () zmq::app_thread_t::~app_thread_t ()
{ {
zmq_assert (sockets.empty ()); zmq_assert (sockets.empty ());
zmq_assert (signaler);
delete signaler;
} }
zmq::i_signaler *zmq::app_thread_t::get_signaler () zmq::i_signaler *zmq::app_thread_t::get_signaler ()
{ {
return &pollset; return signaler;
} }
bool zmq::app_thread_t::is_current () bool zmq::app_thread_t::is_current ()
...@@ -86,7 +98,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_) ...@@ -86,7 +98,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{ {
uint64_t signals; uint64_t signals;
if (block_) if (block_)
signals = pollset.poll (); signals = signaler->poll ();
else { else {
#if defined ZMQ_DELAY_COMMANDS #if defined ZMQ_DELAY_COMMANDS
...@@ -119,7 +131,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_) ...@@ -119,7 +131,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
#endif #endif
// Check whether there are any commands pending for this thread. // Check whether there are any commands pending for this thread.
signals = pollset.check (); signals = signaler->check ();
} }
if (signals) { if (signals) {
......
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#include "stdint.hpp" #include "stdint.hpp"
#include "object.hpp" #include "object.hpp"
#include "ypollset.hpp"
#include "thread.hpp" #include "thread.hpp"
namespace zmq namespace zmq
...@@ -40,7 +39,7 @@ namespace zmq ...@@ -40,7 +39,7 @@ namespace zmq
~app_thread_t (); ~app_thread_t ();
// Returns signaler associated with this application thread. // Returns signaler associated with this application thread.
i_signaler *get_signaler (); struct i_signaler *get_signaler ();
// Nota bene: Following two functions are accessed from different // Nota bene: Following two functions are accessed from different
// threads. The caller (dispatcher) is responsible for synchronisation // threads. The caller (dispatcher) is responsible for synchronisation
...@@ -79,7 +78,7 @@ namespace zmq ...@@ -79,7 +78,7 @@ namespace zmq
thread_t::id_t tid; thread_t::id_t tid;
// App thread's signaler object. // App thread's signaler object.
ypollset_t pollset; struct i_signaler *signaler;
// Timestamp of when commands were processed the last time. // Timestamp of when commands were processed the last time.
uint64_t last_processing_time; uint64_t last_processing_time;
......
...@@ -65,12 +65,18 @@ void zmq::fd_signaler_t::signal (int signal_) ...@@ -65,12 +65,18 @@ void zmq::fd_signaler_t::signal (int signal_)
errno_assert (sz == sizeof (uint64_t)); errno_assert (sz == sizeof (uint64_t));
} }
uint64_t zmq::fd_signaler_t::poll ()
{
// TODO: Can we do a blocking read on non-blocking eventfd?
// It's not needed as for now, so let it stay unimplemented.
zmq_assert (false);
}
uint64_t zmq::fd_signaler_t::check () uint64_t zmq::fd_signaler_t::check ()
{ {
uint64_t val; uint64_t val;
ssize_t sz = read (fd, &val, sizeof (uint64_t)); ssize_t sz = read (fd, &val, sizeof (uint64_t));
if (sz == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || if (sz == -1 && (errno == EAGAIN || errno == EINTR))
errno == EINTR))
return 0; return 0;
errno_assert (sz != -1); errno_assert (sz != -1);
return val; return val;
...@@ -120,7 +126,7 @@ zmq::fd_signaler_t::fd_signaler_t () ...@@ -120,7 +126,7 @@ zmq::fd_signaler_t::fd_signaler_t ()
rc = connect (w, (sockaddr *) &addr, sizeof (addr)); rc = connect (w, (sockaddr *) &addr, sizeof (addr));
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Accept connection from w // Accept connection from w.
r = accept (listener, NULL, NULL); r = accept (listener, NULL, NULL);
wsa_assert (r != INVALID_SOCKET); wsa_assert (r != INVALID_SOCKET);
...@@ -139,6 +145,9 @@ zmq::fd_signaler_t::~fd_signaler_t () ...@@ -139,6 +145,9 @@ zmq::fd_signaler_t::~fd_signaler_t ()
void zmq::fd_signaler_t::signal (int signal_) void zmq::fd_signaler_t::signal (int signal_)
{ {
// TODO: Note that send is a blocking operation.
// How should we behave if the signal cannot be written to the signaler?
zmq_assert (signal_ >= 0 && signal_ < 64); zmq_assert (signal_ >= 0 && signal_ < 64);
char c = (char) signal_; char c = (char) signal_;
int rc = send (w, &c, 1, 0); int rc = send (w, &c, 1, 0);
...@@ -154,7 +163,7 @@ uint64_t zmq::fd_signaler_t::poll () ...@@ -154,7 +163,7 @@ uint64_t zmq::fd_signaler_t::poll ()
// If there are no signals, wait until at least one signal arrives. // If there are no signals, wait until at least one signal arrives.
unsigned char sig; unsigned char sig;
int nbytes = recv (r, (char*) &sig, 1, MSG_WAITALL); int nbytes = recv (r, (char*) &sig, 1, 0);
win_assert (nbytes != -1); win_assert (nbytes != -1);
return uint64_t (1) << sig; return uint64_t (1) << sig;
} }
...@@ -162,7 +171,9 @@ uint64_t zmq::fd_signaler_t::poll () ...@@ -162,7 +171,9 @@ uint64_t zmq::fd_signaler_t::poll ()
uint64_t zmq::fd_signaler_t::check () uint64_t zmq::fd_signaler_t::check ()
{ {
unsigned char buffer [32]; unsigned char buffer [32];
int nbytes = recv (r, (char*) buffer, 32, 0); int nbytes = recv (r, (char*) buffer, 32, MSG_DONTWAIT);
if (nbytes == -1 && errno == EAGAIN)
return 0;
win_assert (nbytes != -1); win_assert (nbytes != -1);
uint64_t signals = 0; uint64_t signals = 0;
...@@ -190,13 +201,6 @@ zmq::fd_signaler_t::fd_signaler_t () ...@@ -190,13 +201,6 @@ zmq::fd_signaler_t::fd_signaler_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
w = sv [0]; w = sv [0];
r = sv [1]; r = sv [1];
// Set to non-blocking mode.
int flags = fcntl (r, F_GETFL, 0);
if (flags == -1)
flags = 0;
rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc != -1);
} }
zmq::fd_signaler_t::~fd_signaler_t () zmq::fd_signaler_t::~fd_signaler_t ()
...@@ -207,6 +211,9 @@ zmq::fd_signaler_t::~fd_signaler_t () ...@@ -207,6 +211,9 @@ zmq::fd_signaler_t::~fd_signaler_t ()
void zmq::fd_signaler_t::signal (int signal_) void zmq::fd_signaler_t::signal (int signal_)
{ {
// TODO: Note that send is a blocking operation.
// How should we behave if the signal cannot be written to the signaler?
zmq_assert (signal_ >= 0 && signal_ < 64); zmq_assert (signal_ >= 0 && signal_ < 64);
unsigned char c = (unsigned char) signal_; unsigned char c = (unsigned char) signal_;
ssize_t nbytes = send (w, &c, 1, 0); ssize_t nbytes = send (w, &c, 1, 0);
...@@ -222,7 +229,7 @@ uint64_t zmq::fd_signaler_t::poll () ...@@ -222,7 +229,7 @@ uint64_t zmq::fd_signaler_t::poll ()
// If there are no signals, wait until at least one signal arrives. // If there are no signals, wait until at least one signal arrives.
unsigned char sig; unsigned char sig;
ssize_t nbytes = recv (r, &sig, 1, MSG_WAITALL); ssize_t nbytes = recv (r, &sig, 1, 0);
errno_assert (nbytes != -1); errno_assert (nbytes != -1);
return uint64_t (1) << sig; return uint64_t (1) << sig;
} }
...@@ -230,8 +237,10 @@ uint64_t zmq::fd_signaler_t::poll () ...@@ -230,8 +237,10 @@ uint64_t zmq::fd_signaler_t::poll ()
uint64_t zmq::fd_signaler_t::check () uint64_t zmq::fd_signaler_t::check ()
{ {
unsigned char buffer [32]; unsigned char buffer [32];
ssize_t nbytes = recv (r, buffer, 32, 0); ssize_t nbytes = recv (r, buffer, 32, MSG_DONTWAIT);
errno_assert (nbytes != -1); if (nbytes == -1 && errno == EAGAIN)
return 0;
zmq_assert (nbytes != -1);
uint64_t signals = 0; uint64_t signals = 0;
for (int pos = 0; pos != nbytes; pos ++) { for (int pos = 0; pos != nbytes; pos ++) {
......
...@@ -37,10 +37,7 @@ namespace zmq ...@@ -37,10 +37,7 @@ namespace zmq
{ {
public: public:
// Initialise the object.
fd_signaler_t (); fd_signaler_t ();
// Destroy the object.
~fd_signaler_t (); ~fd_signaler_t ();
// i_signaler interface implementation. // i_signaler interface implementation.
......
...@@ -37,11 +37,11 @@ namespace zmq ...@@ -37,11 +37,11 @@ namespace zmq
// Wait for signal. Returns a set of signals in form of a bitmap. // Wait for signal. Returns a set of signals in form of a bitmap.
// Signal with index 0 corresponds to value 1, index 1 to value 2, // Signal with index 0 corresponds to value 1, index 1 to value 2,
// index 2 to value 3 etc. // index 2 to value 3 etc.
uint64_t poll (); virtual uint64_t poll () = 0;
// Same as poll, however, if there is no signal available, // Same as poll, however, if there is no signal available,
// function returns zero immediately instead of waiting for a signal. // function returns zero immediately instead of waiting for a signal.
uint64_t check (); virtual uint64_t check () = 0;
}; };
} }
......
...@@ -23,6 +23,10 @@ zmq::ypollset_t::ypollset_t () ...@@ -23,6 +23,10 @@ zmq::ypollset_t::ypollset_t ()
{ {
} }
zmq::ypollset_t::~ypollset_t ()
{
}
void zmq::ypollset_t::signal (int signal_) void zmq::ypollset_t::signal (int signal_)
{ {
zmq_assert (signal_ >= 0 && signal_ < wait_signal); zmq_assert (signal_ >= 0 && signal_ < wait_signal);
......
...@@ -35,8 +35,8 @@ namespace zmq ...@@ -35,8 +35,8 @@ namespace zmq
{ {
public: public:
// Create the pollset.
ypollset_t (); ypollset_t ();
~ypollset_t ();
// i_signaler interface implementation. // i_signaler interface implementation.
void signal (int signal_); void signal (int signal_);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment