Commit 7572fee9 authored by Fabien Ninoles's avatar Fabien Ninoles Committed by Martin Sustrik

Optimise block/non-block switching in mailbox_t

For the platforms that don't support MSG_DONTWAIT the reader
socket in mailbox_t was kept in non-blocking state and flipped
to blocking state and back when blocking read was requested.
Now, the state is preserved between calls and flipped only
if different type of operation (block vs. non-block) is
requested.
Signed-off-by: 's avatarFabien Ninoles <fabien@tzone.org>
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 8440de2b
...@@ -79,7 +79,8 @@ zmq::fd_t zmq::mailbox_t::get_fd () ...@@ -79,7 +79,8 @@ zmq::fd_t zmq::mailbox_t::get_fd ()
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
zmq::mailbox_t::mailbox_t () zmq::mailbox_t::mailbox_t () :
blocking (true)
{ {
// Create the socketpair for signalling. // Create the socketpair for signalling.
int rc = make_socketpair (&r, &w); int rc = make_socketpair (&r, &w);
...@@ -89,11 +90,6 @@ zmq::mailbox_t::mailbox_t () ...@@ -89,11 +90,6 @@ zmq::mailbox_t::mailbox_t ()
unsigned long argp = 1; unsigned long argp = 1;
rc = ioctlsocket (w, FIONBIO, &argp); rc = ioctlsocket (w, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Set the reader to non-blocking mode.
argp = 1;
rc = ioctlsocket (r, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
} }
zmq::mailbox_t::~mailbox_t () zmq::mailbox_t::~mailbox_t ()
...@@ -121,33 +117,18 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) ...@@ -121,33 +117,18 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
if (timeout_ > 0) if (timeout_ > 0)
return recv_timeout (cmd_, timeout_); return recv_timeout (cmd_, timeout_);
// If required, set the reader to blocking mode. // If required, switch the reader to blocking or non-blocking mode.
if (timeout_ < 0) { if ((timeout_ < 0 && !blocking) || (timeout_ == 0 && blocking)) {
unsigned long argp = 0; blocking = (timeout_ < 0);
unsigned long argp = blocking ? 0 : 1;
int rc = ioctlsocket (r, FIONBIO, &argp); int rc = ioctlsocket (r, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
} }
// Attempt to read an entire command. Returns EAGAIN if non-blocking // Attempt to read an entire command.
// and a command is not available. Save value of errno if we wish to pass
// it to caller.
int err = 0;
int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK)
err = EAGAIN;
// Re-set the reader to non-blocking mode.
if (timeout_ < 0) {
unsigned long argp = 1;
int rc = ioctlsocket (r, FIONBIO, &argp);
wsa_assert (rc != SOCKET_ERROR);
}
// If the recv failed, return with the saved errno.
if (err != 0) {
errno = err;
return -1; return -1;
}
// Sanity check for success. // Sanity check for success.
wsa_assert (nbytes != SOCKET_ERROR); wsa_assert (nbytes != SOCKET_ERROR);
...@@ -160,7 +141,8 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) ...@@ -160,7 +141,8 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
#else #else
zmq::mailbox_t::mailbox_t () zmq::mailbox_t::mailbox_t () :
blocking (true)
{ {
#ifdef PIPE_BUF #ifdef PIPE_BUF
// Make sure that command can be written to the socket in atomic fashion. // Make sure that command can be written to the socket in atomic fashion.
...@@ -178,14 +160,6 @@ zmq::mailbox_t::mailbox_t () ...@@ -178,14 +160,6 @@ zmq::mailbox_t::mailbox_t ()
errno_assert (flags >= 0); errno_assert (flags >= 0);
rc = fcntl (w, F_SETFL, flags | O_NONBLOCK); rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc == 0); errno_assert (rc == 0);
#ifndef MSG_DONTWAIT
// Set the reader to non-blocking mode.
flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0);
rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc == 0);
#endif
} }
zmq::mailbox_t::~mailbox_t () zmq::mailbox_t::~mailbox_t ()
...@@ -250,35 +224,20 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) ...@@ -250,35 +224,20 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
return -1; return -1;
#else #else
// If required, set the reader to blocking mode. // If required, switch the reader to blocking or non-blocking mode.
if (timeout_ < 0) { if ((timeout_ < 0 && !blocking) || (timeout_ == 0 && blocking)) {
blocking = (timeout_ < 0);
int flags = fcntl (r, F_GETFL, 0); int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0); errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); int rc = fcntl (r, F_SETFL,
blocking ? flags | O_NONBLOCK : flags & ~O_NONBLOCK);
errno_assert (rc == 0); errno_assert (rc == 0);
} }
// Attempt to read an entire command. Returns EAGAIN if non-blocking // Attempt to read an entire command.
// and a command is not available. Save value of errno if we wish to pass
// it to caller.
int err = 0;
ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0); ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
err = errno;
// Re-set the reader to non-blocking mode.
if (timeout_ < 0) {
int flags = fcntl (r, F_GETFL, 0);
errno_assert (flags >= 0);
int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc == 0);
}
// If the recv failed, return with the saved errno if set.
if (err != 0) {
errno = err;
return -1; return -1;
}
#endif #endif
......
...@@ -45,10 +45,6 @@ namespace zmq ...@@ -45,10 +45,6 @@ namespace zmq
private: private:
// Write & read end of the socketpair.
fd_t w;
fd_t r;
// Platform-dependent function to create a socketpair. // Platform-dependent function to create a socketpair.
static int make_socketpair (fd_t *r_, fd_t *w_); static int make_socketpair (fd_t *r_, fd_t *w_);
...@@ -57,6 +53,14 @@ namespace zmq ...@@ -57,6 +53,14 @@ namespace zmq
// blocking recvs. // blocking recvs.
int recv_timeout (command_t *cmd_, int timeout_); int recv_timeout (command_t *cmd_, int timeout_);
// Write & read end of the socketpair.
fd_t w;
fd_t r;
// Used on platforms where there's no MSG_DONTWAIT functionality.
// True if the read socket is set to the blocking state.
bool blocking;
// Disable copying of mailbox_t object. // Disable copying of mailbox_t object.
mailbox_t (const mailbox_t&); mailbox_t (const mailbox_t&);
const mailbox_t &operator = (const mailbox_t&); const mailbox_t &operator = (const mailbox_t&);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment