Commit 44dd005f authored by Martin Sustrik's avatar Martin Sustrik

number of application threads to use 0MQ sockets is unlimited; app_threads…

number of application threads to use 0MQ sockets is unlimited; app_threads parameter in zmq_init is unused and obsolete
parent 235ed3a3
......@@ -27,6 +27,10 @@ namespace zmq
enum
{
// Maximal number of OS threads that can own 0MQ sockets
// at the same time.
max_app_threads = 512,
// Number of new messages in message pipe needed to trigger new memory
// allocation. Setting this parameter to 256 decreases the impact of
// memory allocation by approximately 99.6%
......
......@@ -34,7 +34,7 @@
#include "windows.h"
#endif
zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) :
zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) :
sockets (0),
terminated (false)
{
......@@ -50,33 +50,19 @@ zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) :
#endif
// Initialise the array of signalers.
signalers_count = app_threads_ + io_threads_;
signalers_count = max_app_threads + io_threads_;
signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count);
zmq_assert (signalers);
memset (signalers, 0, sizeof (signaler_t*) * signalers_count);
// Create I/O thread objects.
// Create I/O thread objects and launch them.
for (uint32_t i = 0; i != io_threads_; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
signalers [i] = io_thread->get_signaler ();
io_thread->start ();
}
// Create application thread proxies.
for (uint32_t i = 0; i != app_threads_; i++) {
app_thread_info_t info;
info.associated = false;
info.app_thread = new (std::nothrow) app_thread_t (this,
i + io_threads_);
zmq_assert (info.app_thread);
app_threads.push_back (info);
signalers [i + io_threads_] = info.app_thread->get_signaler ();
}
// Launch I/O threads.
for (uint32_t i = 0; i != io_threads_; i++)
io_threads [i]->start ();
}
int zmq::dispatcher_t::term ()
......@@ -152,14 +138,35 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
// If no app_thread_t is associated with the calling thread,
// associate it with one of the unused app_thread_t objects.
if (current == app_threads.size ()) {
// If all the existing app_threads are already used, create one more.
if (unused == app_threads.size ()) {
app_threads_sync.unlock ();
errno = EMTHREAD;
return NULL;
// If max_app_threads limit was reached, return error.
if (app_threads.size () == max_app_threads) {
app_threads_sync.unlock ();
errno = EMTHREAD;
return NULL;
}
// Create the new application thread proxy object.
app_thread_info_t info;
info.associated = false;
info.app_thread = new (std::nothrow) app_thread_t (this,
io_threads.size () + app_threads.size ());
zmq_assert (info.app_thread);
signalers [io_threads.size () + app_threads.size ()] =
info.app_thread->get_signaler ();
app_threads.push_back (info);
}
app_threads [unused].associated = true;
app_threads [unused].tid = thread_t::id ();
// Incidentally, this works both when there is an unused app_thread
// and when a new one is created.
current = unused;
// Associate the selected app_thread with the OS thread.
app_threads [current].associated = true;
app_threads [current].tid = thread_t::id ();
}
app_thread_t *thread = app_threads [current].app_thread;
......
......@@ -34,23 +34,14 @@
namespace zmq
{
// Dispatcher implements bidirectional thread-safe passing of commands
// between N threads. It consists of a ypipes to pass commands and
// signalers to wake up the receiver thread when new commands are
// available. Note that dispatcher is inefficient for passing messages
// within a thread (sender thread = receiver thread). The optimisation is
// not part of the class and should be implemented by individual threads
// (presumably by calling the command handling function directly).
class dispatcher_t
{
public:
// Create the dispatcher object. Matrix of pipes to communicate between
// each socket and each I/O thread is created along with appropriate
// signalers.
dispatcher_t (uint32_t app_threads_, uint32_t io_threads_);
// Create the dispatcher object. The argument specifies the size
// of I/O thread pool to create.
dispatcher_t (uint32_t io_threads_);
// This function is called when user invokes zmq_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
......
......@@ -226,7 +226,9 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
return ((zmq::msg_content_t*) msg_->content)->size;
}
void *zmq_init (int app_threads_, int io_threads_, int flags_)
// TODO: app_threads and flags parameters are not used anymore...
// Reflect this in the API/ABI.
void *zmq_init (int /*app_threads_*/, int io_threads_, int /*flags_*/)
{
// There are no context flags defined at the moment, so flags_ is ignored.
......@@ -262,7 +264,7 @@ void *zmq_init (int app_threads_, int io_threads_, int flags_)
// Create 0MQ context.
zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t (
(uint32_t) app_threads_, (uint32_t) io_threads_);
(uint32_t) io_threads_);
zmq_assert (dispatcher);
return (void*) dispatcher;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment