Commit e5599de6 authored by Reza Ebrahimi's avatar Reza Ebrahimi Committed by Luca Boccassi

Convert manual (locking and unlocking) mutexes to scoped mutexes for the case of…

Convert manual (locking and unlocking) mutexes to scoped mutexes for the case of unlocking mutex even if the protected operation throws an exception (#2071)
parent bbece557
...@@ -84,16 +84,18 @@ static zmq::mutex_t compatible_get_tick_count64_mutex; ...@@ -84,16 +84,18 @@ static zmq::mutex_t compatible_get_tick_count64_mutex;
ULONGLONG compatible_get_tick_count64() ULONGLONG compatible_get_tick_count64()
{ {
compatible_get_tick_count64_mutex.lock(); zmq::scoped_lock_t locker(compatible_get_tick_count64_mutex);
static DWORD s_wrap = 0; static DWORD s_wrap = 0;
static DWORD s_last_tick = 0; static DWORD s_last_tick = 0;
const DWORD current_tick = ::GetTickCount(); const DWORD current_tick = ::GetTickCount();
if (current_tick < s_last_tick) if (current_tick < s_last_tick)
++s_wrap; ++s_wrap;
s_last_tick = current_tick; s_last_tick = current_tick;
const ULONGLONG result = (static_cast<ULONGLONG>(s_wrap) << 32) + static_cast<ULONGLONG>(current_tick); const ULONGLONG result = (static_cast<ULONGLONG>(s_wrap) << 32) + static_cast<ULONGLONG>(current_tick);
compatible_get_tick_count64_mutex.unlock();
return result; return result;
} }
......
...@@ -91,7 +91,7 @@ zmq::ctx_t::ctx_t () : ...@@ -91,7 +91,7 @@ zmq::ctx_t::ctx_t () :
vmci_family = -1; vmci_family = -1;
#endif #endif
crypto_sync.lock (); scoped_lock_t locker(crypto_sync);
#if defined (ZMQ_USE_TWEETNACL) #if defined (ZMQ_USE_TWEETNACL)
// allow opening of /dev/urandom // allow opening of /dev/urandom
unsigned char tmpbytes[4]; unsigned char tmpbytes[4];
...@@ -100,7 +100,6 @@ zmq::ctx_t::ctx_t () : ...@@ -100,7 +100,6 @@ zmq::ctx_t::ctx_t () :
int rc = sodium_init (); int rc = sodium_init ();
zmq_assert (rc != -1); zmq_assert (rc != -1);
#endif #endif
crypto_sync.unlock ();
} }
bool zmq::ctx_t::check_tag () bool zmq::ctx_t::check_tag ()
...@@ -218,7 +217,8 @@ int zmq::ctx_t::terminate () ...@@ -218,7 +217,8 @@ int zmq::ctx_t::terminate ()
int zmq::ctx_t::shutdown () int zmq::ctx_t::shutdown ()
{ {
slot_sync.lock (); scoped_lock_t locker(slot_sync);
if (!starting && !terminating) { if (!starting && !terminating) {
terminating = true; terminating = true;
...@@ -230,7 +230,6 @@ int zmq::ctx_t::shutdown () ...@@ -230,7 +230,6 @@ int zmq::ctx_t::shutdown ()
if (sockets.empty ()) if (sockets.empty ())
reaper->stop (); reaper->stop ();
} }
slot_sync.unlock ();
return 0; return 0;
} }
...@@ -240,45 +239,38 @@ int zmq::ctx_t::set (int option_, int optval_) ...@@ -240,45 +239,38 @@ int zmq::ctx_t::set (int option_, int optval_)
int rc = 0; int rc = 0;
if (option_ == ZMQ_MAX_SOCKETS if (option_ == ZMQ_MAX_SOCKETS
&& optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) { && optval_ >= 1 && optval_ == clipped_maxsocket (optval_)) {
opt_sync.lock (); scoped_lock_t locker(opt_sync);
max_sockets = optval_; max_sockets = optval_;
opt_sync.unlock ();
} }
else else
if (option_ == ZMQ_IO_THREADS && optval_ >= 0) { if (option_ == ZMQ_IO_THREADS && optval_ >= 0) {
opt_sync.lock (); scoped_lock_t locker(opt_sync);
io_thread_count = optval_; io_thread_count = optval_;
opt_sync.unlock ();
} }
else else
if (option_ == ZMQ_IPV6 && optval_ >= 0) { if (option_ == ZMQ_IPV6 && optval_ >= 0) {
opt_sync.lock (); scoped_lock_t locker(opt_sync);
ipv6 = (optval_ != 0); ipv6 = (optval_ != 0);
opt_sync.unlock ();
} }
else else
if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) { if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) {
opt_sync.lock(); scoped_lock_t locker(opt_sync);
thread_priority = optval_; thread_priority = optval_;
opt_sync.unlock ();
} }
else else
if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) { if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) {
opt_sync.lock(); scoped_lock_t locker(opt_sync);
thread_sched_policy = optval_; thread_sched_policy = optval_;
opt_sync.unlock ();
} }
else else
if (option_ == ZMQ_BLOCKY && optval_ >= 0) { if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
opt_sync.lock (); scoped_lock_t locker(opt_sync);
blocky = (optval_ != 0); blocky = (optval_ != 0);
opt_sync.unlock ();
} }
else else
if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) { if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
opt_sync.lock (); scoped_lock_t locker(opt_sync);
max_msgsz = optval_ < INT_MAX? optval_: INT_MAX; max_msgsz = optval_ < INT_MAX? optval_: INT_MAX;
opt_sync.unlock ();
} }
else { else {
errno = EINVAL; errno = EINVAL;
...@@ -316,7 +308,8 @@ int zmq::ctx_t::get (int option_) ...@@ -316,7 +308,8 @@ int zmq::ctx_t::get (int option_)
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{ {
slot_sync.lock (); scoped_lock_t locker(slot_sync);
if (unlikely (starting)) { if (unlikely (starting)) {
starting = false; starting = false;
...@@ -358,14 +351,12 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) ...@@ -358,14 +351,12 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
// Once zmq_ctx_term() was called, we can't create new sockets. // Once zmq_ctx_term() was called, we can't create new sockets.
if (terminating) { if (terminating) {
slot_sync.unlock ();
errno = ETERM; errno = ETERM;
return NULL; return NULL;
} }
// If max_sockets limit was reached, return error. // If max_sockets limit was reached, return error.
if (empty_slots.empty ()) { if (empty_slots.empty ()) {
slot_sync.unlock ();
errno = EMFILE; errno = EMFILE;
return NULL; return NULL;
} }
...@@ -381,19 +372,17 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) ...@@ -381,19 +372,17 @@ zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
socket_base_t *s = socket_base_t::create (type_, this, slot, sid); socket_base_t *s = socket_base_t::create (type_, this, slot, sid);
if (!s) { if (!s) {
empty_slots.push_back (slot); empty_slots.push_back (slot);
slot_sync.unlock ();
return NULL; return NULL;
} }
sockets.push_back (s); sockets.push_back (s);
slots [slot] = s->get_mailbox (); slots [slot] = s->get_mailbox ();
slot_sync.unlock ();
return s; return s;
} }
void zmq::ctx_t::destroy_socket (class socket_base_t *socket_) void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
{ {
slot_sync.lock (); scoped_lock_t locker(slot_sync);
// Free the associated thread slot. // Free the associated thread slot.
uint32_t tid = socket_->get_tid (); uint32_t tid = socket_->get_tid ();
...@@ -407,8 +396,6 @@ void zmq::ctx_t::destroy_socket (class socket_base_t *socket_) ...@@ -407,8 +396,6 @@ void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
// we can ask reaper thread to terminate. // we can ask reaper thread to terminate.
if (terminating && sockets.empty ()) if (terminating && sockets.empty ())
reaper->stop (); reaper->stop ();
slot_sync.unlock ();
} }
zmq::object_t *zmq::ctx_t::get_reaper () zmq::object_t *zmq::ctx_t::get_reaper ()
...@@ -450,13 +437,9 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) ...@@ -450,13 +437,9 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
int zmq::ctx_t::register_endpoint (const char *addr_, int zmq::ctx_t::register_endpoint (const char *addr_,
const endpoint_t &endpoint_) const endpoint_t &endpoint_)
{ {
endpoints_sync.lock (); scoped_lock_t locker(endpoints_sync);
const bool inserted = endpoints.insert (
endpoints_t::value_type (std::string (addr_), endpoint_)).second;
endpoints_sync.unlock ();
const bool inserted = endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_)).second;
if (!inserted) { if (!inserted) {
errno = EADDRINUSE; errno = EADDRINUSE;
return -1; return -1;
...@@ -467,11 +450,10 @@ int zmq::ctx_t::register_endpoint (const char *addr_, ...@@ -467,11 +450,10 @@ int zmq::ctx_t::register_endpoint (const char *addr_,
int zmq::ctx_t::unregister_endpoint ( int zmq::ctx_t::unregister_endpoint (
const std::string &addr_, socket_base_t *socket_) const std::string &addr_, socket_base_t *socket_)
{ {
endpoints_sync.lock (); scoped_lock_t locker(endpoints_sync);
const endpoints_t::iterator it = endpoints.find (addr_); const endpoints_t::iterator it = endpoints.find (addr_);
if (it == endpoints.end () || it->second.socket != socket_) { if (it == endpoints.end () || it->second.socket != socket_) {
endpoints_sync.unlock ();
errno = ENOENT; errno = ENOENT;
return -1; return -1;
} }
...@@ -479,14 +461,12 @@ int zmq::ctx_t::unregister_endpoint ( ...@@ -479,14 +461,12 @@ int zmq::ctx_t::unregister_endpoint (
// Remove endpoint. // Remove endpoint.
endpoints.erase (it); endpoints.erase (it);
endpoints_sync.unlock ();
return 0; return 0;
} }
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
{ {
endpoints_sync.lock (); scoped_lock_t locker(endpoints_sync);
endpoints_t::iterator it = endpoints.begin (); endpoints_t::iterator it = endpoints.begin ();
while (it != endpoints.end ()) { while (it != endpoints.end ()) {
...@@ -498,17 +478,14 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) ...@@ -498,17 +478,14 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
} }
++it; ++it;
} }
endpoints_sync.unlock ();
} }
zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
{ {
endpoints_sync.lock (); scoped_lock_t locker(endpoints_sync);
endpoints_t::iterator it = endpoints.find (addr_); endpoints_t::iterator it = endpoints.find (addr_);
if (it == endpoints.end ()) { if (it == endpoints.end ()) {
endpoints_sync.unlock ();
errno = ECONNREFUSED; errno = ECONNREFUSED;
endpoint_t empty = {NULL, options_t()}; endpoint_t empty = {NULL, options_t()};
return empty; return empty;
...@@ -521,34 +498,30 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) ...@@ -521,34 +498,30 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
// set to false, so that the seqnum isn't incremented twice. // set to false, so that the seqnum isn't incremented twice.
endpoint.socket->inc_seqnum (); endpoint.socket->inc_seqnum ();
endpoints_sync.unlock ();
return endpoint; return endpoint;
} }
void zmq::ctx_t::pend_connection (const std::string &addr_, void zmq::ctx_t::pend_connection (const std::string &addr_,
const endpoint_t &endpoint_, pipe_t **pipes_) const endpoint_t &endpoint_, pipe_t **pipes_)
{ {
const pending_connection_t pending_connection = scoped_lock_t locker(endpoints_sync);
{endpoint_, pipes_ [0], pipes_ [1]};
endpoints_sync.lock (); const pending_connection_t pending_connection = {endpoint_, pipes_ [0], pipes_ [1]};
endpoints_t::iterator it = endpoints.find (addr_); endpoints_t::iterator it = endpoints.find (addr_);
if (it == endpoints.end ()) { if (it == endpoints.end ()) {
// Still no bind. // Still no bind.
endpoint_.socket->inc_seqnum (); endpoint_.socket->inc_seqnum ();
pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection)); pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
} } else {
else // Bind has happened in the mean time, connect directly
// Bind has happened in the mean time, connect directly connect_inproc_sockets(it->second.socket, it->second.options, pending_connection, connect_side);
connect_inproc_sockets (it->second.socket, it->second.options, pending_connection, connect_side); }
endpoints_sync.unlock ();
} }
void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_) void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
{ {
endpoints_sync.lock (); scoped_lock_t locker(endpoints_sync);
std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_); std::pair<pending_connections_t::iterator, pending_connections_t::iterator> pending = pending_connections.equal_range(addr_);
...@@ -556,7 +529,6 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so ...@@ -556,7 +529,6 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so
connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side); connect_inproc_sockets(bind_socket_, endpoints[addr_].options, p->second, bind_side);
pending_connections.erase(pending.first, pending.second); pending_connections.erase(pending.first, pending.second);
endpoints_sync.unlock ();
} }
void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_, void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
...@@ -618,7 +590,7 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_, ...@@ -618,7 +590,7 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
int zmq::ctx_t::get_vmci_socket_family () int zmq::ctx_t::get_vmci_socket_family ()
{ {
vmci_sync.lock (); zmq::scoped_lock_t locker(vmci_sync);
if (vmci_fd == -1) { if (vmci_fd == -1) {
vmci_family = VMCISock_GetAFValueFd (&vmci_fd); vmci_family = VMCISock_GetAFValueFd (&vmci_fd);
...@@ -631,8 +603,6 @@ int zmq::ctx_t::get_vmci_socket_family () ...@@ -631,8 +603,6 @@ int zmq::ctx_t::get_vmci_socket_family ()
} }
} }
vmci_sync.unlock ();
return vmci_family; return vmci_family;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment