Unverified Commit c6bd1236 authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #2903 from sigiesec/fix-2895-2

Problem: in case of exhausted resources on creation of a context, assertions are triggered
parents 4e2b9e6e 206c8321
...@@ -127,6 +127,11 @@ zmq::ctx_t::~ctx_t () ...@@ -127,6 +127,11 @@ zmq::ctx_t::~ctx_t ()
tag = ZMQ_CTX_TAG_VALUE_BAD; tag = ZMQ_CTX_TAG_VALUE_BAD;
} }
bool zmq::ctx_t::valid () const
{
return term_mailbox.valid ();
}
int zmq::ctx_t::terminate () int zmq::ctx_t::terminate ()
{ {
slot_sync.lock(); slot_sync.lock();
...@@ -146,7 +151,6 @@ int zmq::ctx_t::terminate () ...@@ -146,7 +151,6 @@ int zmq::ctx_t::terminate ()
terminating = saveTerminating; terminating = saveTerminating;
if (!starting) { if (!starting) {
#ifdef HAVE_FORK #ifdef HAVE_FORK
if (pid != getpid ()) { if (pid != getpid ()) {
// we are a forked child process. Close all file descriptors // we are a forked child process. Close all file descriptors
...@@ -320,13 +324,8 @@ int zmq::ctx_t::get (int option_) ...@@ -320,13 +324,8 @@ int zmq::ctx_t::get (int option_)
return rc; return rc;
} }
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) bool zmq::ctx_t::start ()
{ {
scoped_lock_t locker(slot_sync);
if (unlikely (starting)) {
starting = false;
// Initialise the array of mailboxes. Additional three slots are for // Initialise the array of mailboxes. Additional three slots are for
// zmq_ctx_term thread and reaper thread. // zmq_ctx_term thread and reaper thread.
opt_sync.lock (); opt_sync.lock ();
...@@ -334,33 +333,74 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) ...@@ -334,33 +333,74 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
int ios = io_thread_count; int ios = io_thread_count;
opt_sync.unlock (); opt_sync.unlock ();
slot_count = mazmq + ios + 2; slot_count = mazmq + ios + 2;
slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count); slots = (i_mailbox **) malloc (sizeof (i_mailbox *) * slot_count);
alloc_assert (slots); if (!slots) {
errno = ENOMEM;
goto fail;
}
// Initialise the infrastructure for zmq_ctx_term thread. // Initialise the infrastructure for zmq_ctx_term thread.
slots [term_tid] = &term_mailbox; slots[term_tid] = &term_mailbox;
// Create the reaper thread. // Create the reaper thread.
reaper = new (std::nothrow) reaper_t (this, reaper_tid); reaper = new (std::nothrow) reaper_t (this, reaper_tid);
alloc_assert (reaper); if (!reaper) {
slots [reaper_tid] = reaper->get_mailbox (); errno = ENOMEM;
goto fail_cleanup_slots;
}
if (!reaper->get_mailbox ()->valid ())
goto fail_cleanup_reaper;
slots[reaper_tid] = reaper->get_mailbox ();
reaper->start (); reaper->start ();
// Create I/O thread objects and launch them. // Create I/O thread objects and launch them.
for (int32_t i = (int32_t) slot_count - 1; i >= (int32_t) 2; i--) {
slots[i] = NULL;
}
for (int i = 2; i != ios + 2; i++) { for (int i = 2; i != ios + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
alloc_assert (io_thread); if (!io_thread) {
errno = ENOMEM;
goto fail_cleanup_reaper;
}
if (!io_thread->get_mailbox ()->valid ()) {
delete io_thread;
goto fail_cleanup_reaper;
}
io_threads.push_back (io_thread); io_threads.push_back (io_thread);
slots [i] = io_thread->get_mailbox (); slots [i] = io_thread->get_mailbox ();
io_thread->start (); io_thread->start ();
} }
// In the unused part of the slot array, create a list of empty slots. // In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1; for (int32_t i = (int32_t) slot_count - 1; i >= (int32_t) ios + 2; i--) {
i >= (int32_t) ios + 2; i--) {
empty_slots.push_back (i); empty_slots.push_back (i);
slots [i] = NULL;
} }
starting = false;
return true;
fail_cleanup_reaper:
reaper->stop ();
delete reaper;
reaper = NULL;
fail_cleanup_slots:
free (slots);
slots = NULL;
fail:
return false;
}
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
scoped_lock_t locker (slot_sync);
if (unlikely (starting)) {
if (!start ())
return NULL;
} }
// Once zmq_ctx_term() was called, we can't create new sockets. // Once zmq_ctx_term() was called, we can't create new sockets.
......
...@@ -133,7 +133,10 @@ namespace zmq ...@@ -133,7 +133,10 @@ namespace zmq
~ctx_t (); ~ctx_t ();
bool valid() const;
private: private:
bool start();
struct pending_connection_t struct pending_connection_t
{ {
......
...@@ -37,13 +37,16 @@ ...@@ -37,13 +37,16 @@
#include "ctx.hpp" #include "ctx.hpp"
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_) object_t (ctx_, tid_),
mailbox_handle (NULL)
{ {
poller = new (std::nothrow) poller_t (*ctx_); poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller); alloc_assert (poller);
if (mailbox.get_fd () != retired_fd) {
mailbox_handle = poller->add_fd (mailbox.get_fd (), this); mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (mailbox_handle); poller->set_pollin (mailbox_handle);
}
} }
zmq::io_thread_t::~io_thread_t () zmq::io_thread_t::~io_thread_t ()
...@@ -109,6 +112,8 @@ zmq::poller_t *zmq::io_thread_t::get_poller () ...@@ -109,6 +112,8 @@ zmq::poller_t *zmq::io_thread_t::get_poller ()
void zmq::io_thread_t::process_stop () void zmq::io_thread_t::process_stop ()
{ {
if (mailbox_handle) {
poller->rm_fd (mailbox_handle); poller->rm_fd (mailbox_handle);
}
poller->stop (); poller->stop ();
} }
...@@ -99,3 +99,8 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) ...@@ -99,3 +99,8 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
zmq_assert (ok); zmq_assert (ok);
return 0; return 0;
} }
bool zmq::mailbox_t::valid () const
{
return signaler.valid ();
}
...@@ -54,6 +54,8 @@ namespace zmq ...@@ -54,6 +54,8 @@ namespace zmq
void send (const command_t &cmd_); void send (const command_t &cmd_);
int recv (command_t *cmd_, int timeout_); int recv (command_t *cmd_, int timeout_);
bool valid () const;
#ifdef HAVE_FORK #ifdef HAVE_FORK
// close the file descriptors in the signaller. This is used in a forked // close the file descriptors in the signaller. This is used in a forked
// child process to close the file descriptors so that they do not interfere // child process to close the file descriptors so that they do not interfere
......
...@@ -36,9 +36,13 @@ ...@@ -36,9 +36,13 @@
zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) : zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_), object_t (ctx_, tid_),
mailbox_handle((poller_t::handle_t)NULL), mailbox_handle((poller_t::handle_t)NULL),
poller (NULL),
sockets (0), sockets (0),
terminating (false) terminating (false)
{ {
if (!mailbox.valid ())
return;
poller = new (std::nothrow) poller_t (*ctx_); poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller); alloc_assert (poller);
...@@ -64,13 +68,17 @@ zmq::mailbox_t *zmq::reaper_t::get_mailbox () ...@@ -64,13 +68,17 @@ zmq::mailbox_t *zmq::reaper_t::get_mailbox ()
void zmq::reaper_t::start () void zmq::reaper_t::start ()
{ {
zmq_assert (mailbox.valid ());
// Start the thread. // Start the thread.
poller->start (); poller->start ();
} }
void zmq::reaper_t::stop () void zmq::reaper_t::stop ()
{ {
if (get_mailbox ()->valid ()) {
send_stop (); send_stop ();
}
} }
void zmq::reaper_t::in_event () void zmq::reaper_t::in_event ()
......
...@@ -55,6 +55,7 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) : ...@@ -55,6 +55,7 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) :
#else #else
maxfd (retired_fd), maxfd (retired_fd),
#endif #endif
started (false),
stopping (false) stopping (false)
{ {
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
...@@ -65,7 +66,10 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) : ...@@ -65,7 +66,10 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) :
zmq::select_t::~select_t () zmq::select_t::~select_t ()
{ {
if (started) {
stop ();
worker.stop (); worker.stop ();
}
} }
zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
...@@ -257,6 +261,7 @@ void zmq::select_t::reset_pollout (handle_t handle_) ...@@ -257,6 +261,7 @@ void zmq::select_t::reset_pollout (handle_t handle_)
void zmq::select_t::start () void zmq::select_t::start ()
{ {
ctx.start_thread (worker, worker_routine, this); ctx.start_thread (worker, worker_routine, this);
started = true;
} }
void zmq::select_t::stop () void zmq::select_t::stop ()
......
...@@ -166,6 +166,9 @@ class select_t : public poller_base_t ...@@ -166,6 +166,9 @@ class select_t : public poller_base_t
static fd_entries_t::iterator static fd_entries_t::iterator
find_fd_entry_by_handle (fd_entries_t &fd_entries, handle_t handle_); find_fd_entry_by_handle (fd_entries_t &fd_entries, handle_t handle_);
// If true, start has been called.
bool started;
// If true, thread is shutting down. // If true, thread is shutting down.
bool stopping; bool stopping;
......
...@@ -372,6 +372,11 @@ int zmq::signaler_t::recv_failable () ...@@ -372,6 +372,11 @@ int zmq::signaler_t::recv_failable ()
return 0; return 0;
} }
bool zmq::signaler_t::valid () const
{
return w != retired_fd;
}
#ifdef HAVE_FORK #ifdef HAVE_FORK
void zmq::signaler_t::forked () void zmq::signaler_t::forked ()
{ {
......
...@@ -51,12 +51,16 @@ namespace zmq ...@@ -51,12 +51,16 @@ namespace zmq
signaler_t (); signaler_t ();
~signaler_t (); ~signaler_t ();
// Returns the socket/file descriptor
// May return retired_fd if the signaler could not be initialized.
fd_t get_fd () const; fd_t get_fd () const;
void send (); void send ();
int wait (int timeout_); int wait (int timeout_);
void recv (); void recv ();
int recv_failable (); int recv_failable ();
bool valid () const;
#ifdef HAVE_FORK #ifdef HAVE_FORK
// close the file descriptors in a forked child process so that they // close the file descriptors in a forked child process so that they
// do not interfere with the context in the parent process. // do not interfere with the context in the parent process.
...@@ -70,7 +74,8 @@ namespace zmq ...@@ -70,7 +74,8 @@ namespace zmq
static int make_fdpair (fd_t *r_, fd_t *w_); static int make_fdpair (fd_t *r_, fd_t *w_);
// Underlying write & read file descriptor // Underlying write & read file descriptor
// Will be -1 if we exceeded number of available handles // Will be -1 if an error occurred during initialization, e.g. we
// exceeded the number of available handles
fd_t w; fd_t w;
fd_t r; fd_t r;
......
...@@ -111,8 +111,19 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e ...@@ -111,8 +111,19 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e
zmq_assert (rc == 0); zmq_assert (rc == 0);
if (thread_safe) { if (thread_safe) {
if (signaler == NULL) if (signaler == NULL) {
signaler = new signaler_t (); signaler = new (std::nothrow) signaler_t ();
if (!signaler) {
errno = ENOMEM;
return -1;
}
if (!signaler->valid ()) {
delete signaler;
signaler = NULL;
errno = EMFILE;
return -1;
}
}
rc = socket_->add_signaler (signaler); rc = socket_->add_signaler (signaler);
zmq_assert (rc == 0); zmq_assert (rc == 0);
......
...@@ -161,7 +161,12 @@ void *zmq_ctx_new (void) ...@@ -161,7 +161,12 @@ void *zmq_ctx_new (void)
// Create 0MQ context. // Create 0MQ context.
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t; zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
alloc_assert (ctx); if (ctx) {
if (!ctx->valid ()) {
delete ctx;
return NULL;
}
}
return ctx; return ctx;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment