Commit ce0972dc authored by Martin Sustrik's avatar Martin Sustrik

context creates an inproc endpoint ('inproc://log') to distribute 0MQ's log messages

parent db73c763
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
#include <new> #include <new>
#include <string.h> #include <string.h>
#include "../include/zmq.h"
#include "ctx.hpp" #include "ctx.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
...@@ -68,6 +66,12 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) : ...@@ -68,6 +66,12 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
empty_slots.push_back (i); empty_slots.push_back (i);
slots [i] = NULL; slots [i] = NULL;
} }
// Create the logging infrastructure.
log_socket = create_socket (ZMQ_PUB);
zmq_assert (log_socket);
int rc = log_socket->bind ("inproc://log");
zmq_assert (rc == 0);
} }
zmq::ctx_t::~ctx_t () zmq::ctx_t::~ctx_t ()
...@@ -104,6 +108,13 @@ int zmq::ctx_t::terminate () ...@@ -104,6 +108,13 @@ int zmq::ctx_t::terminate ()
for (sockets_t::size_type i = 0; i != sockets.size (); i++) for (sockets_t::size_type i = 0; i != sockets.size (); i++)
sockets [i]->stop (); sockets [i]->stop ();
// Close the logging infrastructure.
log_sync.lock ();
int rc = log_socket->close ();
zmq_assert (rc == 0);
log_socket = NULL;
log_sync.unlock ();
// Find out whether there are any open sockets to care about. // Find out whether there are any open sockets to care about.
// If so, sleep till they are closed. Note that we can use // If so, sleep till they are closed. Note that we can use
// no_sockets_notify safely out of the critical section as once set // no_sockets_notify safely out of the critical section as once set
...@@ -287,6 +298,16 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_) ...@@ -287,6 +298,16 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
return endpoint; return endpoint;
} }
void zmq::ctx_t::log (zmq_msg_t *msg_)
{
// At this point we migrate the log socket to the current thread.
// We rely on mutex for executing the memory barrier.
log_sync.lock ();
if (log_socket)
log_socket->send (msg_, 0);
log_sync.unlock ();
}
void zmq::ctx_t::dezombify () void zmq::ctx_t::dezombify ()
{ {
// Try to dezombify each zombie in the list. Note that caller is // Try to dezombify each zombie in the list. Note that caller is
......
...@@ -24,6 +24,8 @@ ...@@ -24,6 +24,8 @@
#include <vector> #include <vector>
#include <string> #include <string>
#include "../include/zmq.h"
#include "signaler.hpp" #include "signaler.hpp"
#include "semaphore.hpp" #include "semaphore.hpp"
#include "ypipe.hpp" #include "ypipe.hpp"
...@@ -74,6 +76,9 @@ namespace zmq ...@@ -74,6 +76,9 @@ namespace zmq
void unregister_endpoints (class socket_base_t *socket_); void unregister_endpoints (class socket_base_t *socket_);
class socket_base_t *find_endpoint (const char *addr_); class socket_base_t *find_endpoint (const char *addr_);
// Logging.
void log (zmq_msg_t *msg_);
private: private:
~ctx_t (); ~ctx_t ();
...@@ -125,6 +130,11 @@ namespace zmq ...@@ -125,6 +130,11 @@ namespace zmq
// Synchronisation of access to the list of inproc endpoints. // Synchronisation of access to the list of inproc endpoints.
mutex_t endpoints_sync; mutex_t endpoints_sync;
// PUB socket for logging. The socket is shared among all the threads,
// thus it is synchronised by a mutex.
class socket_base_t *log_socket;
mutex_t log_sync;
ctx_t (const ctx_t&); ctx_t (const ctx_t&);
void operator = (const ctx_t&); void operator = (const ctx_t&);
}; };
......
...@@ -137,6 +137,11 @@ zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_) ...@@ -137,6 +137,11 @@ zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)
return ctx->find_endpoint (addr_); return ctx->find_endpoint (addr_);
} }
void zmq::object_t::log (zmq_msg_t *msg_)
{
ctx->log (msg_);
}
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{ {
return ctx->choose_io_thread (taskset_); return ctx->choose_io_thread (taskset_);
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
#ifndef __ZMQ_OBJECT_HPP_INCLUDED__ #ifndef __ZMQ_OBJECT_HPP_INCLUDED__
#define __ZMQ_OBJECT_HPP_INCLUDED__ #define __ZMQ_OBJECT_HPP_INCLUDED__
#include "../include/zmq.h"
#include "stdint.hpp" #include "stdint.hpp"
#include "blob.hpp" #include "blob.hpp"
...@@ -48,6 +50,9 @@ namespace zmq ...@@ -48,6 +50,9 @@ namespace zmq
void unregister_endpoints (class socket_base_t *socket_); void unregister_endpoints (class socket_base_t *socket_);
class socket_base_t *find_endpoint (const char *addr_); class socket_base_t *find_endpoint (const char *addr_);
// Logs an message.
void log (zmq_msg_t *msg_);
// Chooses least loaded I/O thread. // Chooses least loaded I/O thread.
class io_thread_t *choose_io_thread (uint64_t taskset_); class io_thread_t *choose_io_thread (uint64_t taskset_);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment