Commit 55886b8b authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #1352 from somdoron/master

thread safety bug - lock when sending
parents cac4d7aa deaa8965
......@@ -20,8 +20,8 @@
#include "mailbox_safe.hpp"
#include "err.hpp"
zmq::mailbox_safe_t::mailbox_safe_t (mutex_t* socket_mutex_) :
socket_mutex (socket_mutex_)
zmq::mailbox_safe_t::mailbox_safe_t (mutex_t* sync_) :
sync (sync_)
{
// Get the pipe into passive state. That way, if the users starts by
// polling on the associated file descriptor it will get woken up when
......@@ -36,20 +36,17 @@ zmq::mailbox_safe_t::~mailbox_safe_t ()
// Work around problem that other threads might still be in our
// send() method, by waiting on the mutex before disappearing.
sync.lock ();
sync.unlock ();
sync->lock ();
sync->unlock ();
}
void zmq::mailbox_safe_t::add_signaler(signaler_t* signaler)
{
sync.lock();
signalers.push_back(signaler);
sync.unlock();
}
void zmq::mailbox_safe_t::remove_signaler(signaler_t* signaler)
{
sync.lock();
std::vector<signaler_t*>::iterator it = signalers.begin();
// TODO: make a copy of array and signal outside the lock
......@@ -60,25 +57,22 @@ void zmq::mailbox_safe_t::remove_signaler(signaler_t* signaler)
if (it != signalers.end())
signalers.erase(it);
sync.unlock();
}
void zmq::mailbox_safe_t::send (const command_t &cmd_)
{
sync.lock ();
sync->lock ();
cpipe.write (cmd_, false);
const bool ok = cpipe.flush ();
if (!ok) {
cond_var.broadcast ();
for (std::vector<signaler_t*>::iterator it = signalers.begin(); it != signalers.end(); ++it){
(*it)->send();
}
}
sync.unlock ();
if (!ok)
cond_var.broadcast ();
sync->unlock ();
}
int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
......@@ -87,13 +81,8 @@ int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
if (cpipe.read (cmd_))
return 0;
if (timeout_ == 0) {
errno = EAGAIN;
return -1;
}
// Wait for signal from the command sender.
int rc = cond_var.wait (socket_mutex, timeout_);
int rc = cond_var.wait (sync, timeout_);
if (rc == -1) {
errno_assert (errno == EAGAIN || errno == EINTR);
return -1;
......
......@@ -43,7 +43,7 @@ namespace zmq
{
public:
mailbox_safe_t (mutex_t* socket_mutex_);
mailbox_safe_t (mutex_t* sync_);
~mailbox_safe_t ();
void send (const command_t &cmd_);
......@@ -72,13 +72,8 @@ namespace zmq
// Condition variable to pass signals from writer thread to reader thread.
condition_variable_t cond_var;
// There's only one thread receiving from the mailbox, but there
// is arbitrary number of threads sending. Given that ypipe requires
// synchronised access on both of its endpoints, we have to synchronise
// the sending side.
mutex_t sync;
mutex_t* socket_mutex;
// Synchronize access to the mailbox from receivers and senders
mutex_t* sync;
std::vector <zmq::signaler_t* > signalers;
......
......@@ -88,7 +88,13 @@ namespace zmq
public:
inline mutex_t ()
{
int rc = pthread_mutex_init (&mutex, NULL);
int rc = pthread_mutexattr_init(&attr);
posix_assert (rc);
rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
posix_assert (rc);
rc = pthread_mutex_init (&mutex, &attr);
posix_assert (rc);
}
......@@ -96,6 +102,9 @@ namespace zmq
{
int rc = pthread_mutex_destroy (&mutex);
posix_assert (rc);
rc = pthread_mutexattr_destroy (&attr);
posix_assert (rc);
}
inline void lock ()
......@@ -128,6 +137,7 @@ namespace zmq
private:
pthread_mutex_t mutex;
pthread_mutexattr_t attr;
// Disable copy construction and assignment.
mutex_t (const mutex_t&);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment