Commit 01533a5a authored by Martin Sustrik's avatar Martin Sustrik

ZMQII-65: Two OS threads are mapped to the same app_thread_t

parent 7593d815
...@@ -62,7 +62,6 @@ ...@@ -62,7 +62,6 @@
zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_, zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_,
int flags_) : int flags_) :
object_t (dispatcher_, thread_slot_), object_t (dispatcher_, thread_slot_),
associated (false),
last_processing_time (0) last_processing_time (0)
{ {
if (flags_ & ZMQ_POLL) { if (flags_ & ZMQ_POLL) {
...@@ -87,24 +86,6 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler () ...@@ -87,24 +86,6 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler ()
return signaler; return signaler;
} }
bool zmq::app_thread_t::is_current ()
{
return !sockets.empty () && associated &&
thread_t::equal (tid, thread_t::id ());
}
bool zmq::app_thread_t::make_current ()
{
// If there are object managed by this slot we cannot assign the slot
// to a different thread.
if (!sockets.empty ())
return false;
associated = true;
tid = thread_t::id ();
return true;
}
void zmq::app_thread_t::process_commands (bool block_, bool throttle_) void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{ {
uint64_t signals; uint64_t signals;
...@@ -191,6 +172,8 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) ...@@ -191,6 +172,8 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
s = new (std::nothrow) downstream_t (this); s = new (std::nothrow) downstream_t (this);
break; break;
default: default:
if (sockets.empty ())
dispatcher->no_sockets (this);
errno = EINVAL; errno = EINVAL;
return NULL; return NULL;
} }
...@@ -204,4 +187,6 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) ...@@ -204,4 +187,6 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
void zmq::app_thread_t::remove_socket (socket_base_t *socket_) void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
{ {
sockets.erase (socket_); sockets.erase (socket_);
if (sockets.empty ())
dispatcher->no_sockets (this);
} }
...@@ -25,7 +25,6 @@ ...@@ -25,7 +25,6 @@
#include "stdint.hpp" #include "stdint.hpp"
#include "object.hpp" #include "object.hpp"
#include "yarray.hpp" #include "yarray.hpp"
#include "thread.hpp"
namespace zmq namespace zmq
{ {
...@@ -42,17 +41,6 @@ namespace zmq ...@@ -42,17 +41,6 @@ namespace zmq
// Returns signaler associated with this application thread. // Returns signaler associated with this application thread.
struct i_signaler *get_signaler (); struct i_signaler *get_signaler ();
// Nota bene: Following two functions are accessed from different
// threads. The caller (dispatcher) is responsible for synchronisation
// of accesses.
// Returns true is current thread is associated with the app thread.
bool is_current ();
// Tries to associate current thread with the app thread object.
// Returns true is successfull, false otherwise.
bool make_current ();
// Processes commands sent to this thread (if any). If 'block' is // Processes commands sent to this thread (if any). If 'block' is
// set to true, returns only after at least one command was processed. // set to true, returns only after at least one command was processed.
// If throttle argument is true, commands are processed at most once // If throttle argument is true, commands are processed at most once
...@@ -71,13 +59,6 @@ namespace zmq ...@@ -71,13 +59,6 @@ namespace zmq
typedef yarray_t <socket_base_t> sockets_t; typedef yarray_t <socket_base_t> sockets_t;
sockets_t sockets; sockets_t sockets;
// If false, app_thread_t object is not associated with any OS thread.
// In such case, 'tid' member contains a bogus value.
bool associated;
// Thread ID associated with this slot.
thread_t::id_t tid;
// App thread's signaler object. // App thread's signaler object.
struct i_signaler *signaler; struct i_signaler *signaler;
......
...@@ -51,11 +51,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_, ...@@ -51,11 +51,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_,
// Create application thread proxies. // Create application thread proxies.
for (int i = 0; i != app_threads_; i++) { for (int i = 0; i != app_threads_; i++) {
app_thread_t *app_thread = new (std::nothrow) app_thread_t (this, i, app_thread_info_t info;
flags_); info.associated = false;
zmq_assert (app_thread); info.app_thread = new (std::nothrow) app_thread_t (this, i, flags_);
app_threads.push_back (app_thread); zmq_assert (info.app_thread);
signalers.push_back (app_thread->get_signaler ()); app_threads.push_back (info);
signalers.push_back (info.app_thread->get_signaler ());
} }
// Create I/O thread objects. // Create I/O thread objects.
...@@ -110,7 +111,7 @@ zmq::dispatcher_t::~dispatcher_t () ...@@ -110,7 +111,7 @@ zmq::dispatcher_t::~dispatcher_t ()
// Close all application theads, sockets, io_objects etc. // Close all application theads, sockets, io_objects etc.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
delete app_threads [i]; delete app_threads [i].app_thread;
// Deallocate all the orphaned pipes. // Deallocate all the orphaned pipes.
while (!pipes.empty ()) while (!pipes.empty ())
...@@ -132,13 +133,37 @@ int zmq::dispatcher_t::thread_slot_count () ...@@ -132,13 +133,37 @@ int zmq::dispatcher_t::thread_slot_count ()
zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_) zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
{ {
threads_sync.lock (); app_threads_sync.lock ();
app_thread_t *thread = choose_app_thread ();
if (!thread) { // Find whether the calling thread has app_thread_t object associated
threads_sync.unlock (); // already. At the same time find an unused app_thread_t so that it can
// be used if there's no associated object for the calling thread.
// Check whether thread ID is already assigned. If so, return it.
app_threads_t::size_type unused = app_threads.size ();
app_threads_t::size_type current;
for (current = 0; current != app_threads.size (); current++) {
if (app_threads [current].associated &&
thread_t::equal (thread_t::id (), app_threads [current].tid))
break;
if (!app_threads [current].associated)
unused = current;
}
// If no app_thread_t is associated with the calling thread,
// associate it with one of the unused app_thread_t objects.
if (current == app_threads.size ()) {
if (unused == app_threads.size ()) {
app_threads_sync.unlock ();
errno = EMTHREAD;
return NULL; return NULL;
} }
threads_sync.unlock (); app_threads [unused].associated = true;
app_threads [unused].tid = thread_t::id ();
current = unused;
}
app_thread_t *thread = app_threads [current].app_thread;
app_threads_sync.unlock ();
socket_base_t *s = thread->create_socket (type_); socket_base_t *s = thread->create_socket (type_);
if (!s) if (!s)
...@@ -165,6 +190,19 @@ void zmq::dispatcher_t::destroy_socket () ...@@ -165,6 +190,19 @@ void zmq::dispatcher_t::destroy_socket ()
delete this; delete this;
} }
void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
{
app_threads_sync.lock ();
app_threads_t::size_type i;
for (i = 0; i != app_threads.size (); i++)
if (app_threads [i].app_thread == thread_) {
app_threads [i].associated = false;
break;
}
zmq_assert (i != app_threads.size ());
app_threads_sync.unlock ();
}
void zmq::dispatcher_t::write (int source_, int destination_, void zmq::dispatcher_t::write (int source_, int destination_,
const command_t &command_) const command_t &command_)
{ {
...@@ -182,23 +220,6 @@ bool zmq::dispatcher_t::read (int source_, int destination_, ...@@ -182,23 +220,6 @@ bool zmq::dispatcher_t::read (int source_, int destination_,
destination_].read (command_); destination_].read (command_);
} }
zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
{
// Check whether thread ID is already assigned. If so, return it.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
if (app_threads [i]->is_current ())
return app_threads [i];
// Check whether there's an unused thread slot in the cotext.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
if (app_threads [i]->make_current ())
return app_threads [i];
// Thread limit was exceeded.
errno = EMTHREAD;
return NULL;
}
zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_) zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_)
{ {
// Find the I/O thread with minimum load. // Find the I/O thread with minimum load.
......
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "config.hpp" #include "config.hpp"
#include "mutex.hpp" #include "mutex.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "thread.hpp"
namespace zmq namespace zmq
{ {
...@@ -64,6 +65,10 @@ namespace zmq ...@@ -64,6 +65,10 @@ namespace zmq
// Destroy a socket. // Destroy a socket.
void destroy_socket (); void destroy_socket ();
// Called by app_thread_t when it has no more sockets. The function
// should disassociate the object from the current OS thread.
void no_sockets (class app_thread_t *thread_);
// Returns number of thread slots in the dispatcher. To be used by // Returns number of thread slots in the dispatcher. To be used by
// individual threads to find out how many distinct signals can be // individual threads to find out how many distinct signals can be
// received. // received.
...@@ -94,14 +99,27 @@ namespace zmq ...@@ -94,14 +99,27 @@ namespace zmq
~dispatcher_t (); ~dispatcher_t ();
// Returns the app thread associated with the current thread. struct app_thread_info_t
// NULL if we are out of app thread slots. {
class app_thread_t *choose_app_thread (); // If false, 0MQ application thread is free, there's no associated
// OS thread.
bool associated;
// ID of the associated OS thread. If 'associated' is false,
// this field contains bogus data.
thread_t::id_t tid;
// Pointer to the 0MQ application thread object.
class app_thread_t *app_thread;
};
// Application threads. // Application threads.
typedef std::vector <class app_thread_t*> app_threads_t; typedef std::vector <app_thread_info_t> app_threads_t;
app_threads_t app_threads; app_threads_t app_threads;
// Synchronisation of accesses to shared application thread data.
mutex_t app_threads_sync;
// I/O threads. // I/O threads.
typedef std::vector <class io_thread_t*> io_threads_t; typedef std::vector <class io_thread_t*> io_threads_t;
io_threads_t io_threads; io_threads_t io_threads;
...@@ -116,9 +134,6 @@ namespace zmq ...@@ -116,9 +134,6 @@ namespace zmq
// NxN matrix of command pipes. // NxN matrix of command pipes.
command_pipe_t *command_pipes; command_pipe_t *command_pipes;
// Synchronisation of accesses to shared thread data.
mutex_t threads_sync;
// As pipes may reside in orphaned state in particular moments // As pipes may reside in orphaned state in particular moments
// of the pipe shutdown process, i.e. neither pipe reader nor // of the pipe shutdown process, i.e. neither pipe reader nor
// pipe writer hold reference to the pipe, we have to hold references // pipe writer hold reference to the pipe, we have to hold references
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment