Commit 5d42fe1b authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #1349 from somdoron/master

problem: client and server sockets are not thread safe
parents 88c6e696 5a897f75
......@@ -423,6 +423,7 @@ set(cxx-sources
kqueue.cpp
lb.cpp
mailbox.cpp
mailbox_safe.cpp
mechanism.cpp
metadata.cpp
msg.cpp
......
......@@ -31,6 +31,7 @@ src_libzmq_la_SOURCES = \
src/clock.cpp \
src/clock.hpp \
src/command.hpp \
src/condition_variable.hpp \
src/config.hpp \
src/ctx.cpp \
src/ctx.hpp \
......@@ -63,6 +64,7 @@ src_libzmq_la_SOURCES = \
src/i_encoder.hpp \
src/i_engine.hpp \
src/i_decoder.hpp \
src/i_mailbox.hpp \
src/i_poll_events.hpp \
src/io_object.cpp \
src/io_object.hpp \
......@@ -83,6 +85,8 @@ src_libzmq_la_SOURCES = \
src/likely.hpp \
src/mailbox.cpp \
src/mailbox.hpp \
src/mailbox_safe.cpp \
src/mailbox_safe.hpp \
src/mechanism.cpp \
src/mechanism.hpp \
src/metadata.cpp \
......@@ -349,7 +353,8 @@ test_apps = \
tests/test_atomics \
tests/test_client_server \
tests/test_server_drop_more \
tests/test_client_drop_more
tests/test_client_drop_more \
tests/test_thread_safe
tests_test_system_SOURCES = tests/test_system.cpp
tests_test_system_LDADD = src/libzmq.la
......@@ -530,6 +535,11 @@ tests_test_server_drop_more_LDADD = src/libzmq.la
tests_test_client_drop_more_SOURCES = tests/test_client_drop_more.cpp
tests_test_client_drop_more_LDADD = src/libzmq.la
tests_test_thread_safe_SOURCES = tests/test_thread_safe.cpp
tests_test_thread_safe_LDADD = src/libzmq.la
if !ON_MINGW
if !ON_CYGWIN
test_apps += \
......
......@@ -86,5 +86,14 @@
#cmakedefine ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_WINDOWS
#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600
#undef _WIN32_WINNT
#endif
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#endif
#endif
\ No newline at end of file
......@@ -29,4 +29,13 @@
#define ZMQ_HAVE_WINDOWS
#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600
#undef _WIN32_WINNT
#endif
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#endif
......@@ -29,4 +29,12 @@
#define ZMQ_HAVE_WINDOWS
#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600
#undef _WIN32_WINNT
#endif
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#endif
......@@ -76,6 +76,7 @@
<ClInclude Include="..\..\..\..\src\client.hpp" />
<ClInclude Include="..\..\..\..\src\clock.hpp" />
<ClInclude Include="..\..\..\..\src\command.hpp" />
<ClInclude Include="..\..\..\..\src\condition_variable.hpp" />
<ClInclude Include="..\..\..\..\src\config.hpp" />
<ClInclude Include="..\..\..\..\src\ctx.hpp" />
<ClInclude Include="..\..\..\..\src\curve_client.hpp" />
......@@ -103,6 +104,7 @@
<ClInclude Include="..\..\..\..\src\lb.hpp" />
<ClInclude Include="..\..\..\..\src\likely.hpp" />
<ClInclude Include="..\..\..\..\src\mailbox.hpp" />
<ClInclude Include="..\..\..\..\src\mailbox_safe.hpp" />
<ClInclude Include="..\..\..\..\src\msg.hpp" />
<ClInclude Include="..\..\..\..\src\mtrie.hpp" />
<ClInclude Include="..\..\..\..\src\mutex.hpp" />
......@@ -183,6 +185,7 @@
<ClCompile Include="..\..\..\..\src\kqueue.cpp" />
<ClCompile Include="..\..\..\..\src\lb.cpp" />
<ClCompile Include="..\..\..\..\src\mailbox.cpp" />
<ClCompile Include="..\..\..\..\src\mailbox_safe.cpp" />
<ClCompile Include="..\..\..\..\src\mechanism.cpp" />
<ClCompile Include="..\..\..\..\src\metadata.cpp" />
<ClCompile Include="..\..\..\..\src\msg.cpp" />
......
......@@ -232,6 +232,9 @@
<ClCompile Include="..\..\..\..\src\client.cpp">
<Filter>src</Filter>
</ClCompile>
<ClCompile Include="..\..\..\..\src\mailbox_safe.cpp">
<Filter>src</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\..\..\include\zmq_utils.h">
......@@ -504,6 +507,12 @@
<ClInclude Include="..\..\..\..\src\client.hpp">
<Filter>src\include</Filter>
</ClInclude>
<ClInclude Include="..\..\..\..\src\mailbox_safe.hpp">
<Filter>src\include</Filter>
</ClInclude>
<ClInclude Include="..\..\..\..\src\condition_variable.hpp">
<Filter>src\include</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="include">
......
......@@ -22,7 +22,7 @@
#include "msg.hpp"
zmq::client_t::client_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
socket_base_t (parent_, tid_, sid_, true)
{
options.type = ZMQ_CLIENT;
}
......
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
#define __ZMQ_CONDITON_VARIABLE_HPP_INCLUDED__
#include "platform.hpp"
#include "err.hpp"
#include "mutex.hpp"
// Condition variable class encapsulates OS mutex in a platform-independent way.
#ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp"
namespace zmq
{
class condition_variable_t
{
public:
inline condition_variable_t ()
{
InitializeConditionVariable (&cv);
}
inline ~condition_variable_t ()
{
}
inline int wait (mutex_t* mutex_, int timeout_ )
{
int rc = SleepConditionVariableCS(&cv, mutex_->get_cs (), timeout_);
if (rc != 0)
return 0;
rc = GetLastError();
if (rc != ERROR_TIMEOUT)
win_assert(rc);
errno = EAGAIN;
return -1;
}
inline void broadcast ()
{
WakeAllConditionVariable(&cv);
}
private:
CONDITION_VARIABLE cv;
// Disable copy construction and assignment.
condition_variable_t (const condition_variable_t&);
void operator = (const condition_variable_t&);
};
}
#else
#include <pthread.h>
namespace zmq
{
class condition_variable_t
{
public:
inline condition_variable_t ()
{
int rc = pthread_cond_init (&cond, NULL);
posix_assert (rc);
}
inline ~condition_variable_t ()
{
int rc = pthread_cond_destroy (&cond);
posix_assert (rc);
}
inline int wait (mutex_t* mutex_, int timeout_)
{
int rc;
if (timeout_ != -1) {
struct timespec timeout;
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec += timeout_ / 1000;
timeout.tv_nsec += (timeout_ % 1000) * 1000000;
rc = pthread_cond_timedwait (&cond, mutex_->get_mutex (), &timeout);
}
else
rc = pthread_cond_wait(&cond, mutex_->get_mutex());
if (rc == 0)
return 0;
if (rc == ETIMEDOUT){
errno= EAGAIN;
return -1;
}
posix_assert (rc);
return -1;
}
inline void broadcast ()
{
int rc = pthread_cond_broadcast (&cond);
posix_assert (rc);
}
private:
pthread_cond_t cond;
// Disable copy construction and assignment.
condition_variable_t (const condition_variable_t&);
const condition_variable_t &operator = (const condition_variable_t&);
};
}
#endif
#endif
......@@ -273,7 +273,7 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
int ios = io_thread_count;
opt_sync.unlock ();
slot_count = mazmq + ios + 2;
slots = (mailbox_t **) malloc (sizeof (mailbox_t*) * slot_count);
slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
alloc_assert (slots);
// Initialise the infrastructure for zmq_ctx_term thread.
......
......@@ -162,7 +162,7 @@ namespace zmq
// Array of pointers to mailboxes for both application and I/O threads.
uint32_t slot_count;
mailbox_t **slots;
i_mailbox **slots;
// Mailbox for zmq_term thread.
mailbox_t term_mailbox;
......
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_MAILBOX_HPP_INCLUDED__
#define __ZMQ_I_MAILBOX_HPP_INCLUDED__
#include "stdint.hpp"
namespace zmq
{
// Interface to be implemented by mailbox.
class i_mailbox
{
public:
virtual ~i_mailbox () {}
virtual void send (const command_t &cmd_) = 0;
virtual int recv (command_t *cmd_, int timeout_) = 0;
#ifdef HAVE_FORK
// close the file descriptors in the signaller. This is used in a forked
// child process to close the file descriptors so that they do not interfere
// with the context in the parent process.
virtual void forked () = 0;
#endif
};
}
#endif
......@@ -29,11 +29,12 @@
#include "command.hpp"
#include "ypipe.hpp"
#include "mutex.hpp"
#include "i_mailbox.hpp"
namespace zmq
{
class mailbox_t
class mailbox_t : public i_mailbox
{
public:
......
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mailbox_safe.hpp"
#include "err.hpp"
zmq::mailbox_safe_t::mailbox_safe_t (mutex_t* socket_mutex_) :
socket_mutex (socket_mutex_)
{
// Get the pipe into passive state. That way, if the users starts by
// polling on the associated file descriptor it will get woken up when
// new command is posted.
const bool ok = cpipe.read (NULL);
zmq_assert (!ok);
}
zmq::mailbox_safe_t::~mailbox_safe_t ()
{
// TODO: Retrieve and deallocate commands inside the cpipe.
// Work around problem that other threads might still be in our
// send() method, by waiting on the mutex before disappearing.
sync.lock ();
sync.unlock ();
}
void zmq::mailbox_safe_t::add_signaler(signaler_t* signaler)
{
sync.lock();
signalers.push_back(signaler);
sync.unlock();
}
void zmq::mailbox_safe_t::remove_signaler(signaler_t* signaler)
{
sync.lock();
std::vector<signaler_t*>::iterator it = signalers.begin();
// TODO: make a copy of array and signal outside the lock
for (; it != signalers.end(); ++it){
if (*it == signaler)
break;
}
if (it != signalers.end())
signalers.erase(it);
sync.unlock();
}
void zmq::mailbox_safe_t::send (const command_t &cmd_)
{
sync.lock ();
cpipe.write (cmd_, false);
const bool ok = cpipe.flush ();
if (!ok) {
for (std::vector<signaler_t*>::iterator it = signalers.begin(); it != signalers.end(); ++it){
(*it)->send();
}
}
sync.unlock ();
if (!ok)
cond_var.broadcast ();
}
int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
{
// Try to get the command straight away.
if (cpipe.read (cmd_))
return 0;
if (timeout_ == 0) {
errno = EAGAIN;
return -1;
}
// Wait for signal from the command sender.
int rc = cond_var.wait (socket_mutex, timeout_);
if (rc == -1) {
errno_assert (errno == EAGAIN || errno == EINTR);
return -1;
}
// Another thread may already fetch the command
const bool ok = cpipe.read (cmd_);
if (!ok) {
errno = EAGAIN;
return -1;
}
return 0;
}
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
void send (const command_t &cmd_);
int recv (command_t *cmd_, int timeout_);
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_MAILBOX_SAFE_HPP_INCLUDED__
#define __ZMQ_MAILBOX_SAFE_HPP_INCLUDED__
#include <vector>
#include <stddef.h>
#include "platform.hpp"
#include "signaler.hpp"
#include "fd.hpp"
#include "config.hpp"
#include "command.hpp"
#include "ypipe.hpp"
#include "mutex.hpp"
#include "i_mailbox.hpp"
#include "condition_variable.hpp"
namespace zmq
{
class mailbox_safe_t : public i_mailbox
{
public:
mailbox_safe_t (mutex_t* socket_mutex_);
~mailbox_safe_t ();
void send (const command_t &cmd_);
int recv (command_t *cmd_, int timeout_);
// Add signaler to mailbox which will be called when a message is ready
void add_signaler(signaler_t* signaler);
void remove_signaler(signaler_t* signaler);
#ifdef HAVE_FORK
// close the file descriptors in the signaller. This is used in a forked
// child process to close the file descriptors so that they do not interfere
// with the context in the parent process.
void forked ()
{
// TODO: call fork on the condition variable
}
#endif
private:
// The pipe to store actual commands.
typedef ypipe_t <command_t, command_pipe_granularity> cpipe_t;
cpipe_t cpipe;
// Condition variable to pass signals from writer thread to reader thread.
condition_variable_t cond_var;
// There's only one thread receiving from the mailbox, but there
// is arbitrary number of threads sending. Given that ypipe requires
// synchronised access on both of its endpoints, we have to synchronise
// the sending side.
mutex_t sync;
mutex_t* socket_mutex;
std::vector <zmq::signaler_t* > signalers;
// Disable copying of mailbox_t object.
mailbox_safe_t (const mailbox_safe_t&);
const mailbox_safe_t &operator = (const mailbox_safe_t&);
};
}
#endif
......@@ -60,6 +60,11 @@ namespace zmq
LeaveCriticalSection (&cs);
}
inline CRITICAL_SECTION* get_cs()
{
return &cs;
}
private:
CRITICAL_SECTION cs;
......@@ -115,6 +120,11 @@ namespace zmq
posix_assert (rc);
}
inline pthread_mutex_t* get_mutex()
{
return &mutex;
}
private:
pthread_mutex_t mutex;
......
......@@ -25,7 +25,7 @@
#include "err.hpp"
zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
socket_base_t (parent_, tid_, sid_, true),
next_rid (generate_random ())
{
options.type = ZMQ_SERVER;
......
This diff is collapsed.
......@@ -31,7 +31,7 @@
#include "poller.hpp"
#include "atomic_counter.hpp"
#include "i_poll_events.hpp"
#include "mailbox.hpp"
#include "i_mailbox.hpp"
#include "stdint.hpp"
#include "clock.hpp"
#include "pipe.hpp"
......@@ -66,7 +66,7 @@ namespace zmq
uint32_t tid_, int sid_);
// Returns the mailbox associated with this socket.
mailbox_t *get_mailbox ();
i_mailbox *get_mailbox ();
// Interrupt blocking call if the socket is stuck in one.
// This function can be called from a different thread!
......@@ -123,7 +123,7 @@ namespace zmq
protected:
socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_ = false);
virtual ~socket_base_t ();
// Concrete algorithms for the x- methods are to be defined by
......@@ -223,7 +223,7 @@ namespace zmq
void process_term (int linger_);
// Socket's mailbox object.
mailbox_t mailbox;
i_mailbox* mailbox;
// List of attached pipes.
typedef array_t <pipe_t, 3> pipes_t;
......@@ -257,9 +257,17 @@ namespace zmq
// Last socket endpoint resolved URI
std::string last_endpoint;
socket_base_t (const socket_base_t&);
const socket_base_t &operator = (const socket_base_t&);
// Indicate if the socket is thread safe
bool thread_safe;
// Signaler to be used in the reaping stage
signaler_t* reaper_signaler;
// Mutex for synchronize access to the socket in thread safe mode
mutex_t sync;
socket_base_t (const socket_base_t&);
const socket_base_t &operator = (const socket_base_t&);
};
}
......
......@@ -45,6 +45,8 @@ set(tests
test_connect_rid
test_xpub_nodrop
test_pub_invert_matching
test_thread_safe
test_client_server
)
if(NOT WIN32)
list(APPEND tests
......
/*:
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
void worker1(void* s);
void worker2(void* s);
int main (void)
{
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
void *client = zmq_socket (ctx, ZMQ_CLIENT);
void *client2 = zmq_socket (ctx, ZMQ_CLIENT);
int rc;
rc = zmq_bind (client, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect (client2, "tcp://127.0.0.1:5560");
assert (rc == 0);
void* t1 = zmq_threadstart(worker1, client2);
void* t2 = zmq_threadstart(worker2, client2);
char data[1];
data[0] = 0;
for (int i=0; i < 10; i++) {
rc = zmq_send_const(client, data, 1, 0);
assert (rc == 1);
rc = zmq_send_const(client, data, 1, 0);
assert(rc == 1);
char a, b;
rc = zmq_recv(client, &a, 1, 0);
assert(rc == 1);
rc = zmq_recv(client, &b, 1, 0);
assert(rc == 1);
// make sure they came from different threads
assert((a == 1 && b == 2) || (a == 2 && b == 1));
}
// make the thread exit
data[0] = 1;
rc = zmq_send_const(client, data, 1, 0);
assert (rc == 1);
rc = zmq_send_const(client, data, 1, 0);
assert(rc == 1);
zmq_threadclose(t1);
zmq_threadclose(t2);
rc = zmq_close (client2);
assert (rc == 0);
rc = zmq_close (client);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
void worker1(void* s)
{
const char worker_id = 1;
char c;
while (true)
{
int rc = zmq_recv(s, &c,1, 0);
assert(rc == 1);
if (c == 0)
{
msleep(10);
rc = zmq_send_const(s,&worker_id, 1, 0);
assert(rc == 1);
}
else
{
// we got exit request
break;
}
}
}
void worker2(void* s)
{
const char worker_id = 2;
char c;
while (true)
{
int rc = zmq_recv(s, &c,1, 0);
assert(rc == 1);
assert(c == 1 || c == 0);
if (c == 0)
{
msleep(10);
rc = zmq_send_const(s,&worker_id, 1, 0);
assert(rc == 1);
}
else
{
// we got exit request
break;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment