Commit 3a76d246 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #348 from hurtonm/code_cleanup

Code cleanup
parents 5db30fe9 d0b9005e
...@@ -70,7 +70,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, ...@@ -70,7 +70,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_,
} }
} }
assert (!fd_table [fd_].valid); zmq_assert (!fd_table [fd_].valid);
fd_table [fd_].events = 0; fd_table [fd_].events = 0;
fd_table [fd_].reactor = reactor_; fd_table [fd_].reactor = reactor_;
...@@ -88,7 +88,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, ...@@ -88,7 +88,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_,
void zmq::devpoll_t::rm_fd (handle_t handle_) void zmq::devpoll_t::rm_fd (handle_t handle_)
{ {
assert (fd_table [handle_].valid); zmq_assert (fd_table [handle_].valid);
devpoll_ctl (handle_, POLLREMOVE); devpoll_ctl (handle_, POLLREMOVE);
fd_table [handle_].valid = false; fd_table [handle_].valid = false;
......
...@@ -214,46 +214,36 @@ void zmq::win_error (char *buffer_, size_t buffer_size_) ...@@ -214,46 +214,36 @@ void zmq::win_error (char *buffer_, size_t buffer_size_)
zmq_assert (rc); zmq_assert (rc);
} }
void zmq::wsa_error_to_errno () int zmq::wsa_error_to_errno (int errcode)
{ {
int errcode = WSAGetLastError ();
switch (errcode) { switch (errcode) {
case WSAEINPROGRESS: case WSAEINPROGRESS:
errno = EAGAIN; return EAGAIN;
return;
case WSAEBADF: case WSAEBADF:
errno = EBADF; return EBADF;
return;
case WSAEINVAL: case WSAEINVAL:
errno = EINVAL; return EINVAL;
return;
case WSAEMFILE: case WSAEMFILE:
errno = EMFILE; return EMFILE;
return;
case WSAEFAULT: case WSAEFAULT:
errno = EFAULT; return EFAULT;
return;
case WSAEPROTONOSUPPORT: case WSAEPROTONOSUPPORT:
errno = EPROTONOSUPPORT; return EPROTONOSUPPORT;
return;
case WSAENOBUFS: case WSAENOBUFS:
errno = ENOBUFS; return ENOBUFS;
return;
case WSAENETDOWN: case WSAENETDOWN:
errno = ENETDOWN; return ENETDOWN;
return;
case WSAEADDRINUSE: case WSAEADDRINUSE:
errno = EADDRINUSE; return EADDRINUSE;
return;
case WSAEADDRNOTAVAIL: case WSAEADDRNOTAVAIL:
errno = EADDRNOTAVAIL; return EADDRNOTAVAIL;
return;
case WSAEAFNOSUPPORT: case WSAEAFNOSUPPORT:
errno = EAFNOSUPPORT; return EAFNOSUPPORT;
return;
default: default:
wsa_assert (false); wsa_assert (false);
} }
// Not reachable
return 0;
} }
#endif #endif
...@@ -57,7 +57,7 @@ namespace zmq ...@@ -57,7 +57,7 @@ namespace zmq
const char *wsa_error (); const char *wsa_error ();
const char *wsa_error_no (int no_); const char *wsa_error_no (int no_);
void win_error (char *buffer_, size_t buffer_size_); void win_error (char *buffer_, size_t buffer_size_);
void wsa_error_to_errno (); int wsa_error_to_errno (int errcode);
} }
// Provides convenient way to check WSA-style errors on Windows. // Provides convenient way to check WSA-style errors on Windows.
......
...@@ -37,9 +37,6 @@ namespace zmq ...@@ -37,9 +37,6 @@ namespace zmq
virtual void plug (zmq::io_thread_t *io_thread_, virtual void plug (zmq::io_thread_t *io_thread_,
class session_base_t *session_) = 0; class session_base_t *session_) = 0;
// Unplug the engine from the session.
virtual void unplug () = 0;
// Terminate and deallocate the engine. Note that 'detached' // Terminate and deallocate the engine. Note that 'detached'
// events are not fired on termination. // events are not fired on termination.
virtual void terminate () = 0; virtual void terminate () = 0;
......
...@@ -59,7 +59,6 @@ namespace zmq ...@@ -59,7 +59,6 @@ namespace zmq
// i_engine interface implementation. // i_engine interface implementation.
void plug (zmq::io_thread_t *io_thread_, void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_); zmq::session_base_t *session_);
void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
void activate_out (); void activate_out ();
...@@ -70,6 +69,9 @@ namespace zmq ...@@ -70,6 +69,9 @@ namespace zmq
private: private:
// Unplug the engine from the session.
void unplug ();
// PGM is not able to move subscriptions upstream. Thus, drop all // PGM is not able to move subscriptions upstream. Thus, drop all
// the pending subscriptions. // the pending subscriptions.
void drop_subscriptions (); void drop_subscriptions ();
......
...@@ -57,7 +57,6 @@ namespace zmq ...@@ -57,7 +57,6 @@ namespace zmq
// i_engine interface implementation. // i_engine interface implementation.
void plug (zmq::io_thread_t *io_thread_, void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_); zmq::session_base_t *session_);
void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
void activate_out (); void activate_out ();
...@@ -69,6 +68,9 @@ namespace zmq ...@@ -69,6 +68,9 @@ namespace zmq
private: private:
// Unplug the engine from the session.
void unplug ();
// TX and RX timeout timer ID's. // TX and RX timeout timer ID's.
enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1}; enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1};
......
...@@ -454,7 +454,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) ...@@ -454,7 +454,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
// We have to write all data as one packet. // We have to write all data as one packet.
if (nbytes > 0) { if (nbytes > 0) {
zmq_assert (status == PGM_IO_STATUS_NORMAL); zmq_assert (status == PGM_IO_STATUS_NORMAL);
zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_); zmq_assert (nbytes == data_len_);
} else { } else {
zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED ||
status == PGM_IO_STATUS_WOULD_BLOCK); status == PGM_IO_STATUS_WOULD_BLOCK);
......
...@@ -57,7 +57,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) ...@@ -57,7 +57,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
pollfd pfd = {fd_, 0, 0}; pollfd pfd = {fd_, 0, 0};
pollset.push_back (pfd); pollset.push_back (pfd);
assert (fd_table [fd_].index == retired_fd); zmq_assert (fd_table [fd_].index == retired_fd);
fd_table [fd_].index = pollset.size() - 1; fd_table [fd_].index = pollset.size() - 1;
fd_table [fd_].events = events_; fd_table [fd_].events = events_;
...@@ -71,7 +71,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) ...@@ -71,7 +71,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
void zmq::poll_t::rm_fd (handle_t handle_) void zmq::poll_t::rm_fd (handle_t handle_)
{ {
fd_t index = fd_table [handle_].index; fd_t index = fd_table [handle_].index;
assert (index != retired_fd); zmq_assert (index != retired_fd);
// Mark the fd as unused. // Mark the fd as unused.
pollset [index].fd = retired_fd; pollset [index].fd = retired_fd;
......
...@@ -219,8 +219,8 @@ int zmq::router_t::xrecv (msg_t *msg_, int flags_) ...@@ -219,8 +219,8 @@ int zmq::router_t::xrecv (msg_t *msg_, int flags_)
} }
// Identity is not expected // Identity is not expected
assert ((msg_->flags () & msg_t::identity) == 0); zmq_assert ((msg_->flags () & msg_t::identity) == 0);
assert (pipe != NULL); zmq_assert (pipe != NULL);
// If we are in the middle of reading a message, just return the next part. // If we are in the middle of reading a message, just return the next part.
if (more_in) if (more_in)
...@@ -273,7 +273,7 @@ bool zmq::router_t::xhas_in () ...@@ -273,7 +273,7 @@ bool zmq::router_t::xhas_in ()
return false; return false;
// Identity is not expected // Identity is not expected
assert ((prefetched_msg.flags () & msg_t::identity) == 0); zmq_assert ((prefetched_msg.flags () & msg_t::identity) == 0);
blob_t identity = pipe->get_identity (); blob_t identity = pipe->get_identity ();
rc = prefetched_id.init_size (identity.size ()); rc = prefetched_id.init_size (identity.size ());
......
...@@ -280,13 +280,7 @@ void zmq::session_base_t::process_plug () ...@@ -280,13 +280,7 @@ void zmq::session_base_t::process_plug ()
void zmq::session_base_t::process_attach (i_engine *engine_) void zmq::session_base_t::process_attach (i_engine *engine_)
{ {
// If some other object (e.g. init) notifies us that the connection failed zmq_assert (engine_ != NULL);
// without creating an engine we need to start the reconnection process.
if (!engine_) {
zmq_assert (!engine);
detached ();
return;
}
// Create the pipe if it does not exist yet. // Create the pipe if it does not exist yet.
if (!pipe && !is_terminating ()) { if (!pipe && !is_terminating ()) {
......
...@@ -52,7 +52,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : ...@@ -52,7 +52,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
outsize (0), outsize (0),
encoder (out_batch_size), encoder (out_batch_size),
session (NULL), session (NULL),
leftover_session (NULL),
options (options_), options (options_),
plugged (false) plugged (false)
{ {
...@@ -109,7 +108,6 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -109,7 +108,6 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
{ {
zmq_assert (!plugged); zmq_assert (!plugged);
plugged = true; plugged = true;
leftover_session = NULL;
// Connect to session object. // Connect to session object.
zmq_assert (!session); zmq_assert (!session);
...@@ -144,7 +142,6 @@ void zmq::stream_engine_t::unplug () ...@@ -144,7 +142,6 @@ void zmq::stream_engine_t::unplug ()
// Disconnect from session object. // Disconnect from session object.
encoder.set_session (NULL); encoder.set_session (NULL);
decoder.set_session (NULL); decoder.set_session (NULL);
leftover_session = session;
session = NULL; session = NULL;
endpoint.clear(); endpoint.clear();
} }
...@@ -185,12 +182,8 @@ void zmq::stream_engine_t::in_event () ...@@ -185,12 +182,8 @@ void zmq::stream_engine_t::in_event ()
else { else {
// Stop polling for input if we got stuck. // Stop polling for input if we got stuck.
if (processed < insize) { if (processed < insize)
// This may happen if queue limits are in effect.
if (plugged)
reset_pollin (handle); reset_pollin (handle);
}
// Adjust the buffer. // Adjust the buffer.
inpos += processed; inpos += processed;
...@@ -198,20 +191,14 @@ void zmq::stream_engine_t::in_event () ...@@ -198,20 +191,14 @@ void zmq::stream_engine_t::in_event ()
} }
// Flush all messages the decoder may have produced. // Flush all messages the decoder may have produced.
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
zmq_assert (leftover_session);
leftover_session->flush ();
} else {
session->flush (); session->flush ();
}
// Input error has occurred. If the last decoded // Input error has occurred. If the last decoded
// message has already been accepted, we terminate // message has already been accepted, we terminate
// the engine immediately. Otherwise, we stop // the engine immediately. Otherwise, we stop
// waiting for input events and postpone the termination // waiting for input events and postpone the termination
// until after the session has accepted the message. // until after the session has accepted the message.
if (session != NULL && disconnection) { if (disconnection) {
input_error = true; input_error = true;
if (decoder.stalled ()) if (decoder.stalled ())
reset_pollin (handle); reset_pollin (handle);
...@@ -228,13 +215,6 @@ void zmq::stream_engine_t::out_event () ...@@ -228,13 +215,6 @@ void zmq::stream_engine_t::out_event ()
outpos = NULL; outpos = NULL;
encoder.get_data (&outpos, &outsize); encoder.get_data (&outpos, &outsize);
// If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) {
zmq_assert (leftover_session);
leftover_session->flush ();
return;
}
// If there is no data to send, stop polling for output. // If there is no data to send, stop polling for output.
if (outsize == 0) { if (outsize == 0) {
reset_pollout (handle); reset_pollout (handle);
...@@ -312,7 +292,7 @@ int zmq::stream_engine_t::write (const void *data_, size_t size_) ...@@ -312,7 +292,7 @@ int zmq::stream_engine_t::write (const void *data_, size_t size_)
return 0; return 0;
// Signalise peer failure. // Signalise peer failure.
if (nbytes == -1 && ( if (nbytes == SOCKET_ERROR && (
WSAGetLastError () == WSAENETDOWN || WSAGetLastError () == WSAENETDOWN ||
WSAGetLastError () == WSAENETRESET || WSAGetLastError () == WSAENETRESET ||
WSAGetLastError () == WSAEHOSTUNREACH || WSAGetLastError () == WSAEHOSTUNREACH ||
...@@ -322,7 +302,7 @@ int zmq::stream_engine_t::write (const void *data_, size_t size_) ...@@ -322,7 +302,7 @@ int zmq::stream_engine_t::write (const void *data_, size_t size_)
return -1; return -1;
wsa_assert (nbytes != SOCKET_ERROR); wsa_assert (nbytes != SOCKET_ERROR);
return (size_t) nbytes; return nbytes;
#else #else
...@@ -358,7 +338,7 @@ int zmq::stream_engine_t::read (void *data_, size_t size_) ...@@ -358,7 +338,7 @@ int zmq::stream_engine_t::read (void *data_, size_t size_)
return 0; return 0;
// Connection failure. // Connection failure.
if (nbytes == -1 && ( if (nbytes == SOCKET_ERROR && (
WSAGetLastError () == WSAENETDOWN || WSAGetLastError () == WSAENETDOWN ||
WSAGetLastError () == WSAENETRESET || WSAGetLastError () == WSAENETRESET ||
WSAGetLastError () == WSAECONNABORTED || WSAGetLastError () == WSAECONNABORTED ||
...@@ -374,7 +354,7 @@ int zmq::stream_engine_t::read (void *data_, size_t size_) ...@@ -374,7 +354,7 @@ int zmq::stream_engine_t::read (void *data_, size_t size_)
if (nbytes == 0) if (nbytes == 0)
return -1; return -1;
return (size_t) nbytes; return nbytes;
#else #else
......
...@@ -51,7 +51,6 @@ namespace zmq ...@@ -51,7 +51,6 @@ namespace zmq
// i_engine interface implementation. // i_engine interface implementation.
void plug (zmq::io_thread_t *io_thread_, void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_); zmq::session_base_t *session_);
void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
void activate_out (); void activate_out ();
...@@ -62,6 +61,9 @@ namespace zmq ...@@ -62,6 +61,9 @@ namespace zmq
private: private:
// Unplug the engine from the session.
void unplug ();
// Function to handle network disconnections. // Function to handle network disconnections.
void error (); void error ();
......
...@@ -193,7 +193,7 @@ int zmq::tcp_connecter_t::open () ...@@ -193,7 +193,7 @@ int zmq::tcp_connecter_t::open ()
s = open_socket (addr->resolved.tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP); s = open_socket (addr->resolved.tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) { if (s == INVALID_SOCKET) {
wsa_error_to_errno (); errno = wsa_error_to_errno (WSAGetLastError ());
return -1; return -1;
} }
#else #else
...@@ -218,20 +218,17 @@ int zmq::tcp_connecter_t::open () ...@@ -218,20 +218,17 @@ int zmq::tcp_connecter_t::open ()
if (rc == 0) if (rc == 0)
return 0; return 0;
// Translate other error codes indicating asynchronous connect has been // Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS. // launched to a uniform EINPROGRESS.
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR && (WSAGetLastError () == WSAEINPROGRESS || const int error_code = WSAGetLastError ();
WSAGetLastError () == WSAEWOULDBLOCK)) { if (error_code == WSAEINPROGRESS || error_code == WSAEWOULDBLOCK)
errno = EINPROGRESS; errno = EINPROGRESS;
return -1; else
} errno = wsa_error_to_errno (error_code);
wsa_error_to_errno ();
#else #else
if (rc == -1 && errno == EINTR) { if (errno == EINTR)
errno = EINPROGRESS; errno = EINPROGRESS;
return -1;
}
#endif #endif
return -1; return -1;
} }
......
...@@ -156,7 +156,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) ...@@ -156,7 +156,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP); s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) if (s == INVALID_SOCKET)
wsa_error_to_errno (); errno = wsa_error_to_errno (WSAGetLastError ());
#endif #endif
// IPv6 address family not supported, try automatic downgrade to IPv4. // IPv6 address family not supported, try automatic downgrade to IPv4.
...@@ -170,7 +170,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) ...@@ -170,7 +170,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) { if (s == INVALID_SOCKET) {
wsa_error_to_errno (); errno = wsa_error_to_errno (WSAGetLastError ());
return -1; return -1;
} }
// On Windows, preventing sockets to be inherited by child processes. // On Windows, preventing sockets to be inherited by child processes.
...@@ -203,7 +203,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) ...@@ -203,7 +203,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
rc = bind (s, address.addr (), address.addrlen ()); rc = bind (s, address.addr (), address.addrlen ());
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) { if (rc == SOCKET_ERROR) {
wsa_error_to_errno (); errno = wsa_error_to_errno (WSAGetLastError ());
return -1; return -1;
} }
#else #else
...@@ -215,7 +215,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) ...@@ -215,7 +215,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
rc = listen (s, options.backlog); rc = listen (s, options.backlog);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR) { if (rc == SOCKET_ERROR) {
wsa_error_to_errno (); errno = wsa_error_to_errno (WSAGetLastError ());
return -1; return -1;
} }
#else #else
......
...@@ -853,17 +853,15 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -853,17 +853,15 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
int rc = select (0, &inset, &outset, &errset, ptimeout); int rc = select (0, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == SOCKET_ERROR)) { if (unlikely (rc == SOCKET_ERROR)) {
zmq::wsa_error_to_errno (); errno = zmq::wsa_error_to_errno (WSAGetLastError ());
if (errno == ENOTSOCK) wsa_assert (errno == ENOTSOCK);
return -1; return -1;
wsa_assert (false);
} }
#else #else
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout); int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == -1)) { if (unlikely (rc == -1)) {
if (errno == EINTR || errno == EBADF) errno_assert (errno == EINTR || errno == EBADF);
return -1; return -1;
errno_assert (false);
} }
#endif #endif
break; break;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment