Commit a68e6739 authored by Martin Sustrik's avatar Martin Sustrik

when no I/O threads are available error is raised instead of assertion

parent 47e87b7e
...@@ -58,6 +58,8 @@ The requested 'address' specifies a nonexistent interface. ...@@ -58,6 +58,8 @@ The requested 'address' specifies a nonexistent interface.
The 0MQ 'context' associated with the specified 'socket' was terminated. The 0MQ 'context' associated with the specified 'socket' was terminated.
*EFAULT*:: *EFAULT*::
The provided 'socket' was not valid (NULL). The provided 'socket' was not valid (NULL).
*EMTHREAD*::
No I/O thread is available to accomplish the task.
EXAMPLE EXAMPLE
......
...@@ -56,6 +56,8 @@ The requested 'transport' protocol is not compatible with the socket type. ...@@ -56,6 +56,8 @@ The requested 'transport' protocol is not compatible with the socket type.
The 0MQ 'context' associated with the specified 'socket' was terminated. The 0MQ 'context' associated with the specified 'socket' was terminated.
*EFAULT*:: *EFAULT*::
The provided 'socket' was not valid (NULL). The provided 'socket' was not valid (NULL).
*EMTHREAD*::
No I/O thread is available to accomplish the task.
EXAMPLE EXAMPLE
......
...@@ -85,7 +85,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch); ...@@ -85,7 +85,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
#define EFSM (ZMQ_HAUSNUMERO + 51) #define EFSM (ZMQ_HAUSNUMERO + 51)
#define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52) #define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52)
#define ETERM (ZMQ_HAUSNUMERO + 53) #define ETERM (ZMQ_HAUSNUMERO + 53)
#define EMTHREAD (ZMQ_HAUSNUMERO + 54) /* Old error code, remove in 3.x */ #define EMTHREAD (ZMQ_HAUSNUMERO + 54)
/* This function retrieves the errno as it is known to 0MQ library. The goal */ /* This function retrieves the errno as it is known to 0MQ library. The goal */
/* of this function is to make the code 100% portable, including where 0MQ */ /* of this function is to make the code 100% portable, including where 0MQ */
......
...@@ -43,13 +43,18 @@ void zmq::connect_session_t::process_plug () ...@@ -43,13 +43,18 @@ void zmq::connect_session_t::process_plug ()
void zmq::connect_session_t::start_connecting () void zmq::connect_session_t::start_connecting ()
{ {
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create the connecter object. // Create the connecter object.
// Both TCP and IPC transports are using the same infrastructure. // Both TCP and IPC transports are using the same infrastructure.
if (protocol == "tcp" || protocol == "ipc") { if (protocol == "tcp" || protocol == "ipc") {
zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
choose_io_thread (options.affinity), this, options, io_thread, this, options, protocol.c_str (), address.c_str ());
protocol.c_str (), address.c_str ());
zmq_assert (connecter); zmq_assert (connecter);
launch_child (connecter); launch_child (connecter);
return; return;
...@@ -70,7 +75,7 @@ void zmq::connect_session_t::start_connecting () ...@@ -70,7 +75,7 @@ void zmq::connect_session_t::start_connecting ()
// PGM sender. // PGM sender.
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
choose_io_thread (options.affinity), options); io_thread, options);
zmq_assert (pgm_sender); zmq_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
...@@ -82,7 +87,7 @@ void zmq::connect_session_t::start_connecting () ...@@ -82,7 +87,7 @@ void zmq::connect_session_t::start_connecting ()
// PGM receiver. // PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
choose_io_thread (options.affinity), options); io_thread, options);
zmq_assert (pgm_receiver); zmq_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
......
...@@ -64,7 +64,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : ...@@ -64,7 +64,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
} }
// In the unused part of the slot array, create a list of empty slots. // In the unused part of the slot array, create a list of empty slots.
for (uint32_t i = slot_count - 1; i >= io_threads_; i--) { for (int32_t i = (int32_t) slot_count - 1;
i >= (int32_t) io_threads_; i--) {
empty_slots.push_back (i); empty_slots.push_back (i);
slots [i] = NULL; slots [i] = NULL;
} }
...@@ -221,8 +222,10 @@ void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_) ...@@ -221,8 +222,10 @@ void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{ {
if (io_threads.empty ())
return NULL;
// Find the I/O thread with minimum load. // Find the I/O thread with minimum load.
zmq_assert (io_threads.size () > 0);
int min_load = -1; int min_load = -1;
io_threads_t::size_type result = 0; io_threads_t::size_type result = 0;
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) { for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
......
...@@ -65,8 +65,9 @@ namespace zmq ...@@ -65,8 +65,9 @@ namespace zmq
void send_command (uint32_t slot_, const command_t &command_); void send_command (uint32_t slot_, const command_t &command_);
// Returns the I/O thread that is the least busy at the moment. // Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all). // Affinity specifies which I/O threads are eligible (0 = all).
class io_thread_t *choose_io_thread (uint64_t taskset_); // Returns NULL is no I/O thread is available.
class io_thread_t *choose_io_thread (uint64_t affinity_);
// Management of inproc endpoints. // Management of inproc endpoints.
int register_endpoint (const char *addr_, class socket_base_t *socket_); int register_endpoint (const char *addr_, class socket_base_t *socket_);
......
...@@ -142,9 +142,9 @@ void zmq::object_t::log (zmq_msg_t *msg_) ...@@ -142,9 +142,9 @@ void zmq::object_t::log (zmq_msg_t *msg_)
ctx->log (msg_); ctx->log (msg_);
} }
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
{ {
return ctx->choose_io_thread (taskset_); return ctx->choose_io_thread (affinity_);
} }
void zmq::object_t::zombify_socket (socket_base_t *socket_) void zmq::object_t::zombify_socket (socket_base_t *socket_)
......
...@@ -54,7 +54,7 @@ namespace zmq ...@@ -54,7 +54,7 @@ namespace zmq
void log (zmq_msg_t *msg_); void log (zmq_msg_t *msg_);
// Chooses least loaded I/O thread. // Chooses least loaded I/O thread.
class io_thread_t *choose_io_thread (uint64_t taskset_); class io_thread_t *choose_io_thread (uint64_t affinity_);
// Zombify particular socket. In other words, pass the ownership to // Zombify particular socket. In other words, pass the ownership to
// the context. // the context.
......
...@@ -289,8 +289,17 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -289,8 +289,17 @@ int zmq::socket_base_t::bind (const char *addr_)
return register_endpoint (addr_, this); return register_endpoint (addr_, this);
if (protocol == "tcp" || protocol == "ipc") { if (protocol == "tcp" || protocol == "ipc") {
// Choose I/O thread to run the listerner in.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
return -1;
}
// Create and run the listener.
zmq_listener_t *listener = new (std::nothrow) zmq_listener_t ( zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
choose_io_thread (options.affinity), this, options); io_thread, this, options);
zmq_assert (listener); zmq_assert (listener);
int rc = listener->set_address (protocol.c_str(), address.c_str ()); int rc = listener->set_address (protocol.c_str(), address.c_str ());
if (rc != 0) { if (rc != 0) {
...@@ -376,10 +385,16 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -376,10 +385,16 @@ int zmq::socket_base_t::connect (const char *addr_)
return 0; return 0;
} }
// Choose the I/O thread to run the session in.
io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) {
errno = EMTHREAD;
return -1;
}
// Create session. // Create session.
connect_session_t *session = new (std::nothrow) connect_session_t ( connect_session_t *session = new (std::nothrow) connect_session_t (
choose_io_thread (options.affinity), this, options, io_thread, this, options, protocol.c_str (), address.c_str ());
protocol.c_str (), address.c_str ());
zmq_assert (session); zmq_assert (session);
// If 'immediate connect' feature is required, we'll create the pipes // If 'immediate connect' feature is required, we'll create the pipes
......
...@@ -88,6 +88,8 @@ const char *zmq_strerror (int errnum_) ...@@ -88,6 +88,8 @@ const char *zmq_strerror (int errnum_)
return "The protocol is not compatible with the socket type"; return "The protocol is not compatible with the socket type";
case ETERM: case ETERM:
return "Context was terminated"; return "Context was terminated";
case EMTHREAD:
return "No thread available";
default: default:
#if defined _MSC_VER #if defined _MSC_VER
#pragma warning (push) #pragma warning (push)
......
...@@ -77,9 +77,14 @@ void zmq::zmq_connecter_t::out_event () ...@@ -77,9 +77,14 @@ void zmq::zmq_connecter_t::out_event ()
return; return;
} }
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create an init object. // Create an init object.
zmq_init_t *init = new (std::nothrow) zmq_init_t ( zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL,
choose_io_thread (options.affinity), NULL, session, fd, options); session, fd, options);
zmq_assert (init); zmq_assert (init);
launch_sibling (init); launch_sibling (init);
......
...@@ -64,9 +64,14 @@ void zmq::zmq_listener_t::in_event () ...@@ -64,9 +64,14 @@ void zmq::zmq_listener_t::in_event ()
if (fd == retired_fd) if (fd == retired_fd)
return; return;
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch an init object. // Create and launch an init object.
zmq_init_t *init = new (std::nothrow) zmq_init_t ( zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, socket,
choose_io_thread (options.affinity), socket, NULL, fd, options); NULL, fd, options);
zmq_assert (init); zmq_assert (init);
launch_sibling (init); launch_sibling (init);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment