Commit 835e893e authored by Martin Sustrik's avatar Martin Sustrik

dispatcher_t class renamed to ctx_t

parent 10f5334f
......@@ -55,9 +55,9 @@ libzmq_la_SOURCES = app_thread.hpp \
blob.hpp \
command.hpp \
config.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
dispatcher.hpp \
downstream.hpp \
encoder.hpp \
epoll.hpp \
......@@ -122,8 +122,8 @@ libzmq_la_SOURCES = app_thread.hpp \
zmq_listener.hpp \
app_thread.cpp \
command.cpp \
ctx.cpp \
devpoll.cpp \
dispatcher.cpp \
downstream.cpp \
epoll.cpp \
err.cpp \
......
......@@ -34,7 +34,7 @@
#endif
#include "app_thread.hpp"
#include "dispatcher.hpp"
#include "ctx.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "config.hpp"
......@@ -57,9 +57,9 @@
#define ZMQ_DELAY_COMMANDS
#endif
zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_,
zmq::app_thread_t::app_thread_t (ctx_t *ctx_,
uint32_t thread_slot_) :
object_t (dispatcher_, thread_slot_),
object_t (ctx_, thread_slot_),
last_processing_time (0),
terminated (false)
{
......@@ -163,7 +163,7 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
break;
default:
if (sockets.empty ())
get_dispatcher ()->no_sockets (this);
get_ctx ()->no_sockets (this);
errno = EINVAL;
return NULL;
}
......@@ -178,7 +178,7 @@ void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
{
sockets.erase (socket_);
if (sockets.empty ())
get_dispatcher ()->no_sockets (this);
get_ctx ()->no_sockets (this);
}
void zmq::app_thread_t::process_stop ()
......
......@@ -34,7 +34,7 @@ namespace zmq
{
public:
app_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
~app_thread_t ();
......
......@@ -22,7 +22,7 @@
#include "../include/zmq.h"
#include "dispatcher.hpp"
#include "ctx.hpp"
#include "socket_base.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
......@@ -34,7 +34,7 @@
#include "windows.h"
#endif
zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) :
zmq::ctx_t::ctx_t (uint32_t io_threads_) :
sockets (0),
terminated (false)
{
......@@ -65,7 +65,7 @@ zmq::dispatcher_t::dispatcher_t (uint32_t io_threads_) :
}
}
int zmq::dispatcher_t::term ()
int zmq::ctx_t::term ()
{
// First send stop command to application threads so that any
// blocking calls are interrupted.
......@@ -86,7 +86,7 @@ int zmq::dispatcher_t::term ()
return 0;
}
zmq::dispatcher_t::~dispatcher_t ()
zmq::ctx_t::~ctx_t ()
{
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up.
......@@ -117,7 +117,7 @@ zmq::dispatcher_t::~dispatcher_t ()
#endif
}
zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
app_threads_sync.lock ();
......@@ -183,7 +183,7 @@ zmq::socket_base_t *zmq::dispatcher_t::create_socket (int type_)
return s;
}
void zmq::dispatcher_t::destroy_socket ()
void zmq::ctx_t::destroy_socket ()
{
// If zmq_term was already called and there are no more sockets,
// terminate the whole 0MQ infrastructure.
......@@ -197,7 +197,7 @@ void zmq::dispatcher_t::destroy_socket ()
delete this;
}
void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
void zmq::ctx_t::no_sockets (app_thread_t *thread_)
{
app_threads_sync.lock ();
app_threads_t::size_type i;
......@@ -210,19 +210,19 @@ void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
app_threads_sync.unlock ();
}
void zmq::dispatcher_t::send_command (uint32_t destination_,
void zmq::ctx_t::send_command (uint32_t destination_,
const command_t &command_)
{
signalers [destination_]->send (command_);
}
bool zmq::dispatcher_t::recv_command (uint32_t thread_slot_,
bool zmq::ctx_t::recv_command (uint32_t thread_slot_,
command_t *command_, bool block_)
{
return signalers [thread_slot_]->recv (command_, block_);
}
zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_)
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{
// Find the I/O thread with minimum load.
zmq_assert (io_threads.size () > 0);
......@@ -241,7 +241,7 @@ zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t affinity_)
return io_threads [result];
}
void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_)
void zmq::ctx_t::register_pipe (class pipe_t *pipe_)
{
pipes_sync.lock ();
bool inserted = pipes.insert (pipe_).second;
......@@ -249,7 +249,7 @@ void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_)
pipes_sync.unlock ();
}
void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
void zmq::ctx_t::unregister_pipe (class pipe_t *pipe_)
{
pipes_sync.lock ();
pipes_t::size_type erased = pipes.erase (pipe_);
......@@ -257,7 +257,7 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
pipes_sync.unlock ();
}
int zmq::dispatcher_t::register_endpoint (const char *addr_,
int zmq::ctx_t::register_endpoint (const char *addr_,
socket_base_t *socket_)
{
endpoints_sync.lock ();
......@@ -274,7 +274,7 @@ int zmq::dispatcher_t::register_endpoint (const char *addr_,
return 0;
}
void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_)
void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
{
endpoints_sync.lock ();
......@@ -292,7 +292,7 @@ void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_)
endpoints_sync.unlock ();
}
zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_)
zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
{
endpoints_sync.lock ();
......
......@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__
#define __ZMQ_DISPATCHER_HPP_INCLUDED__
#ifndef __ZMQ_CTX_HPP_INCLUDED__
#define __ZMQ_CTX_HPP_INCLUDED__
#include <vector>
#include <set>
......@@ -35,13 +35,16 @@
namespace zmq
{
class dispatcher_t
// Context object encapsulates all the global state associated with
// the library.
class ctx_t
{
public:
// Create the dispatcher object. The argument specifies the size
// Create the context object. The argument specifies the size
// of I/O thread pool to create.
dispatcher_t (uint32_t io_threads_);
ctx_t (uint32_t io_threads_);
// This function is called when user invokes zmq_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
......@@ -70,7 +73,7 @@ namespace zmq
// Taskset specifies which I/O threads are eligible (0 = all).
class io_thread_t *choose_io_thread (uint64_t taskset_);
// All pipes are registered with the dispatcher so that even the
// All pipes are registered with the context so that even the
// orphaned pipes can be deallocated on the terminal shutdown.
void register_pipe (class pipe_t *pipe_);
void unregister_pipe (class pipe_t *pipe_);
......@@ -82,7 +85,7 @@ namespace zmq
private:
~dispatcher_t ();
~ctx_t ();
struct app_thread_info_t
{
......@@ -116,7 +119,7 @@ namespace zmq
// As pipes may reside in orphaned state in particular moments
// of the pipe shutdown process, i.e. neither pipe reader nor
// pipe writer hold reference to the pipe, we have to hold references
// to all pipes in dispatcher so that we can deallocate them
// to all pipes in context so that we can deallocate them
// during terminal shutdown even though it conincides with the
// pipe being in the orphaned state.
typedef std::set <class pipe_t*> pipes_t;
......@@ -143,8 +146,8 @@ namespace zmq
// Synchronisation of access to the list of inproc endpoints.
mutex_t endpoints_sync;
dispatcher_t (const dispatcher_t&);
void operator = (const dispatcher_t&);
ctx_t (const ctx_t&);
void operator = (const ctx_t&);
};
}
......
......@@ -24,11 +24,11 @@
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
#include "dispatcher.hpp"
#include "ctx.hpp"
zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_,
zmq::io_thread_t::io_thread_t (ctx_t *ctx_,
uint32_t thread_slot_) :
object_t (dispatcher_, thread_slot_)
object_t (ctx_, thread_slot_)
{
poller = new (std::nothrow) poller_t;
zmq_assert (poller);
......
......@@ -38,7 +38,7 @@ namespace zmq
{
public:
io_thread_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
io_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
......
......@@ -20,7 +20,7 @@
#include <string.h>
#include "object.hpp"
#include "dispatcher.hpp"
#include "ctx.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "io_thread.hpp"
......@@ -28,14 +28,14 @@
#include "session.hpp"
#include "socket_base.hpp"
zmq::object_t::object_t (dispatcher_t *dispatcher_, uint32_t thread_slot_) :
dispatcher (dispatcher_),
zmq::object_t::object_t (ctx_t *ctx_, uint32_t thread_slot_) :
ctx (ctx_),
thread_slot (thread_slot_)
{
}
zmq::object_t::object_t (object_t *parent_) :
dispatcher (parent_->dispatcher),
ctx (parent_->ctx),
thread_slot (parent_->thread_slot)
{
}
......@@ -49,9 +49,9 @@ uint32_t zmq::object_t::get_thread_slot ()
return thread_slot;
}
zmq::dispatcher_t *zmq::object_t::get_dispatcher ()
zmq::ctx_t *zmq::object_t::get_ctx ()
{
return dispatcher;
return ctx;
}
void zmq::object_t::process_command (command_t &cmd_)
......@@ -125,32 +125,32 @@ void zmq::object_t::process_command (command_t &cmd_)
void zmq::object_t::register_pipe (class pipe_t *pipe_)
{
dispatcher->register_pipe (pipe_);
ctx->register_pipe (pipe_);
}
void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
{
dispatcher->unregister_pipe (pipe_);
ctx->unregister_pipe (pipe_);
}
int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_)
{
return dispatcher->register_endpoint (addr_, socket_);
return ctx->register_endpoint (addr_, socket_);
}
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
return dispatcher->unregister_endpoints (socket_);
return ctx->unregister_endpoints (socket_);
}
zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)
{
return dispatcher->find_endpoint (addr_);
return ctx->find_endpoint (addr_);
}
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
return dispatcher->choose_io_thread (taskset_);
return ctx->choose_io_thread (taskset_);
}
void zmq::object_t::send_stop ()
......@@ -160,7 +160,7 @@ void zmq::object_t::send_stop ()
command_t cmd;
cmd.destination = this;
cmd.type = command_t::stop;
dispatcher->send_command (thread_slot, cmd);
ctx->send_command (thread_slot, cmd);
}
void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
......@@ -369,6 +369,6 @@ void zmq::object_t::process_seqnum ()
void zmq::object_t::send_command (command_t &cmd_)
{
dispatcher->send_command (cmd_.destination->get_thread_slot (), cmd_);
ctx->send_command (cmd_.destination->get_thread_slot (), cmd_);
}
......@@ -32,15 +32,15 @@ namespace zmq
{
public:
object_t (class dispatcher_t *dispatcher_, uint32_t thread_slot_);
object_t (class ctx_t *ctx_, uint32_t thread_slot_);
object_t (object_t *parent_);
virtual ~object_t ();
uint32_t get_thread_slot ();
dispatcher_t *get_dispatcher ();
ctx_t *get_ctx ();
void process_command (struct command_t &cmd_);
// Allow pipe to access corresponding dispatcher functions.
// Allow pipe to access corresponding context functions.
void register_pipe (class pipe_t *pipe_);
void unregister_pipe (class pipe_t *pipe_);
......@@ -101,8 +101,8 @@ namespace zmq
private:
// Pointer to the root of the infrastructure.
class dispatcher_t *dispatcher;
// Context provides access to the global state.
class ctx_t *ctx;
// Slot ID of the thread the object belongs to.
uint32_t thread_slot;
......
......@@ -25,7 +25,7 @@
#include "socket_base.hpp"
#include "app_thread.hpp"
#include "dispatcher.hpp"
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
......@@ -34,6 +34,7 @@
#include "owned.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "ctx.hpp"
#include "platform.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
......@@ -456,14 +457,14 @@ int zmq::socket_base_t::close ()
// Let the thread know that the socket is no longer available.
app_thread->remove_socket (this);
// Pointer to the dispatcher must be retrieved before the socket is
// Pointer to the context must be retrieved before the socket is
// deallocated. Afterwards it is not available.
dispatcher_t *dispatcher = get_dispatcher ();
ctx_t *ctx = get_ctx ();
// Unregister all inproc endpoints associated with this socket.
// From this point we are sure that inc_seqnum won't be called again
// on this object.
dispatcher->unregister_endpoints (this);
ctx->unregister_endpoints (this);
// Wait till all undelivered commands are delivered. This should happen
// very quickly. There's no way to wait here for extensive period of time.
......@@ -503,7 +504,7 @@ int zmq::socket_base_t::close ()
// This function must be called after the socket is completely deallocated
// as it may cause termination of the whole 0MQ infrastructure.
dispatcher->destroy_socket ();
ctx->destroy_socket ();
return 0;
}
......
......@@ -29,11 +29,11 @@
#include "streamer.hpp"
#include "socket_base.hpp"
#include "app_thread.hpp"
#include "dispatcher.hpp"
#include "msg_content.hpp"
#include "platform.hpp"
#include "stdint.hpp"
#include "config.hpp"
#include "ctx.hpp"
#include "err.hpp"
#include "fd.hpp"
......@@ -263,15 +263,14 @@ void *zmq_init (int /*app_threads_*/, int io_threads_, int /*flags_*/)
#endif
// Create 0MQ context.
zmq::dispatcher_t *dispatcher = new (std::nothrow) zmq::dispatcher_t (
(uint32_t) io_threads_);
zmq_assert (dispatcher);
return (void*) dispatcher;
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
zmq_assert (ctx);
return (void*) ctx;
}
int zmq_term (void *dispatcher_)
int zmq_term (void *ctx_)
{
int rc = ((zmq::dispatcher_t*) dispatcher_)->term ();
int rc = ((zmq::ctx_t*) ctx_)->term ();
int en = errno;
#if defined ZMQ_HAVE_OPENPGM
......@@ -284,9 +283,9 @@ int zmq_term (void *dispatcher_)
return rc;
}
void *zmq_socket (void *dispatcher_, int type_)
void *zmq_socket (void *ctx_, int type_)
{
return (void*) (((zmq::dispatcher_t*) dispatcher_)->create_socket (type_));
return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_));
}
int zmq_close (void *s_)
......
......@@ -54,7 +54,7 @@ bool zmq::zmq_encoder_t::message_ready ()
// Destroy content of the old message.
zmq_msg_close(&in_progress);
// Read new message from the dispatcher. If there is none, return false.
// Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment