Commit 9201009f authored by a4z's avatar a4z

Problem: socket_base uses macros for mutex lock and unlock

Solution: add a lock guard that takes optional a mutex and use it
parent fb34c323
...@@ -181,6 +181,33 @@ namespace zmq ...@@ -181,6 +181,33 @@ namespace zmq
scoped_lock_t (const scoped_lock_t&); scoped_lock_t (const scoped_lock_t&);
const scoped_lock_t &operator = (const scoped_lock_t&); const scoped_lock_t &operator = (const scoped_lock_t&);
}; };
struct scoped_optional_lock_t
{
scoped_optional_lock_t (mutex_t* mutex_)
: mutex (mutex_)
{
if(mutex != 0)
mutex->lock ();
}
~scoped_optional_lock_t ()
{
if(mutex != 0)
mutex->unlock ();
}
private:
mutex_t* mutex;
// Disable copy construction and assignment.
scoped_optional_lock_t (const scoped_lock_t&);
const scoped_optional_lock_t &operator = (const scoped_lock_t&);
};
} }
#endif #endif
...@@ -97,13 +97,7 @@ ...@@ -97,13 +97,7 @@
#include "scatter.hpp" #include "scatter.hpp"
#include "dgram.hpp" #include "dgram.hpp"
#define ENTER_MUTEX() \
if (thread_safe) \
sync.lock();
#define EXIT_MUTEX(); \
if (thread_safe) \
sync.unlock();
bool zmq::socket_base_t::check_tag () bool zmq::socket_base_t::check_tag ()
{ {
...@@ -338,24 +332,21 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_) ...@@ -338,24 +332,21 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
if (!options.is_valid(option_)) { if (!options.is_valid(option_)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX ();
return -1; return -1;
} }
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX ();
return -1; return -1;
} }
// First, check whether specific socket type overloads the option. // First, check whether specific socket type overloads the option.
int rc = xsetsockopt (option_, optval_, optvallen_); int rc = xsetsockopt (option_, optval_, optvallen_);
if (rc == 0 || errno != EINVAL) { if (rc == 0 || errno != EINVAL) {
EXIT_MUTEX ();
return rc; return rc;
} }
...@@ -364,64 +355,55 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, ...@@ -364,64 +355,55 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
rc = options.setsockopt (option_, optval_, optvallen_); rc = options.setsockopt (option_, optval_, optvallen_);
update_pipe_options(option_); update_pipe_options(option_);
EXIT_MUTEX ();
return rc; return rc;
} }
int zmq::socket_base_t::getsockopt (int option_, void *optval_, int zmq::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_) size_t *optvallen_)
{ {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX ();
return -1; return -1;
} }
if (option_ == ZMQ_RCVMORE) { if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) { if (*optvallen_ < sizeof (int)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX ();
return -1; return -1;
} }
memset(optval_, 0, *optvallen_); memset(optval_, 0, *optvallen_);
*((int*) optval_) = rcvmore ? 1 : 0; *((int*) optval_) = rcvmore ? 1 : 0;
*optvallen_ = sizeof (int); *optvallen_ = sizeof (int);
EXIT_MUTEX ();
return 0; return 0;
} }
if (option_ == ZMQ_FD) { if (option_ == ZMQ_FD) {
if (*optvallen_ < sizeof (fd_t)) { if (*optvallen_ < sizeof (fd_t)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX ();
return -1; return -1;
} }
if (thread_safe) { if (thread_safe) {
// thread safe socket doesn't provide file descriptor // thread safe socket doesn't provide file descriptor
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX ();
return -1; return -1;
} }
*((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd(); *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd();
*optvallen_ = sizeof(fd_t); *optvallen_ = sizeof(fd_t);
EXIT_MUTEX ();
return 0; return 0;
} }
if (option_ == ZMQ_EVENTS) { if (option_ == ZMQ_EVENTS) {
if (*optvallen_ < sizeof (int)) { if (*optvallen_ < sizeof (int)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX ();
return -1; return -1;
} }
int rc = process_commands (0, false); int rc = process_commands (0, false);
if (rc != 0 && (errno == EINTR || errno == ETERM)) { if (rc != 0 && (errno == EINTR || errno == ETERM)) {
EXIT_MUTEX ();
return -1; return -1;
} }
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -431,108 +413,94 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, ...@@ -431,108 +413,94 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
if (has_in ()) if (has_in ())
*((int*) optval_) |= ZMQ_POLLIN; *((int*) optval_) |= ZMQ_POLLIN;
*optvallen_ = sizeof (int); *optvallen_ = sizeof (int);
EXIT_MUTEX ();
return 0; return 0;
} }
if (option_ == ZMQ_LAST_ENDPOINT) { if (option_ == ZMQ_LAST_ENDPOINT) {
if (*optvallen_ < last_endpoint.size () + 1) { if (*optvallen_ < last_endpoint.size () + 1) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX ();
return -1; return -1;
} }
strncpy(static_cast <char *> (optval_), last_endpoint.c_str(), last_endpoint.size() + 1); strncpy(static_cast <char *> (optval_), last_endpoint.c_str(), last_endpoint.size() + 1);
*optvallen_ = last_endpoint.size () + 1; *optvallen_ = last_endpoint.size () + 1;
EXIT_MUTEX ();
return 0; return 0;
} }
if (option_ == ZMQ_THREAD_SAFE) { if (option_ == ZMQ_THREAD_SAFE) {
if (*optvallen_ < sizeof (int)) { if (*optvallen_ < sizeof (int)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX ();
return -1; return -1;
} }
memset(optval_, 0, *optvallen_); memset(optval_, 0, *optvallen_);
*((int*) optval_) = thread_safe ? 1 : 0; *((int*) optval_) = thread_safe ? 1 : 0;
*optvallen_ = sizeof (int); *optvallen_ = sizeof (int);
EXIT_MUTEX ();
return 0; return 0;
} }
int rc = options.getsockopt (option_, optval_, optvallen_); int rc = options.getsockopt (option_, optval_, optvallen_);
EXIT_MUTEX ();
return rc; return rc;
} }
int zmq::socket_base_t::join (const char* group_) int zmq::socket_base_t::join (const char* group_)
{ {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
int rc = xjoin (group_); int rc = xjoin (group_);
EXIT_MUTEX();
return rc; return rc;
} }
int zmq::socket_base_t::leave (const char* group_) int zmq::socket_base_t::leave (const char* group_)
{ {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
int rc = xleave (group_); int rc = xleave (group_);
EXIT_MUTEX();
return rc; return rc;
} }
int zmq::socket_base_t::add_signaler(signaler_t *s_) int zmq::socket_base_t::add_signaler(signaler_t *s_)
{ {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
if (!thread_safe) { if (!thread_safe) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX ();
return -1; return -1;
} }
((mailbox_safe_t*)mailbox)->add_signaler(s_); ((mailbox_safe_t*)mailbox)->add_signaler(s_);
EXIT_MUTEX ();
return 0; return 0;
} }
int zmq::socket_base_t::remove_signaler(signaler_t *s_) int zmq::socket_base_t::remove_signaler(signaler_t *s_)
{ {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
if (!thread_safe) { if (!thread_safe) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX ();
return -1; return -1;
} }
((mailbox_safe_t*)mailbox)->remove_signaler(s_); ((mailbox_safe_t*)mailbox)->remove_signaler(s_);
EXIT_MUTEX ();
return 0; return 0;
} }
int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::bind (const char *addr_)
{ {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX ();
return -1; return -1;
} }
// Process pending commands, if any. // Process pending commands, if any.
int rc = process_commands (0, false); int rc = process_commands (0, false);
if (unlikely (rc != 0)) { if (unlikely (rc != 0)) {
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -540,7 +508,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -540,7 +508,6 @@ int zmq::socket_base_t::bind (const char *addr_)
std::string protocol; std::string protocol;
std::string address; std::string address;
if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) { if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -552,14 +519,12 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -552,14 +519,12 @@ int zmq::socket_base_t::bind (const char *addr_)
last_endpoint.assign (addr_); last_endpoint.assign (addr_);
options.connected = true; options.connected = true;
} }
EXIT_MUTEX ();
return rc; return rc;
} }
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") { if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
// For convenience's sake, bind can be used interchangeable with // For convenience's sake, bind can be used interchangeable with
// connect for PGM, EPGM, NORM transports. // connect for PGM, EPGM, NORM transports.
EXIT_MUTEX ();
rc = connect (addr_); rc = connect (addr_);
if (rc != -1) if (rc != -1)
options.connected = true; options.connected = true;
...@@ -569,7 +534,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -569,7 +534,6 @@ int zmq::socket_base_t::bind (const char *addr_)
if (protocol == "udp") { if (protocol == "udp") {
if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) { if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
errno = ENOCOMPATPROTO; errno = ENOCOMPATPROTO;
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -577,7 +541,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -577,7 +541,6 @@ int zmq::socket_base_t::bind (const char *addr_)
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) { if (!io_thread) {
errno = EMTHREAD; errno = EMTHREAD;
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -589,7 +552,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -589,7 +552,6 @@ int zmq::socket_base_t::bind (const char *addr_)
rc = paddr->resolved.udp_addr->resolve (address.c_str(), true); rc = paddr->resolved.udp_addr->resolve (address.c_str(), true);
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(paddr); LIBZMQ_DELETE(paddr);
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -620,7 +582,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -620,7 +582,6 @@ int zmq::socket_base_t::bind (const char *addr_)
add_endpoint (addr_, (own_t *) session, newpipe); add_endpoint (addr_, (own_t *) session, newpipe);
EXIT_MUTEX ();
return 0; return 0;
} }
...@@ -629,7 +590,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -629,7 +590,6 @@ int zmq::socket_base_t::bind (const char *addr_)
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) { if (!io_thread) {
errno = EMTHREAD; errno = EMTHREAD;
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -641,7 +601,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -641,7 +601,6 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(listener); LIBZMQ_DELETE(listener);
event_bind_failed (address, zmq_errno()); event_bind_failed (address, zmq_errno());
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -650,7 +609,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -650,7 +609,6 @@ int zmq::socket_base_t::bind (const char *addr_)
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL); add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
options.connected = true; options.connected = true;
EXIT_MUTEX ();
return 0; return 0;
} }
...@@ -663,7 +621,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -663,7 +621,6 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(listener); LIBZMQ_DELETE(listener);
event_bind_failed (address, zmq_errno()); event_bind_failed (address, zmq_errno());
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -672,7 +629,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -672,7 +629,6 @@ int zmq::socket_base_t::bind (const char *addr_)
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL); add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
options.connected = true; options.connected = true;
EXIT_MUTEX ();
return 0; return 0;
} }
#endif #endif
...@@ -685,7 +641,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -685,7 +641,6 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(listener); LIBZMQ_DELETE(listener);
event_bind_failed (address, zmq_errno()); event_bind_failed (address, zmq_errno());
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -694,7 +649,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -694,7 +649,6 @@ int zmq::socket_base_t::bind (const char *addr_)
add_endpoint (addr_, (own_t *) listener, NULL); add_endpoint (addr_, (own_t *) listener, NULL);
options.connected = true; options.connected = true;
EXIT_MUTEX ();
return 0; return 0;
} }
#endif #endif
...@@ -707,7 +661,6 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -707,7 +661,6 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(listener); LIBZMQ_DELETE(listener);
event_bind_failed (address, zmq_errno ()); event_bind_failed (address, zmq_errno ());
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -715,30 +668,26 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -715,30 +668,26 @@ int zmq::socket_base_t::bind (const char *addr_)
add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL); add_endpoint (last_endpoint.c_str(), (own_t *) listener, NULL);
options.connected = true; options.connected = true;
EXIT_MUTEX ();
return 0; return 0;
} }
#endif #endif
EXIT_MUTEX ();
zmq_assert (false); zmq_assert (false);
return -1; return -1;
} }
int zmq::socket_base_t::connect (const char *addr_) int zmq::socket_base_t::connect (const char *addr_)
{ {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX ();
return -1; return -1;
} }
// Process pending commands, if any. // Process pending commands, if any.
int rc = process_commands (0, false); int rc = process_commands (0, false);
if (unlikely (rc != 0)) { if (unlikely (rc != 0)) {
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -746,7 +695,6 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -746,7 +695,6 @@ int zmq::socket_base_t::connect (const char *addr_)
std::string protocol; std::string protocol;
std::string address; std::string address;
if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) { if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -852,7 +800,6 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -852,7 +800,6 @@ int zmq::socket_base_t::connect (const char *addr_)
inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0])); inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
options.connected = true; options.connected = true;
EXIT_MUTEX ();
return 0; return 0;
} }
bool is_single_connect = (options.type == ZMQ_DEALER || bool is_single_connect = (options.type == ZMQ_DEALER ||
...@@ -864,7 +811,6 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -864,7 +811,6 @@ int zmq::socket_base_t::connect (const char *addr_)
// There is no valid use for multiple connects for SUB-PUB nor // There is no valid use for multiple connects for SUB-PUB nor
// DEALER-ROUTER nor REQ-REP. Multiple connects produces // DEALER-ROUTER nor REQ-REP. Multiple connects produces
// nonsensical results. // nonsensical results.
EXIT_MUTEX ();
return 0; return 0;
} }
} }
...@@ -873,7 +819,6 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -873,7 +819,6 @@ int zmq::socket_base_t::connect (const char *addr_)
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
if (!io_thread) { if (!io_thread) {
errno = EMTHREAD; errno = EMTHREAD;
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -918,7 +863,6 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -918,7 +863,6 @@ int zmq::socket_base_t::connect (const char *addr_)
if (rc == -1) { if (rc == -1) {
errno = EINVAL; errno = EINVAL;
LIBZMQ_DELETE(paddr); LIBZMQ_DELETE(paddr);
EXIT_MUTEX ();
return -1; return -1;
} }
// Defer resolution until a socket is opened // Defer resolution until a socket is opened
...@@ -932,7 +876,6 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -932,7 +876,6 @@ int zmq::socket_base_t::connect (const char *addr_)
int rc = paddr->resolved.ipc_addr->resolve (address.c_str ()); int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(paddr); LIBZMQ_DELETE(paddr);
EXIT_MUTEX ();
return -1; return -1;
} }
} }
...@@ -942,7 +885,6 @@ if (protocol == "udp") { ...@@ -942,7 +885,6 @@ if (protocol == "udp") {
if (options.type != ZMQ_RADIO) { if (options.type != ZMQ_RADIO) {
errno = ENOCOMPATPROTO; errno = ENOCOMPATPROTO;
LIBZMQ_DELETE(paddr); LIBZMQ_DELETE(paddr);
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -951,7 +893,6 @@ if (protocol == "udp") { ...@@ -951,7 +893,6 @@ if (protocol == "udp") {
rc = paddr->resolved.udp_addr->resolve (address.c_str(), false); rc = paddr->resolved.udp_addr->resolve (address.c_str(), false);
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(paddr); LIBZMQ_DELETE(paddr);
EXIT_MUTEX ();
return -1; return -1;
} }
} }
...@@ -966,7 +907,6 @@ if (protocol == "udp") { ...@@ -966,7 +907,6 @@ if (protocol == "udp") {
if (res != NULL) if (res != NULL)
pgm_freeaddrinfo (res); pgm_freeaddrinfo (res);
if (rc != 0 || port_number == 0) { if (rc != 0 || port_number == 0) {
EXIT_MUTEX ();
return -1; return -1;
} }
} }
...@@ -979,7 +919,6 @@ if (protocol == "udp") { ...@@ -979,7 +919,6 @@ if (protocol == "udp") {
int rc = paddr->resolved.tipc_addr->resolve (address.c_str()); int rc = paddr->resolved.tipc_addr->resolve (address.c_str());
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(paddr); LIBZMQ_DELETE(paddr);
EXIT_MUTEX ();
return -1; return -1;
} }
} }
...@@ -992,7 +931,6 @@ if (protocol == "udp") { ...@@ -992,7 +931,6 @@ if (protocol == "udp") {
int rc = paddr->resolved.vmci_addr->resolve (address.c_str ()); int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
if (rc != 0) { if (rc != 0) {
LIBZMQ_DELETE(paddr); LIBZMQ_DELETE(paddr);
EXIT_MUTEX ();
return -1; return -1;
} }
} }
...@@ -1038,7 +976,6 @@ if (protocol == "udp") { ...@@ -1038,7 +976,6 @@ if (protocol == "udp") {
paddr->to_string (last_endpoint); paddr->to_string (last_endpoint);
add_endpoint (addr_, (own_t *) session, newpipe); add_endpoint (addr_, (own_t *) session, newpipe);
EXIT_MUTEX ();
return 0; return 0;
} }
...@@ -1051,19 +988,17 @@ void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe ...@@ -1051,19 +988,17 @@ void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe
int zmq::socket_base_t::term_endpoint (const char *addr_) int zmq::socket_base_t::term_endpoint (const char *addr_)
{ {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
// Check whether the library haven't been shut down yet. // Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX ();
return -1; return -1;
} }
// Check whether endpoint address passed to the function is valid. // Check whether endpoint address passed to the function is valid.
if (unlikely (!addr_)) { if (unlikely (!addr_)) {
errno = EINVAL; errno = EINVAL;
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -1071,7 +1006,6 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) ...@@ -1071,7 +1006,6 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
// (from launch_child() for example) we're asked to terminate now. // (from launch_child() for example) we're asked to terminate now.
int rc = process_commands (0, false); int rc = process_commands (0, false);
if (unlikely(rc != 0)) { if (unlikely(rc != 0)) {
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -1079,27 +1013,23 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) ...@@ -1079,27 +1013,23 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
std::string protocol; std::string protocol;
std::string address; std::string address;
if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) { if (parse_uri(addr_, protocol, address) || check_protocol(protocol)) {
EXIT_MUTEX ();
return -1; return -1;
} }
// Disconnect an inproc socket // Disconnect an inproc socket
if (protocol == "inproc") { if (protocol == "inproc") {
if (unregister_endpoint (std::string(addr_), this) == 0) { if (unregister_endpoint (std::string(addr_), this) == 0) {
EXIT_MUTEX ();
return 0; return 0;
} }
std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_)); std::pair <inprocs_t::iterator, inprocs_t::iterator> range = inprocs.equal_range (std::string (addr_));
if (range.first == range.second) { if (range.first == range.second) {
errno = ENOENT; errno = ENOENT;
EXIT_MUTEX ();
return -1; return -1;
} }
for (inprocs_t::iterator it = range.first; it != range.second; ++it) for (inprocs_t::iterator it = range.first; it != range.second; ++it)
it->second->terminate (true); it->second->terminate (true);
inprocs.erase (range.first, range.second); inprocs.erase (range.first, range.second);
EXIT_MUTEX ();
return 0; return 0;
} }
...@@ -1137,7 +1067,6 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) ...@@ -1137,7 +1067,6 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
range = endpoints.equal_range (resolved_addr); range = endpoints.equal_range (resolved_addr);
if (range.first == range.second) { if (range.first == range.second) {
errno = ENOENT; errno = ENOENT;
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -1148,32 +1077,28 @@ int zmq::socket_base_t::term_endpoint (const char *addr_) ...@@ -1148,32 +1077,28 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
term_child (it->second.first); term_child (it->second.first);
} }
endpoints.erase (range.first, range.second); endpoints.erase (range.first, range.second);
EXIT_MUTEX ();
return 0; return 0;
} }
int zmq::socket_base_t::send (msg_t *msg_, int flags_) int zmq::socket_base_t::send (msg_t *msg_, int flags_)
{ {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
// Check whether the library haven't been shut down yet. // Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX ();
return -1; return -1;
} }
// Check whether message passed to the function is valid. // Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) { if (unlikely (!msg_ || !msg_->check ())) {
errno = EFAULT; errno = EFAULT;
EXIT_MUTEX ();
return -1; return -1;
} }
// Process pending commands, if any. // Process pending commands, if any.
int rc = process_commands (0, true); int rc = process_commands (0, true);
if (unlikely (rc != 0)) { if (unlikely (rc != 0)) {
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -1189,18 +1114,15 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) ...@@ -1189,18 +1114,15 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
// Try to send the message using method in each socket class // Try to send the message using method in each socket class
rc = xsend (msg_); rc = xsend (msg_);
if (rc == 0) { if (rc == 0) {
EXIT_MUTEX ();
return 0; return 0;
} }
if (unlikely (errno != EAGAIN)) { if (unlikely (errno != EAGAIN)) {
EXIT_MUTEX ();
return -1; return -1;
} }
// In case of non-blocking send we'll simply propagate // In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - up the stack. // the error - including EAGAIN - up the stack.
if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) { if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) {
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -1214,45 +1136,39 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) ...@@ -1214,45 +1136,39 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
// If timeout is reached in the meantime, return EAGAIN. // If timeout is reached in the meantime, return EAGAIN.
while (true) { while (true) {
if (unlikely (process_commands (timeout, false) != 0)) { if (unlikely (process_commands (timeout, false) != 0)) {
EXIT_MUTEX ();
return -1; return -1;
} }
rc = xsend (msg_); rc = xsend (msg_);
if (rc == 0) if (rc == 0)
break; break;
if (unlikely (errno != EAGAIN)) { if (unlikely (errno != EAGAIN)) {
EXIT_MUTEX ();
return -1; return -1;
} }
if (timeout > 0) { if (timeout > 0) {
timeout = (int) (end - clock.now_ms ()); timeout = (int) (end - clock.now_ms ());
if (timeout <= 0) { if (timeout <= 0) {
errno = EAGAIN; errno = EAGAIN;
EXIT_MUTEX ();
return -1; return -1;
} }
} }
} }
EXIT_MUTEX ();
return 0; return 0;
} }
int zmq::socket_base_t::recv (msg_t *msg_, int flags_) int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{ {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
// Check whether the library haven't been shut down yet. // Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX ();
return -1; return -1;
} }
// Check whether message passed to the function is valid. // Check whether message passed to the function is valid.
if (unlikely (!msg_ || !msg_->check ())) { if (unlikely (!msg_ || !msg_->check ())) {
errno = EFAULT; errno = EFAULT;
EXIT_MUTEX ();
return -1; return -1;
} }
...@@ -1266,7 +1182,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1266,7 +1182,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// ticks is more efficient than doing RDTSC all the time. // ticks is more efficient than doing RDTSC all the time.
if (++ticks == inbound_poll_rate) { if (++ticks == inbound_poll_rate) {
if (unlikely (process_commands (0, false) != 0)) { if (unlikely (process_commands (0, false) != 0)) {
EXIT_MUTEX ();
return -1; return -1;
} }
ticks = 0; ticks = 0;
...@@ -1275,14 +1190,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1275,14 +1190,12 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// Get the message. // Get the message.
int rc = xrecv (msg_); int rc = xrecv (msg_);
if (unlikely (rc != 0 && errno != EAGAIN)) { if (unlikely (rc != 0 && errno != EAGAIN)) {
EXIT_MUTEX ();
return -1; return -1;
} }
// If we have the message, return immediately. // If we have the message, return immediately.
if (rc == 0) { if (rc == 0) {
extract_flags (msg_); extract_flags (msg_);
EXIT_MUTEX ();
return 0; return 0;
} }
...@@ -1292,19 +1205,16 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1292,19 +1205,16 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// If it's not, return EAGAIN. // If it's not, return EAGAIN.
if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) { if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) {
if (unlikely (process_commands (0, false) != 0)) { if (unlikely (process_commands (0, false) != 0)) {
EXIT_MUTEX ();
return -1; return -1;
} }
ticks = 0; ticks = 0;
rc = xrecv (msg_); rc = xrecv (msg_);
if (rc < 0) { if (rc < 0) {
EXIT_MUTEX ();
return rc; return rc;
} }
extract_flags (msg_); extract_flags (msg_);
EXIT_MUTEX ();
return 0; return 0;
} }
...@@ -1318,7 +1228,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1318,7 +1228,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
bool block = (ticks != 0); bool block = (ticks != 0);
while (true) { while (true) {
if (unlikely (process_commands (block ? timeout : 0, false) != 0)) { if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
EXIT_MUTEX ();
return -1; return -1;
} }
rc = xrecv (msg_); rc = xrecv (msg_);
...@@ -1327,7 +1236,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1327,7 +1236,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
break; break;
} }
if (unlikely (errno != EAGAIN)) { if (unlikely (errno != EAGAIN)) {
EXIT_MUTEX ();
return -1; return -1;
} }
block = true; block = true;
...@@ -1335,20 +1243,18 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) ...@@ -1335,20 +1243,18 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
timeout = (int) (end - clock.now_ms ()); timeout = (int) (end - clock.now_ms ());
if (timeout <= 0) { if (timeout <= 0) {
errno = EAGAIN; errno = EAGAIN;
EXIT_MUTEX ();
return -1; return -1;
} }
} }
} }
extract_flags (msg_); extract_flags (msg_);
EXIT_MUTEX ();
return 0; return 0;
} }
int zmq::socket_base_t::close () int zmq::socket_base_t::close ()
{ {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
// Remove all existing signalers for thread safe sockets // Remove all existing signalers for thread safe sockets
if (thread_safe) if (thread_safe)
...@@ -1357,7 +1263,6 @@ int zmq::socket_base_t::close () ...@@ -1357,7 +1263,6 @@ int zmq::socket_base_t::close ()
// Mark the socket as dead // Mark the socket as dead
tag = 0xdeadbeef; tag = 0xdeadbeef;
EXIT_MUTEX ();
// Transfer the ownership of the socket from this application thread // Transfer the ownership of the socket from this application thread
// to the reaper thread which will take care of the rest of shutdown // to the reaper thread which will take care of the rest of shutdown
...@@ -1387,7 +1292,7 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) ...@@ -1387,7 +1292,7 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
if (!thread_safe) if (!thread_safe)
fd = ((mailbox_t*)mailbox)->get_fd(); fd = ((mailbox_t*)mailbox)->get_fd();
else { else {
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
reaper_signaler = new signaler_t(); reaper_signaler = new signaler_t();
...@@ -1398,7 +1303,6 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) ...@@ -1398,7 +1303,6 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_)
// Send a signal to make sure reaper handle existing commands // Send a signal to make sure reaper handle existing commands
reaper_signaler->send(); reaper_signaler->send();
EXIT_MUTEX ();
} }
handle = poller->add_fd (fd, this); handle = poller->add_fd (fd, this);
...@@ -1581,14 +1485,13 @@ void zmq::socket_base_t::in_event () ...@@ -1581,14 +1485,13 @@ void zmq::socket_base_t::in_event ()
// of the reaper thread. Process any commands from other threads/sockets // of the reaper thread. Process any commands from other threads/sockets
// that may be available at the moment. Ultimately, the socket will // that may be available at the moment. Ultimately, the socket will
// be destroyed. // be destroyed.
ENTER_MUTEX (); scoped_optional_lock_t sync_lock(thread_safe ? &sync : 0);
// If the socket is thread safe we need to unsignal the reaper signaler // If the socket is thread safe we need to unsignal the reaper signaler
if (thread_safe) if (thread_safe)
reaper_signaler->recv(); reaper_signaler->recv();
process_commands (0, false); process_commands (0, false);
EXIT_MUTEX();
check_destroy(); check_destroy();
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment