Commit fba28c7c authored by Martin Sustrik's avatar Martin Sustrik

issue 1 - Change zmq_term semantics

parent dff79d77
...@@ -83,6 +83,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch); ...@@ -83,6 +83,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
#define EMTHREAD (ZMQ_HAUSNUMERO + 50) #define EMTHREAD (ZMQ_HAUSNUMERO + 50)
#define EFSM (ZMQ_HAUSNUMERO + 51) #define EFSM (ZMQ_HAUSNUMERO + 51)
#define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52) #define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52)
#define ETERM (ZMQ_HAUSNUMERO + 53)
/* This function retrieves the errno as it is known to 0MQ library. The goal */ /* This function retrieves the errno as it is known to 0MQ library. The goal */
/* of this function is to make the code 100% portable, including where 0MQ */ /* of this function is to make the code 100% portable, including where 0MQ */
......
...@@ -62,7 +62,8 @@ ...@@ -62,7 +62,8 @@
zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_, zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
int flags_) : int flags_) :
object_t (dispatcher_, thread_slot_), object_t (dispatcher_, thread_slot_),
last_processing_time (0) last_processing_time (0),
terminated (false)
{ {
if (flags_ & ZMQ_POLL) { if (flags_ & ZMQ_POLL) {
signaler = new (std::nothrow) fd_signaler_t; signaler = new (std::nothrow) fd_signaler_t;
...@@ -81,12 +82,17 @@ zmq::app_thread_t::~app_thread_t () ...@@ -81,12 +82,17 @@ zmq::app_thread_t::~app_thread_t ()
delete signaler; delete signaler;
} }
void zmq::app_thread_t::stop ()
{
send_stop ();
}
zmq::i_signaler *zmq::app_thread_t::get_signaler () zmq::i_signaler *zmq::app_thread_t::get_signaler ()
{ {
return signaler; return signaler;
} }
void zmq::app_thread_t::process_commands (bool block_, bool throttle_) bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{ {
uint64_t signals; uint64_t signals;
if (block_) if (block_)
...@@ -117,7 +123,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_) ...@@ -117,7 +123,7 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
// Check whether certain time have elapsed since last command // Check whether certain time have elapsed since last command
// processing. // processing.
if (current_time - last_processing_time <= max_command_delay) if (current_time - last_processing_time <= max_command_delay)
return; return !terminated;
last_processing_time = current_time; last_processing_time = current_time;
} }
#endif #endif
...@@ -138,6 +144,8 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_) ...@@ -138,6 +144,8 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
} }
} }
} }
return !terminated;
} }
zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
...@@ -190,3 +198,14 @@ void zmq::app_thread_t::remove_socket (socket_base_t *socket_) ...@@ -190,3 +198,14 @@ void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
if (sockets.empty ()) if (sockets.empty ())
dispatcher->no_sockets (this); dispatcher->no_sockets (this);
} }
void zmq::app_thread_t::process_stop ()
{
terminated = true;
}
bool zmq::app_thread_t::is_terminated ()
{
return terminated;
}
...@@ -38,14 +38,19 @@ namespace zmq ...@@ -38,14 +38,19 @@ namespace zmq
~app_thread_t (); ~app_thread_t ();
// Interrupt blocking call if the app thread is stuck in one.
// This function is is called from a different thread!
void stop ();
// Returns signaler associated with this application thread. // Returns signaler associated with this application thread.
struct i_signaler *get_signaler (); struct i_signaler *get_signaler ();
// Processes commands sent to this thread (if any). If 'block' is // Processes commands sent to this thread (if any). If 'block' is
// set to true, returns only after at least one command was processed. // set to true, returns only after at least one command was processed.
// If throttle argument is true, commands are processed at most once // If throttle argument is true, commands are processed at most once
// in a predefined time period. // in a predefined time period. The function returns false is the
void process_commands (bool block_, bool throttle_); // associated context was terminated, true otherwise.
bool process_commands (bool block_, bool throttle_);
// Create a socket of a specified type. // Create a socket of a specified type.
class socket_base_t *create_socket (int type_); class socket_base_t *create_socket (int type_);
...@@ -53,8 +58,14 @@ namespace zmq ...@@ -53,8 +58,14 @@ namespace zmq
// Unregister the socket from the app_thread (called by socket itself). // Unregister the socket from the app_thread (called by socket itself).
void remove_socket (class socket_base_t *socket_); void remove_socket (class socket_base_t *socket_);
// Returns true is the associated context was already terminated.
bool is_terminated ();
private: private:
// Command handlers.
void process_stop ();
// All the sockets created from this application thread. // All the sockets created from this application thread.
typedef yarray_t <socket_base_t> sockets_t; typedef yarray_t <socket_base_t> sockets_t;
sockets_t sockets; sockets_t sockets;
...@@ -65,6 +76,9 @@ namespace zmq ...@@ -65,6 +76,9 @@ namespace zmq
// Timestamp of when commands were processed the last time. // Timestamp of when commands were processed the last time.
uint64_t last_processing_time; uint64_t last_processing_time;
// If true, 'stop' command was already received.
bool terminated;
app_thread_t (const app_thread_t&); app_thread_t (const app_thread_t&);
void operator = (const app_thread_t&); void operator = (const app_thread_t&);
}; };
......
...@@ -86,12 +86,19 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, ...@@ -86,12 +86,19 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
int zmq::dispatcher_t::term () int zmq::dispatcher_t::term ()
{ {
// First send stop command to application threads so that any
// blocking calls are interrupted.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
app_threads [i].app_thread->stop ();
// Then mark context as terminated.
term_sync.lock (); term_sync.lock ();
zmq_assert (!terminated); zmq_assert (!terminated);
terminated = true; terminated = true;
bool destroy = (sockets == 0); bool destroy = (sockets == 0);
term_sync.unlock (); term_sync.unlock ();
// If there are no sockets open, destroy the context immediately.
if (destroy) if (destroy)
delete this; delete this;
......
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
#include "platform.hpp" #include "platform.hpp"
#include "pgm_sender.hpp" #include "pgm_sender.hpp"
#include "pgm_receiver.hpp" #include "pgm_receiver.hpp"
#include "likely.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_), object_t (parent_),
...@@ -58,6 +59,11 @@ zmq::socket_base_t::~socket_base_t () ...@@ -58,6 +59,11 @@ zmq::socket_base_t::~socket_base_t ()
int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
if (unlikely (app_thread->is_terminated ())) {
errno = ETERM;
return -1;
}
// First, check whether specific socket type overloads the option. // First, check whether specific socket type overloads the option.
int rc = xsetsockopt (option_, optval_, optvallen_); int rc = xsetsockopt (option_, optval_, optvallen_);
if (rc == 0 || errno != EINVAL) if (rc == 0 || errno != EINVAL)
...@@ -71,6 +77,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, ...@@ -71,6 +77,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
int zmq::socket_base_t::getsockopt (int option_, void *optval_, int zmq::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_) size_t *optvallen_)
{ {
if (unlikely (app_thread->is_terminated ())) {
errno = ETERM;
return -1;
}
if (option_ == ZMQ_RCVMORE) { if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int64_t)) { if (*optvallen_ < sizeof (int64_t)) {
errno = EINVAL; errno = EINVAL;
...@@ -86,6 +97,11 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, ...@@ -86,6 +97,11 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::bind (const char *addr_)
{ {
if (unlikely (app_thread->is_terminated ())) {
errno = ETERM;
return -1;
}
// Parse addr_ string. // Parse addr_ string.
std::string addr_type; std::string addr_type;
std::string addr_args; std::string addr_args;
...@@ -141,6 +157,11 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -141,6 +157,11 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::connect (const char *addr_)
{ {
if (unlikely (app_thread->is_terminated ())) {
errno = ETERM;
return -1;
}
// Parse addr_ string. // Parse addr_ string.
std::string addr_type; std::string addr_type;
std::string addr_args; std::string addr_args;
...@@ -328,13 +349,16 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -328,13 +349,16 @@ int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{ {
// Process pending commands, if any.
if (unlikely (!app_thread->process_commands (false, true))) {
errno = ETERM;
return -1;
}
// At this point we impose the MORE flag on the message. // At this point we impose the MORE flag on the message.
if (flags_ & ZMQ_SNDMORE) if (flags_ & ZMQ_SNDMORE)
msg_->flags |= ZMQ_MSG_MORE; msg_->flags |= ZMQ_MSG_MORE;
// Process pending commands, if any.
app_thread->process_commands (false, true);
// Try to send the message. // Try to send the message.
int rc = xsend (msg_, flags_); int rc = xsend (msg_, flags_);
if (rc == 0) if (rc == 0)
...@@ -350,7 +374,10 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) ...@@ -350,7 +374,10 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
while (rc != 0) { while (rc != 0) {
if (errno != EAGAIN) if (errno != EAGAIN)
return -1; return -1;
app_thread->process_commands (true, false); if (unlikely (!app_thread->process_commands (true, false))) {
errno = ETERM;
return -1;
}
rc = xsend (msg_, flags_); rc = xsend (msg_, flags_);
} }
return 0; return 0;
...@@ -371,7 +398,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -371,7 +398,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// described above) from the one used by 'send'. This is because counting // described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing rdtsc all the time. // ticks is more efficient than doing rdtsc all the time.
if (++ticks == inbound_poll_rate) { if (++ticks == inbound_poll_rate) {
app_thread->process_commands (false, false); if (unlikely (!app_thread->process_commands (false, false))) {
errno = ETERM;
return -1;
}
ticks = 0; ticks = 0;
} }
...@@ -392,7 +422,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -392,7 +422,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
if (flags_ & ZMQ_NOBLOCK) { if (flags_ & ZMQ_NOBLOCK) {
if (errno != EAGAIN) if (errno != EAGAIN)
return -1; return -1;
app_thread->process_commands (false, false); if (unlikely (!app_thread->process_commands (false, false))) {
errno = ETERM;
return -1;
}
ticks = 0; ticks = 0;
return xrecv (msg_, flags_); return xrecv (msg_, flags_);
} }
...@@ -402,7 +435,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -402,7 +435,10 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
while (rc != 0) { while (rc != 0) {
if (errno != EAGAIN) if (errno != EAGAIN)
return -1; return -1;
app_thread->process_commands (true, false); if (unlikely (!app_thread->process_commands (true, false))) {
errno = ETERM;
return -1;
}
rc = xrecv (msg_, flags_); rc = xrecv (msg_, flags_);
ticks = 0; ticks = 0;
} }
......
...@@ -88,6 +88,8 @@ const char *zmq_strerror (int errnum_) ...@@ -88,6 +88,8 @@ const char *zmq_strerror (int errnum_)
return "Operation cannot be accomplished in current state"; return "Operation cannot be accomplished in current state";
case ENOCOMPATPROTO: case ENOCOMPATPROTO:
return "The protocol is not compatible with the socket type"; return "The protocol is not compatible with the socket type";
case ETERM:
return "Context was terminated";
default: default:
#if defined _MSC_VER #if defined _MSC_VER
#pragma warning (push) #pragma warning (push)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment