Commit b8b4acef authored by Martin Sustrik's avatar Martin Sustrik

dispatcher renamed to context

parent 43fa72b7
...@@ -8,10 +8,10 @@ libzmq_la_SOURCES = \ ...@@ -8,10 +8,10 @@ libzmq_la_SOURCES = \
command.hpp \ command.hpp \
config.hpp \ config.hpp \
connecter.hpp \ connecter.hpp \
context.hpp \
data_distributor.hpp \ data_distributor.hpp \
decoder.hpp \ decoder.hpp \
devpoll.hpp \ devpoll.hpp \
dispatcher.hpp \
dummy_aggregator.hpp \ dummy_aggregator.hpp \
dummy_distributor.hpp \ dummy_distributor.hpp \
encoder.hpp \ encoder.hpp \
...@@ -70,9 +70,9 @@ libzmq_la_SOURCES = \ ...@@ -70,9 +70,9 @@ libzmq_la_SOURCES = \
zmq_tcp_engine.hpp \ zmq_tcp_engine.hpp \
app_thread.cpp \ app_thread.cpp \
connecter.cpp \ connecter.cpp \
context.cpp \
data_distributor.cpp \ data_distributor.cpp \
devpoll.hpp \ devpoll.hpp \
dispatcher.cpp \
dummy_aggregator.cpp \ dummy_aggregator.cpp \
dummy_distributor.cpp \ dummy_distributor.cpp \
epoll.cpp \ epoll.cpp \
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
#endif #endif
#include "app_thread.hpp" #include "app_thread.hpp"
#include "dispatcher.hpp" #include "context.hpp"
#include "err.hpp" #include "err.hpp"
#include "session.hpp" #include "session.hpp"
#include "pipe.hpp" #include "pipe.hpp"
...@@ -51,8 +51,8 @@ ...@@ -51,8 +51,8 @@
#define ZMQ_DELAY_COMMANDS #define ZMQ_DELAY_COMMANDS
#endif #endif
zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
object_t (dispatcher_, thread_slot_), object_t (context_, thread_slot_),
tid (0), tid (0),
last_processing_time (0) last_processing_time (0)
{ {
...@@ -213,7 +213,7 @@ void zmq::app_thread_t::process_commands (bool block_) ...@@ -213,7 +213,7 @@ void zmq::app_thread_t::process_commands (bool block_)
for (int i = 0; i != thread_slot_count (); i++) { for (int i = 0; i != thread_slot_count (); i++) {
if (signals & (ypollset_t::signals_t (1) << i)) { if (signals & (ypollset_t::signals_t (1) << i)) {
command_t cmd; command_t cmd;
while (dispatcher->read (i, get_thread_slot (), &cmd)) while (context->read (i, get_thread_slot (), &cmd))
cmd.destination->process_command (cmd); cmd.destination->process_command (cmd);
} }
} }
......
...@@ -34,7 +34,7 @@ namespace zmq ...@@ -34,7 +34,7 @@ namespace zmq
{ {
public: public:
app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); app_thread_t (class context_t *context_, int thread_slot_);
// To be called when the whole infrastrucure is being closed. // To be called when the whole infrastrucure is being closed.
void shutdown (); void shutdown ();
...@@ -47,7 +47,7 @@ namespace zmq ...@@ -47,7 +47,7 @@ namespace zmq
struct i_api *create_socket (int type_); struct i_api *create_socket (int type_);
// Nota bene: The following two functions are accessed from different // Nota bene: The following two functions are accessed from different
// threads. The caller (dispatcher) is responsible for synchronisation // threads. The caller (context) is responsible for synchronisation
// of accesses. // of accesses.
// Returns true is current thread is associated with the app thread. // Returns true is current thread is associated with the app thread.
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include "dispatcher.hpp" #include "context.hpp"
#include "app_thread.hpp" #include "app_thread.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "platform.hpp" #include "platform.hpp"
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
#include "windows.h" #include "windows.h"
#endif #endif
zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) zmq::context_t::context_t (int app_threads_, int io_threads_)
{ {
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple // Intialise Windows sockets. Note that WSAStartup can be called multiple
...@@ -72,12 +72,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) ...@@ -72,12 +72,12 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
io_threads [i]->start (); io_threads [i]->start ();
} }
void zmq::dispatcher_t::shutdown () void zmq::context_t::shutdown ()
{ {
delete this; delete this;
} }
zmq::dispatcher_t::~dispatcher_t () zmq::context_t::~context_t ()
{ {
// Ask I/O threads to terminate. // Ask I/O threads to terminate.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
...@@ -110,12 +110,12 @@ zmq::dispatcher_t::~dispatcher_t () ...@@ -110,12 +110,12 @@ zmq::dispatcher_t::~dispatcher_t ()
#endif #endif
} }
int zmq::dispatcher_t::thread_slot_count () int zmq::context_t::thread_slot_count ()
{ {
return signalers.size (); return signalers.size ();
} }
zmq::i_api *zmq::dispatcher_t::create_socket (int type_) zmq::i_api *zmq::context_t::create_socket (int type_)
{ {
threads_sync.lock (); threads_sync.lock ();
app_thread_t *thread = choose_app_thread (); app_thread_t *thread = choose_app_thread ();
...@@ -128,14 +128,14 @@ zmq::i_api *zmq::dispatcher_t::create_socket (int type_) ...@@ -128,14 +128,14 @@ zmq::i_api *zmq::dispatcher_t::create_socket (int type_)
return s; return s;
} }
zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () zmq::app_thread_t *zmq::context_t::choose_app_thread ()
{ {
// Check whether thread ID is already assigned. If so, return it. // Check whether thread ID is already assigned. If so, return it.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
if (app_threads [i]->is_current ()) if (app_threads [i]->is_current ())
return app_threads [i]; return app_threads [i];
// Check whether there's an unused thread slot in the dispatcher. // Check whether there's an unused thread slot in the cotext.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
if (app_threads [i]->make_current ()) if (app_threads [i]->make_current ())
return app_threads [i]; return app_threads [i];
...@@ -145,7 +145,7 @@ zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread () ...@@ -145,7 +145,7 @@ zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
return NULL; return NULL;
} }
zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_) zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_)
{ {
zmq_assert (io_threads.size () > 0); zmq_assert (io_threads.size () > 0);
...@@ -165,7 +165,7 @@ zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_) ...@@ -165,7 +165,7 @@ zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_)
return io_threads [result]; return io_threads [result];
} }
void zmq::dispatcher_t::create_pipe (object_t *reader_parent_, void zmq::context_t::create_pipe (object_t *reader_parent_,
object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
pipe_reader_t **reader_, pipe_writer_t **writer_) pipe_reader_t **reader_, pipe_writer_t **writer_)
{ {
...@@ -191,7 +191,7 @@ void zmq::dispatcher_t::create_pipe (object_t *reader_parent_, ...@@ -191,7 +191,7 @@ void zmq::dispatcher_t::create_pipe (object_t *reader_parent_,
*writer_ = writer; *writer_ = writer;
} }
void zmq::dispatcher_t::destroy_pipe (pipe_t *pipe_) void zmq::context_t::destroy_pipe (pipe_t *pipe_)
{ {
// Remove the pipe from the repository. // Remove the pipe from the repository.
pipe_info_t info; pipe_info_t info;
...@@ -209,7 +209,7 @@ void zmq::dispatcher_t::destroy_pipe (pipe_t *pipe_) ...@@ -209,7 +209,7 @@ void zmq::dispatcher_t::destroy_pipe (pipe_t *pipe_)
delete info.writer; delete info.writer;
} }
int zmq::dispatcher_t::register_inproc_endpoint (const char *endpoint_, int zmq::context_t::register_inproc_endpoint (const char *endpoint_,
session_t *session_) session_t *session_)
{ {
inproc_endpoint_sync.lock (); inproc_endpoint_sync.lock ();
...@@ -227,7 +227,7 @@ int zmq::dispatcher_t::register_inproc_endpoint (const char *endpoint_, ...@@ -227,7 +227,7 @@ int zmq::dispatcher_t::register_inproc_endpoint (const char *endpoint_,
return 0; return 0;
} }
zmq::object_t *zmq::dispatcher_t::get_inproc_endpoint (const char *endpoint_) zmq::object_t *zmq::context_t::get_inproc_endpoint (const char *endpoint_)
{ {
inproc_endpoint_sync.lock (); inproc_endpoint_sync.lock ();
inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_); inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_);
...@@ -245,7 +245,7 @@ zmq::object_t *zmq::dispatcher_t::get_inproc_endpoint (const char *endpoint_) ...@@ -245,7 +245,7 @@ zmq::object_t *zmq::dispatcher_t::get_inproc_endpoint (const char *endpoint_)
return session; return session;
} }
void zmq::dispatcher_t::unregister_inproc_endpoints (session_t *session_) void zmq::context_t::unregister_inproc_endpoints (session_t *session_)
{ {
inproc_endpoint_sync.lock (); inproc_endpoint_sync.lock ();
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__ #ifndef __ZMQ_CONTEXT_HPP_INCLUDED__
#define __ZMQ_DISPATCHER_HPP_INCLUDED__ #define __ZMQ_CONTEXT_HPP_INCLUDED__
#include <vector> #include <vector>
#include <map> #include <map>
...@@ -37,19 +37,19 @@ namespace zmq ...@@ -37,19 +37,19 @@ namespace zmq
// Dispatcher implements bidirectional thread-safe passing of commands // Dispatcher implements bidirectional thread-safe passing of commands
// between N threads. It consists of a ypipes to pass commands and // between N threads. It consists of a ypipes to pass commands and
// signalers to wake up the receiver thread when new commands are // signalers to wake up the receiver thread when new commands are
// available. Note that dispatcher is inefficient for passing messages // available. Note that context is inefficient for passing messages
// within a thread (sender thread = receiver thread). The optimisation is // within a thread (sender thread = receiver thread). The optimisation is
// not part of the class and should be implemented by individual threads // not part of the class and should be implemented by individual threads
// (presumably by calling the command handling function directly). // (presumably by calling the command handling function directly).
class dispatcher_t class context_t
{ {
public: public:
// Create the dispatcher object. Matrix of pipes to communicate between // Create the context object. Matrix of pipes to communicate between
// each socket and each I/O thread is created along with appropriate // each socket and each I/O thread is created along with appropriate
// signalers. // signalers.
dispatcher_t (int app_threads_, int io_threads_); context_t (int app_threads_, int io_threads_);
// To be called to terminate the whole infrastructure (zmq_term). // To be called to terminate the whole infrastructure (zmq_term).
void shutdown (); void shutdown ();
...@@ -57,12 +57,12 @@ namespace zmq ...@@ -57,12 +57,12 @@ namespace zmq
// Create a socket engine. // Create a socket engine.
struct i_api *create_socket (int type_); struct i_api *create_socket (int type_);
// Returns number of thread slots in the dispatcher. To be used by // Returns number of thread slots in the context. To be used by
// individual threads to find out how many distinct signals can be // individual threads to find out how many distinct signals can be
// received. // received.
int thread_slot_count (); int thread_slot_count ();
// Write command to the dispatcher. // Send command from the source to the destination.
inline void write (int source_, int destination_, inline void write (int source_, int destination_,
const command_t &command_) const command_t &command_)
{ {
...@@ -73,7 +73,7 @@ namespace zmq ...@@ -73,7 +73,7 @@ namespace zmq
signalers [destination_]->signal (source_); signalers [destination_]->signal (source_);
} }
// Read command from the dispatcher. Returns false if there is no // Receive command from the source. Returns false if there is no
// command available. // command available.
inline bool read (int source_, int destination_, command_t *command_) inline bool read (int source_, int destination_, command_t *command_)
{ {
...@@ -110,7 +110,7 @@ namespace zmq ...@@ -110,7 +110,7 @@ namespace zmq
private: private:
// Clean-up. // Clean-up.
~dispatcher_t (); ~context_t ();
// Returns the app thread associated with the current thread. // Returns the app thread associated with the current thread.
// NULL if we are out of app thread slots. // NULL if we are out of app thread slots.
...@@ -160,8 +160,8 @@ namespace zmq ...@@ -160,8 +160,8 @@ namespace zmq
// of inproc endpoints. // of inproc endpoints.
mutex_t inproc_endpoint_sync; mutex_t inproc_endpoint_sync;
dispatcher_t (const dispatcher_t&); context_t (const context_t&);
void operator = (const dispatcher_t&); void operator = (const context_t&);
}; };
} }
......
...@@ -29,13 +29,13 @@ ...@@ -29,13 +29,13 @@
#include "select.hpp" #include "select.hpp"
#include "devpoll.hpp" #include "devpoll.hpp"
#include "kqueue.hpp" #include "kqueue.hpp"
#include "dispatcher.hpp" #include "context.hpp"
#include "session.hpp" #include "session.hpp"
#include "simple_semaphore.hpp" #include "simple_semaphore.hpp"
#include "session.hpp" #include "session.hpp"
zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) : zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) :
object_t (dispatcher_, thread_slot_) object_t (context_, thread_slot_)
{ {
#if defined ZMQ_FORCE_SELECT #if defined ZMQ_FORCE_SELECT
poller = new select_t; poller = new select_t;
...@@ -131,7 +131,7 @@ void zmq::io_thread_t::in_event () ...@@ -131,7 +131,7 @@ void zmq::io_thread_t::in_event ()
// Read all the commands from particular thread. // Read all the commands from particular thread.
command_t cmd; command_t cmd;
while (dispatcher->read (source_thread_slot, thread_slot, &cmd)) while (context->read (source_thread_slot, thread_slot, &cmd))
cmd.destination->process_command (cmd); cmd.destination->process_command (cmd);
} }
} }
......
...@@ -38,7 +38,7 @@ namespace zmq ...@@ -38,7 +38,7 @@ namespace zmq
{ {
public: public:
io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_); io_thread_t (class context_t *context_, int thread_slot_);
// Launch the physical thread. // Launch the physical thread.
void start (); void start ();
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
*/ */
#include "object.hpp" #include "object.hpp"
#include "dispatcher.hpp" #include "context.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe_reader.hpp" #include "pipe_reader.hpp"
#include "pipe_writer.hpp" #include "pipe_writer.hpp"
...@@ -27,14 +27,14 @@ ...@@ -27,14 +27,14 @@
#include "simple_semaphore.hpp" #include "simple_semaphore.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) : zmq::object_t::object_t (context_t *context_, int thread_slot_) :
dispatcher (dispatcher_), context (context_),
thread_slot (thread_slot_) thread_slot (thread_slot_)
{ {
} }
zmq::object_t::object_t (object_t *parent_) : zmq::object_t::object_t (object_t *parent_) :
dispatcher (parent_->dispatcher), context (parent_->context),
thread_slot (parent_->thread_slot) thread_slot (parent_->thread_slot)
{ {
} }
...@@ -45,7 +45,7 @@ zmq::object_t::~object_t () ...@@ -45,7 +45,7 @@ zmq::object_t::~object_t ()
int zmq::object_t::thread_slot_count () int zmq::object_t::thread_slot_count ()
{ {
return dispatcher->thread_slot_count (); return context->thread_slot_count ();
} }
int zmq::object_t::get_thread_slot () int zmq::object_t::get_thread_slot ()
...@@ -107,34 +107,34 @@ void zmq::object_t::create_pipe (object_t *reader_parent_, ...@@ -107,34 +107,34 @@ void zmq::object_t::create_pipe (object_t *reader_parent_,
object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
pipe_reader_t **reader_, pipe_writer_t **writer_) pipe_reader_t **reader_, pipe_writer_t **writer_)
{ {
dispatcher->create_pipe (reader_parent_, writer_parent_, hwm_, lwm_, context->create_pipe (reader_parent_, writer_parent_, hwm_, lwm_,
reader_, writer_); reader_, writer_);
} }
void zmq::object_t::destroy_pipe (pipe_t *pipe_) void zmq::object_t::destroy_pipe (pipe_t *pipe_)
{ {
dispatcher->destroy_pipe (pipe_); context->destroy_pipe (pipe_);
} }
int zmq::object_t::register_inproc_endpoint (const char *endpoint_, int zmq::object_t::register_inproc_endpoint (const char *endpoint_,
session_t *session_) session_t *session_)
{ {
return dispatcher->register_inproc_endpoint (endpoint_, session_); return context->register_inproc_endpoint (endpoint_, session_);
} }
zmq::object_t *zmq::object_t::get_inproc_endpoint (const char *endpoint_) zmq::object_t *zmq::object_t::get_inproc_endpoint (const char *endpoint_)
{ {
return dispatcher->get_inproc_endpoint (endpoint_); return context->get_inproc_endpoint (endpoint_);
} }
void zmq::object_t::unregister_inproc_endpoints (session_t *session_) void zmq::object_t::unregister_inproc_endpoints (session_t *session_)
{ {
dispatcher->unregister_inproc_endpoints (session_); context->unregister_inproc_endpoints (session_);
} }
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{ {
return dispatcher->choose_io_thread (taskset_); return context->choose_io_thread (taskset_);
} }
void zmq::object_t::send_stop () void zmq::object_t::send_stop ()
...@@ -144,7 +144,7 @@ void zmq::object_t::send_stop () ...@@ -144,7 +144,7 @@ void zmq::object_t::send_stop ()
command_t cmd; command_t cmd;
cmd.destination = this; cmd.destination = this;
cmd.type = command_t::stop; cmd.type = command_t::stop;
dispatcher->write (thread_slot, thread_slot, cmd); context->write (thread_slot, thread_slot, cmd);
} }
void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_, void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_,
...@@ -289,6 +289,6 @@ void zmq::object_t::send_command (command_t &cmd_) ...@@ -289,6 +289,6 @@ void zmq::object_t::send_command (command_t &cmd_)
if (destination_thread_slot == thread_slot) if (destination_thread_slot == thread_slot)
cmd_.destination->process_command (cmd_); cmd_.destination->process_command (cmd_);
else else
dispatcher->write (thread_slot, destination_thread_slot, cmd_); context->write (thread_slot, destination_thread_slot, cmd_);
} }
...@@ -32,7 +32,7 @@ namespace zmq ...@@ -32,7 +32,7 @@ namespace zmq
{ {
public: public:
object_t (class dispatcher_t *dispatcher_, int thread_slot_); object_t (class context_t *context_, int thread_slot_);
object_t (object_t *parent_); object_t (object_t *parent_);
~object_t (); ~object_t ();
...@@ -42,7 +42,7 @@ namespace zmq ...@@ -42,7 +42,7 @@ namespace zmq
protected: protected:
// Derived object can use following functions to interact with // Derived object can use following functions to interact with
// global repositories. See dispatcher.hpp for function details. // global repositories. See context.hpp for function details.
int thread_slot_count (); int thread_slot_count ();
void create_pipe (class object_t *reader_parent_, void create_pipe (class object_t *reader_parent_,
class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_, class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
...@@ -87,7 +87,7 @@ namespace zmq ...@@ -87,7 +87,7 @@ namespace zmq
virtual void process_terminate_ack (); virtual void process_terminate_ack ();
// Pointer to the root of the infrastructure. // Pointer to the root of the infrastructure.
class dispatcher_t *dispatcher; class context_t *context;
// Slot ID of the thread the object belongs to. // Slot ID of the thread the object belongs to.
int thread_slot; int thread_slot;
......
...@@ -32,10 +32,10 @@ namespace zmq ...@@ -32,10 +32,10 @@ namespace zmq
class pipe_t : public ypipe_t <zmq_msg, false, message_pipe_granularity> class pipe_t : public ypipe_t <zmq_msg, false, message_pipe_granularity>
{ {
// Dispatcher is a friend so that it can create & destroy the pipes. // Context is a friend so that it can create & destroy the pipes.
// By making constructor & destructor private we are sure that nobody // By making constructor & destructor private we are sure that nobody
// except dispatcher messes with pipes. // except context messes with pipes.
friend class dispatcher_t; friend class context_t;
private: private:
...@@ -45,7 +45,7 @@ namespace zmq ...@@ -45,7 +45,7 @@ namespace zmq
void set_index (int index_); void set_index (int index_);
int get_index (); int get_index ();
// Index of the pipe in dispatcher's array of pipes. // Index of the pipe in context's array of pipes.
int index; int index;
pipe_t (const pipe_t&); pipe_t (const pipe_t&);
......
...@@ -113,6 +113,6 @@ void zmq::pipe_reader_t::terminate () ...@@ -113,6 +113,6 @@ void zmq::pipe_reader_t::terminate ()
void zmq::pipe_reader_t::process_terminate_ack () void zmq::pipe_reader_t::process_terminate_ack ()
{ {
// Ask dispatcher to deallocate the pipe. // Ask context to deallocate the pipe.
destroy_pipe (pipe); destroy_pipe (pipe);
} }
...@@ -28,10 +28,10 @@ namespace zmq ...@@ -28,10 +28,10 @@ namespace zmq
class pipe_reader_t : public object_t class pipe_reader_t : public object_t
{ {
// Dispatcher is a friend so that it can create & destroy the reader. // Context is a friend so that it can create & destroy the reader.
// By making constructor & destructor private we are sure that nobody // By making constructor & destructor private we are sure that nobody
// except dispatcher messes with readers. // except context messes with readers.
friend class dispatcher_t; friend class context_t;
public: public:
......
...@@ -28,10 +28,10 @@ namespace zmq ...@@ -28,10 +28,10 @@ namespace zmq
class pipe_writer_t : public object_t class pipe_writer_t : public object_t
{ {
// Dispatcher is a friend so that it can create & destroy the writer. // Context is a friend so that it can create & destroy the writer.
// By making constructor & destructor private we are sure that nobody // By making constructor & destructor private we are sure that nobody
// except dispatcher messes with writers. // except context messes with writers.
friend class dispatcher_t; friend class context_t;
public: public:
......
...@@ -19,9 +19,9 @@ ...@@ -19,9 +19,9 @@
#include "safe_object.hpp" #include "safe_object.hpp"
zmq::safe_object_t::safe_object_t (class dispatcher_t *dispatcher_, zmq::safe_object_t::safe_object_t (class context_t *context_,
int thread_slot_) : int thread_slot_) :
object_t (dispatcher_, thread_slot_), object_t (context_, thread_slot_),
processed_seqnum (0), processed_seqnum (0),
terminating (false) terminating (false)
{ {
......
...@@ -36,7 +36,7 @@ namespace zmq ...@@ -36,7 +36,7 @@ namespace zmq
{ {
public: public:
safe_object_t (class dispatcher_t *dispatcher_, int thread_slot_); safe_object_t (class context_t *context_, int thread_slot_);
safe_object_t (object_t *parent_); safe_object_t (object_t *parent_);
void inc_seqnum (); void inc_seqnum ();
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#include "i_api.hpp" #include "i_api.hpp"
#include "err.hpp" #include "err.hpp"
#include "dispatcher.hpp" #include "context.hpp"
#include "msg.hpp" #include "msg.hpp"
int zmq_msg_init (zmq_msg *msg_) int zmq_msg_init (zmq_msg *msg_)
...@@ -162,28 +162,27 @@ int zmq_msg_type (zmq_msg *msg_) ...@@ -162,28 +162,27 @@ int zmq_msg_type (zmq_msg *msg_)
void *zmq_init (int app_threads_, int io_threads_) void *zmq_init (int app_threads_, int io_threads_)
{ {
// There should be at least a single thread managed by the dispatcher. // There should be at least a single thread managed by the context.
if (app_threads_ < 0 || io_threads_ < 0 || if (app_threads_ < 0 || io_threads_ < 0 ||
app_threads_ + io_threads_ == 0) { app_threads_ + io_threads_ == 0) {
errno = EINVAL; errno = EINVAL;
return NULL; return NULL;
} }
zmq::dispatcher_t *dispatcher = zmq::context_t *context = new zmq::context_t (app_threads_, io_threads_);
new zmq::dispatcher_t (app_threads_, io_threads_); zmq_assert (context);
zmq_assert (dispatcher); return (void*) context;
return (void*) dispatcher;
} }
int zmq_term (void *context_) int zmq_term (void *context_)
{ {
((zmq::dispatcher_t*) context_)->shutdown (); ((zmq::context_t*) context_)->shutdown ();
return 0; return 0;
} }
void *zmq_socket (void *context_, int type_) void *zmq_socket (void *context_, int type_)
{ {
return (void*) (((zmq::dispatcher_t*) context_)->create_socket (type_)); return (void*) (((zmq::context_t*) context_)->create_socket (type_));
} }
int zmq_close (void *s_) int zmq_close (void *s_)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment