Commit 9c118866 authored by Martin Sustrik's avatar Martin Sustrik

The flag in the socket has clear semantics now -- it tracks whether…

The flag in the socket has clear semantics now -- it tracks whether corresponding context was closed, it doesn't track whether zmq_close was called on the socket itself
parent 4d51a528
...@@ -107,7 +107,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, ...@@ -107,7 +107,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) : zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :
own_t (parent_, slot_), own_t (parent_, slot_),
zombie (false), ctx_terminated (false),
destroyed (false), destroyed (false),
last_processing_time (0), last_processing_time (0),
ticks (0), ticks (0),
...@@ -117,7 +117,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) : ...@@ -117,7 +117,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :
zmq::socket_base_t::~socket_base_t () zmq::socket_base_t::~socket_base_t ()
{ {
zmq_assert (zombie && destroyed); zmq_assert (destroyed);
// Check whether there are no session leaks. // Check whether there are no session leaks.
sessions_sync.lock (); sessions_sync.lock ();
...@@ -196,7 +196,7 @@ void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, ...@@ -196,7 +196,7 @@ void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
if (unlikely (zombie)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
return -1; return -1;
} }
...@@ -214,7 +214,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, ...@@ -214,7 +214,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
int zmq::socket_base_t::getsockopt (int option_, void *optval_, int zmq::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_) size_t *optvallen_)
{ {
if (unlikely (zombie)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
return -1; return -1;
} }
...@@ -262,7 +262,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, ...@@ -262,7 +262,7 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::bind (const char *addr_)
{ {
if (unlikely (zombie)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
return -1; return -1;
} }
...@@ -324,7 +324,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -324,7 +324,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::connect (const char *addr_)
{ {
if (unlikely (zombie)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
return -1; return -1;
} }
...@@ -432,7 +432,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -432,7 +432,7 @@ int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{ {
if (unlikely (zombie)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
return -1; return -1;
} }
...@@ -470,7 +470,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) ...@@ -470,7 +470,7 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{ {
if (unlikely (zombie)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
return -1; return -1;
} }
...@@ -543,12 +543,6 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -543,12 +543,6 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close () int zmq::socket_base_t::close ()
{ {
zmq_assert (!zombie);
// Socket becomes a zombie. From now on all new arrived pipes (bind
// command) are immediately terminated.
zombie = true;
// Start termination of associated I/O object hierarchy. // Start termination of associated I/O object hierarchy.
terminate (); terminate ();
...@@ -608,8 +602,6 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_) ...@@ -608,8 +602,6 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
bool zmq::socket_base_t::dezombify () bool zmq::socket_base_t::dezombify ()
{ {
zmq_assert (zombie);
// Process any commands from other threads/sockets that may be available // Process any commands from other threads/sockets that may be available
// at the moment. Ultimately, socket will be destroyed. // at the moment. Ultimately, socket will be destroyed.
process_commands (false, false); process_commands (false, false);
...@@ -656,14 +648,9 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) ...@@ -656,14 +648,9 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
#error #error
#endif #endif
// Check whether certain time have elapsed since last command // Check whether certain time have elapsed since last command
// processing. // processing. If it didn't do nothing.
if (current_time - last_processing_time <= max_command_delay) { if (current_time - last_processing_time <= max_command_delay)
// No command was processed, so the socket should
// not get into the zombie state.
zmq_assert (!zombie);
return 0; return 0;
}
last_processing_time = current_time; last_processing_time = current_time;
} }
#endif #endif
...@@ -683,7 +670,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) ...@@ -683,7 +670,7 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
rc = signaler.recv (&cmd, false); rc = signaler.recv (&cmd, false);
} }
if (zombie) { if (ctx_terminated) {
errno = ETERM; errno = ETERM;
return -1; return -1;
} }
...@@ -694,10 +681,10 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_) ...@@ -694,10 +681,10 @@ int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
void zmq::socket_base_t::process_stop () void zmq::socket_base_t::process_stop ()
{ {
// Here, someone have called zmq_term while the socket was still alive. // Here, someone have called zmq_term while the socket was still alive.
// We'll zombify it so that any blocking call is interrupted and any // We'll remember the fact so that any blocking call is interrupted and any
// further attempt to use the socket will return ETERM. The user is still // further attempt to use the socket will return ETERM. The user is still
// responsible for calling zmq_close on the socket though! // responsible for calling zmq_close on the socket though!
zombie = true; ctx_terminated = true;
} }
void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
...@@ -712,8 +699,6 @@ void zmq::socket_base_t::process_unplug () ...@@ -712,8 +699,6 @@ void zmq::socket_base_t::process_unplug ()
void zmq::socket_base_t::process_term () void zmq::socket_base_t::process_term ()
{ {
zmq_assert (zombie);
// Unregister all inproc endpoints associated with this socket. // Unregister all inproc endpoints associated with this socket.
// Doing this we make sure that no new pipes from other sockets (inproc) // Doing this we make sure that no new pipes from other sockets (inproc)
// will be initiated. // will be initiated.
......
...@@ -124,10 +124,8 @@ namespace zmq ...@@ -124,10 +124,8 @@ namespace zmq
private: private:
// If true, socket was already closed but not yet deallocated // If true, associated context was already terminated.
// because either shutdown is in process or there are still pipes bool ctx_terminated;
// attached to the socket.
bool zombie;
// If true, object should have been already destroyed. However, // If true, object should have been already destroyed. However,
// destruction is delayed while we unwind the stack to the point // destruction is delayed while we unwind the stack to the point
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment