Commit 9426bd5b authored by Pieter Hintjens's avatar Pieter Hintjens

Merge branch 'issue-336'

parents 87fa8e78 bdefa181
...@@ -41,8 +41,6 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : ...@@ -41,8 +41,6 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
tag (0xbadcafe0), tag (0xbadcafe0),
terminating (false) terminating (false)
{ {
int rc;
// Initialise the array of mailboxes. Additional three slots are for // Initialise the array of mailboxes. Additional three slots are for
// internal log socket and the zmq_term thread the reaper thread. // internal log socket and the zmq_term thread the reaper thread.
slot_count = max_sockets + io_threads_ + 3; slot_count = max_sockets + io_threads_ + 3;
...@@ -73,12 +71,6 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : ...@@ -73,12 +71,6 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
empty_slots.push_back (i); empty_slots.push_back (i);
slots [i] = NULL; slots [i] = NULL;
} }
// Create the logging infrastructure.
log_socket = create_socket (ZMQ_PUB);
zmq_assert (log_socket);
rc = log_socket->bind ("sys://log");
zmq_assert (rc == 0);
} }
bool zmq::ctx_t::check_tag () bool zmq::ctx_t::check_tag ()
...@@ -122,14 +114,6 @@ int zmq::ctx_t::terminate () ...@@ -122,14 +114,6 @@ int zmq::ctx_t::terminate ()
// First attempt to terminate the context. // First attempt to terminate the context.
if (!restarted) { if (!restarted) {
// Close the logging infrastructure.
log_sync.lock ();
int rc = log_socket->close ();
zmq_assert (rc == 0);
log_socket = NULL;
log_sync.unlock ();
// First send stop command to sockets so that any blocking calls can be // First send stop command to sockets so that any blocking calls can be
// interrupted. If there are no sockets we can ask reaper thread to stop. // interrupted. If there are no sockets we can ask reaper thread to stop.
slot_sync.lock (); slot_sync.lock ();
...@@ -302,24 +286,3 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) ...@@ -302,24 +286,3 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
endpoints_sync.unlock (); endpoints_sync.unlock ();
return *endpoint; return *endpoint;
} }
void zmq::ctx_t::log (const char *format_, va_list args_)
{
// Create the log message.
msg_t msg;
int rc = msg.init_size (strlen (format_) + 1);
errno_assert (rc == 0);
memcpy (msg.data (), format_, msg.size ());
// At this point we migrate the log socket to the current thread.
// We rely on mutex for executing the memory barrier.
log_sync.lock ();
if (log_socket)
log_socket->send (&msg, 0);
log_sync.unlock ();
rc = msg.close ();
errno_assert (rc == 0);
}
...@@ -91,9 +91,6 @@ namespace zmq ...@@ -91,9 +91,6 @@ namespace zmq
void unregister_endpoints (zmq::socket_base_t *socket_); void unregister_endpoints (zmq::socket_base_t *socket_);
endpoint_t find_endpoint (const char *addr_); endpoint_t find_endpoint (const char *addr_);
// Logging.
void log (const char *format_, va_list args_);
enum { enum {
term_tid = 0, term_tid = 0,
reaper_tid = 1 reaper_tid = 1
...@@ -146,11 +143,6 @@ namespace zmq ...@@ -146,11 +143,6 @@ namespace zmq
// Synchronisation of access to the list of inproc endpoints. // Synchronisation of access to the list of inproc endpoints.
mutex_t endpoints_sync; mutex_t endpoints_sync;
// PUB socket for logging. The socket is shared among all the threads,
// thus it is synchronised by a mutex.
zmq::socket_base_t *log_socket;
mutex_t log_sync;
ctx_t (const ctx_t&); ctx_t (const ctx_t&);
const ctx_t &operator = (const ctx_t&); const ctx_t &operator = (const ctx_t&);
}; };
......
...@@ -149,14 +149,6 @@ void zmq::object_t::destroy_socket (socket_base_t *socket_) ...@@ -149,14 +149,6 @@ void zmq::object_t::destroy_socket (socket_base_t *socket_)
ctx->destroy_socket (socket_); ctx->destroy_socket (socket_);
} }
void zmq::object_t::log (const char *format_, ...)
{
va_list args;
va_start (args, format_);
ctx->log (format_, args);
va_end (args);
}
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_) zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
{ {
return ctx->choose_io_thread (affinity_); return ctx->choose_io_thread (affinity_);
......
...@@ -179,7 +179,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) ...@@ -179,7 +179,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{ {
// First check out whether the protcol is something we are aware of. // First check out whether the protcol is something we are aware of.
if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" &&
protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys") { protocol_ != "pgm" && protocol_ != "epgm") {
errno = EPROTONOSUPPORT; errno = EPROTONOSUPPORT;
return -1; return -1;
} }
...@@ -318,7 +318,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -318,7 +318,7 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc != 0) if (rc != 0)
return -1; return -1;
if (protocol == "inproc" || protocol == "sys") { if (protocol == "inproc") {
endpoint_t endpoint = {this, options}; endpoint_t endpoint = {this, options};
return register_endpoint (addr_, endpoint); return register_endpoint (addr_, endpoint);
} }
...@@ -392,7 +392,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -392,7 +392,7 @@ int zmq::socket_base_t::connect (const char *addr_)
if (rc != 0) if (rc != 0)
return -1; return -1;
if (protocol == "inproc" || protocol == "sys") { if (protocol == "inproc") {
// TODO: inproc connect is specific with respect to creating pipes // TODO: inproc connect is specific with respect to creating pipes
// as there's no 'reconnect' functionality implemented. Once that // as there's no 'reconnect' functionality implemented. Once that
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment