Commit 3b483a8d authored by Ian Barber's avatar Ian Barber

Merge pull request #285 from hurtonm/ctx_patches

Ctx patches
parents 84707d3f 151d0717
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
*/ */
#include "platform.hpp" #include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp" #include "windows.hpp"
#else #else
#include <unistd.h> #include <unistd.h>
...@@ -245,7 +245,7 @@ void zmq::ctx_t::destroy_socket (class socket_base_t *socket_) ...@@ -245,7 +245,7 @@ void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
{ {
slot_sync.lock (); slot_sync.lock ();
// Free the associared thread slot. // Free the associated thread slot.
uint32_t tid = socket_->get_tid (); uint32_t tid = socket_->get_tid ();
empty_slots.push_back (tid); empty_slots.push_back (tid);
slots [tid] = NULL; slots [tid] = NULL;
...@@ -278,18 +278,17 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) ...@@ -278,18 +278,17 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
// Find the I/O thread with minimum load. // Find the I/O thread with minimum load.
int min_load = -1; int min_load = -1;
io_threads_t::size_type result = 0; io_thread_t *selected_io_thread = NULL;
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) { for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
if (!affinity_ || (affinity_ & (uint64_t (1) << i))) { if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
int load = io_threads [i]->get_load (); int load = io_threads [i]->get_load ();
if (min_load == -1 || load < min_load) { if (selected_io_thread == NULL || load < min_load) {
min_load = load; min_load = load;
result = i; selected_io_thread = io_threads [i];
} }
} }
} }
zmq_assert (min_load != -1); return selected_io_thread;
return io_threads [result];
} }
int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_) int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
...@@ -298,13 +297,14 @@ int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_) ...@@ -298,13 +297,14 @@ int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
bool inserted = endpoints.insert (endpoints_t::value_type ( bool inserted = endpoints.insert (endpoints_t::value_type (
std::string (addr_), endpoint_)).second; std::string (addr_), endpoint_)).second;
endpoints_sync.unlock ();
if (!inserted) { if (!inserted) {
errno = EADDRINUSE; errno = EADDRINUSE;
endpoints_sync.unlock ();
return -1; return -1;
} }
endpoints_sync.unlock ();
return 0; return 0;
} }
...@@ -337,16 +337,16 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) ...@@ -337,16 +337,16 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
endpoint_t empty = {NULL, options_t()}; endpoint_t empty = {NULL, options_t()};
return empty; return empty;
} }
endpoint_t *endpoint = &it->second; endpoint_t endpoint = it->second;
// Increment the command sequence number of the peer so that it won't // Increment the command sequence number of the peer so that it won't
// get deallocated until "bind" command is issued by the caller. // get deallocated until "bind" command is issued by the caller.
// The subsequent 'bind' has to be called with inc_seqnum parameter // The subsequent 'bind' has to be called with inc_seqnum parameter
// set to false, so that the seqnum isn't incremented twice. // set to false, so that the seqnum isn't incremented twice.
endpoint->socket->inc_seqnum (); endpoint.socket->inc_seqnum ();
endpoints_sync.unlock (); endpoints_sync.unlock ();
return *endpoint; return endpoint;
} }
// The last used socket ID, or 0 if no socket was used so far. Note that this // The last used socket ID, or 0 if no socket was used so far. Note that this
......
...@@ -48,7 +48,7 @@ namespace zmq ...@@ -48,7 +48,7 @@ namespace zmq
// for synchronisation, handshaking or similar. // for synchronisation, handshaking or similar.
struct endpoint_t struct endpoint_t
{ {
class socket_base_t *socket; socket_base_t *socket;
options_t options; options_t options;
}; };
...@@ -59,7 +59,7 @@ namespace zmq ...@@ -59,7 +59,7 @@ namespace zmq
{ {
public: public:
// Create the context object // Create the context object.
ctx_t (); ctx_t ();
// Returns false if object is not a context. // Returns false if object is not a context.
...@@ -71,7 +71,7 @@ namespace zmq ...@@ -71,7 +71,7 @@ namespace zmq
// after the last one is closed. // after the last one is closed.
int terminate (); int terminate ();
// Set and set context properties // Set and get context properties.
int set (int option_, int optval_); int set (int option_, int optval_);
int get (int option_); int get (int option_);
...@@ -84,7 +84,7 @@ namespace zmq ...@@ -84,7 +84,7 @@ namespace zmq
// Returns the I/O thread that is the least busy at the moment. // Returns the I/O thread that is the least busy at the moment.
// Affinity specifies which I/O threads are eligible (0 = all). // Affinity specifies which I/O threads are eligible (0 = all).
// Returns NULL is no I/O thread is available. // Returns NULL if no I/O thread is available.
zmq::io_thread_t *choose_io_thread (uint64_t affinity_); zmq::io_thread_t *choose_io_thread (uint64_t affinity_);
// Returns reaper thread object. // Returns reaper thread object.
...@@ -117,8 +117,8 @@ namespace zmq ...@@ -117,8 +117,8 @@ namespace zmq
typedef std::vector <uint32_t> emtpy_slots_t; typedef std::vector <uint32_t> emtpy_slots_t;
emtpy_slots_t empty_slots; emtpy_slots_t empty_slots;
// If true, zmq_init has been called but no socket have been created // If true, zmq_init has been called but no socket has been created
// yes. Launching of I/O threads is delayed. // yet. Launching of I/O threads is delayed.
bool starting; bool starting;
// If true, zmq_term was already called. // If true, zmq_term was already called.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment