Commit 206c8321 authored by sigiesec's avatar sigiesec Committed by Giesecke

Problem: in case of exhausted resources on creation of a context, assertions are triggered

Solution: signal error to caller, and apply appropriate cleanup
parent 4e2b9e6e
......@@ -127,6 +127,11 @@ zmq::ctx_t::~ctx_t ()
tag = ZMQ_CTX_TAG_VALUE_BAD;
}
bool zmq::ctx_t::valid () const
{
return term_mailbox.valid ();
}
int zmq::ctx_t::terminate ()
{
slot_sync.lock();
......@@ -146,7 +151,6 @@ int zmq::ctx_t::terminate ()
terminating = saveTerminating;
if (!starting) {
#ifdef HAVE_FORK
if (pid != getpid ()) {
// we are a forked child process. Close all file descriptors
......@@ -320,47 +324,83 @@ int zmq::ctx_t::get (int option_)
return rc;
}
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
bool zmq::ctx_t::start ()
{
scoped_lock_t locker(slot_sync);
// Initialise the array of mailboxes. Additional three slots are for
// zmq_ctx_term thread and reaper thread.
opt_sync.lock ();
int mazmq = max_sockets;
int ios = io_thread_count;
opt_sync.unlock ();
slot_count = mazmq + ios + 2;
slots = (i_mailbox **) malloc (sizeof (i_mailbox *) * slot_count);
if (!slots) {
errno = ENOMEM;
goto fail;
}
// Initialise the infrastructure for zmq_ctx_term thread.
slots[term_tid] = &term_mailbox;
// Create the reaper thread.
reaper = new (std::nothrow) reaper_t (this, reaper_tid);
if (!reaper) {
errno = ENOMEM;
goto fail_cleanup_slots;
}
if (!reaper->get_mailbox ()->valid ())
goto fail_cleanup_reaper;
slots[reaper_tid] = reaper->get_mailbox ();
reaper->start ();
// Create I/O thread objects and launch them.
for (int32_t i = (int32_t) slot_count - 1; i >= (int32_t) 2; i--) {
slots[i] = NULL;
}
for (int i = 2; i != ios + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
if (!io_thread) {
errno = ENOMEM;
goto fail_cleanup_reaper;
}
if (!io_thread->get_mailbox ()->valid ()) {
delete io_thread;
goto fail_cleanup_reaper;
}
io_threads.push_back (io_thread);
slots [i] = io_thread->get_mailbox ();
io_thread->start ();
}
if (unlikely (starting)) {
// In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1; i >= (int32_t) ios + 2; i--) {
empty_slots.push_back (i);
}
starting = false;
// Initialise the array of mailboxes. Additional three slots are for
// zmq_ctx_term thread and reaper thread.
opt_sync.lock ();
int mazmq = max_sockets;
int ios = io_thread_count;
opt_sync.unlock ();
slot_count = mazmq + ios + 2;
slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
alloc_assert (slots);
// Initialise the infrastructure for zmq_ctx_term thread.
slots [term_tid] = &term_mailbox;
// Create the reaper thread.
reaper = new (std::nothrow) reaper_t (this, reaper_tid);
alloc_assert (reaper);
slots [reaper_tid] = reaper->get_mailbox ();
reaper->start ();
// Create I/O thread objects and launch them.
for (int i = 2; i != ios + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
alloc_assert (io_thread);
io_threads.push_back (io_thread);
slots [i] = io_thread->get_mailbox ();
io_thread->start ();
}
starting = false;
return true;
// In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1;
i >= (int32_t) ios + 2; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
fail_cleanup_reaper:
reaper->stop ();
delete reaper;
reaper = NULL;
fail_cleanup_slots:
free (slots);
slots = NULL;
fail:
return false;
}
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
scoped_lock_t locker (slot_sync);
if (unlikely (starting)) {
if (!start ())
return NULL;
}
// Once zmq_ctx_term() was called, we can't create new sockets.
......
......@@ -133,7 +133,10 @@ namespace zmq
~ctx_t ();
bool valid() const;
private:
bool start();
struct pending_connection_t
{
......
......@@ -37,13 +37,16 @@
#include "ctx.hpp"
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_)
object_t (ctx_, tid_),
mailbox_handle (NULL)
{
poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller);
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (mailbox_handle);
if (mailbox.get_fd () != retired_fd) {
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (mailbox_handle);
}
}
zmq::io_thread_t::~io_thread_t ()
......@@ -109,6 +112,8 @@ zmq::poller_t *zmq::io_thread_t::get_poller ()
void zmq::io_thread_t::process_stop ()
{
poller->rm_fd (mailbox_handle);
if (mailbox_handle) {
poller->rm_fd (mailbox_handle);
}
poller->stop ();
}
......@@ -99,3 +99,8 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
zmq_assert (ok);
return 0;
}
bool zmq::mailbox_t::valid () const
{
return signaler.valid ();
}
......@@ -54,6 +54,8 @@ namespace zmq
void send (const command_t &cmd_);
int recv (command_t *cmd_, int timeout_);
bool valid () const;
#ifdef HAVE_FORK
// close the file descriptors in the signaller. This is used in a forked
// child process to close the file descriptors so that they do not interfere
......
......@@ -36,9 +36,13 @@
zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_),
mailbox_handle((poller_t::handle_t)NULL),
poller (NULL),
sockets (0),
terminating (false)
{
if (!mailbox.valid ())
return;
poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller);
......@@ -64,13 +68,17 @@ zmq::mailbox_t *zmq::reaper_t::get_mailbox ()
void zmq::reaper_t::start ()
{
zmq_assert (mailbox.valid ());
// Start the thread.
poller->start ();
}
void zmq::reaper_t::stop ()
{
send_stop ();
if (get_mailbox ()->valid ()) {
send_stop ();
}
}
void zmq::reaper_t::in_event ()
......
......@@ -55,6 +55,7 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) :
#else
maxfd (retired_fd),
#endif
started (false),
stopping (false)
{
#if defined ZMQ_HAVE_WINDOWS
......@@ -65,7 +66,10 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) :
zmq::select_t::~select_t ()
{
worker.stop ();
if (started) {
stop ();
worker.stop ();
}
}
zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
......@@ -257,6 +261,7 @@ void zmq::select_t::reset_pollout (handle_t handle_)
void zmq::select_t::start ()
{
ctx.start_thread (worker, worker_routine, this);
started = true;
}
void zmq::select_t::stop ()
......
......@@ -166,6 +166,9 @@ class select_t : public poller_base_t
static fd_entries_t::iterator
find_fd_entry_by_handle (fd_entries_t &fd_entries, handle_t handle_);
// If true, start has been called.
bool started;
// If true, thread is shutting down.
bool stopping;
......
......@@ -372,6 +372,11 @@ int zmq::signaler_t::recv_failable ()
return 0;
}
bool zmq::signaler_t::valid () const
{
return w != retired_fd;
}
#ifdef HAVE_FORK
void zmq::signaler_t::forked ()
{
......@@ -398,7 +403,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
errno_assert (errno == ENFILE || errno == EMFILE);
*w_ = *r_ = -1;
return -1;
}
}
else {
*w_ = *r_ = fd;
return 0;
......
......@@ -51,12 +51,16 @@ namespace zmq
signaler_t ();
~signaler_t ();
// Returns the socket/file descriptor
// May return retired_fd if the signaler could not be initialized.
fd_t get_fd () const;
void send ();
int wait (int timeout_);
void recv ();
int recv_failable ();
bool valid () const;
#ifdef HAVE_FORK
// close the file descriptors in a forked child process so that they
// do not interfere with the context in the parent process.
......@@ -70,7 +74,8 @@ namespace zmq
static int make_fdpair (fd_t *r_, fd_t *w_);
// Underlying write & read file descriptor
// Will be -1 if we exceeded number of available handles
// Will be -1 if an error occurred during initialization, e.g. we
// exceeded the number of available handles
fd_t w;
fd_t r;
......
......@@ -111,8 +111,19 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e
zmq_assert (rc == 0);
if (thread_safe) {
if (signaler == NULL)
signaler = new signaler_t ();
if (signaler == NULL) {
signaler = new (std::nothrow) signaler_t ();
if (!signaler) {
errno = ENOMEM;
return -1;
}
if (!signaler->valid ()) {
delete signaler;
signaler = NULL;
errno = EMFILE;
return -1;
}
}
rc = socket_->add_signaler (signaler);
zmq_assert (rc == 0);
......
......@@ -161,7 +161,12 @@ void *zmq_ctx_new (void)
// Create 0MQ context.
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
alloc_assert (ctx);
if (ctx) {
if (!ctx->valid ()) {
delete ctx;
return NULL;
}
}
return ctx;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment