Commit 6be4b014 authored by Martin Sustrik's avatar Martin Sustrik

session management implemented

parent a801b6d8
......@@ -94,50 +94,50 @@ typedef void (zmq_free_fn) (void *data);
// is shared, i.e. reference counting is used to manage its lifetime
// rather than straighforward malloc/free. struct zmq_msg_content is
// not declared in the API.
struct zmq_msg
struct zmq_msg_t
{
struct zmq_msg_content *content;
void *content;
unsigned char shared;
uint16_t vsm_size;
unsigned char vsm_data [ZMQ_MAX_VSM_SIZE];
};
// Initialise an empty message (zero bytes long).
ZMQ_EXPORT int zmq_msg_init (zmq_msg *msg);
ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg);
// Initialise a message 'size' bytes long.
//
// Errors: ENOMEM - the size is too large to allocate.
ZMQ_EXPORT int zmq_msg_init_size (zmq_msg *msg, size_t size);
ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
// Initialise a message from an existing buffer. Message isn't copied,
// instead 0SOCKETS infrastructure take ownership of the buffer and call
// deallocation functio (ffn) once it's not needed anymore.
ZMQ_EXPORT int zmq_msg_init_data (zmq_msg *msg, void *data, size_t size,
ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size,
zmq_free_fn *ffn);
// Deallocate the message.
ZMQ_EXPORT int zmq_msg_close (zmq_msg *msg);
ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg);
// Move the content of the message from 'src' to 'dest'. The content isn't
// copied, just moved. 'src' is an empty message after the call. Original
// content of 'dest' message is deallocated.
ZMQ_EXPORT int zmq_msg_move (zmq_msg *dest, zmq_msg *src);
ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
// Copy the 'src' message to 'dest'. The content isn't copied, instead
// reference count is increased. Don't modify the message data after the
// call as they are shared between two messages. Original content of 'dest'
// message is deallocated.
ZMQ_EXPORT int zmq_msg_copy (zmq_msg *dest, zmq_msg *src);
ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
// Returns pointer to message data.
ZMQ_EXPORT void *zmq_msg_data (zmq_msg *msg);
ZMQ_EXPORT void *zmq_msg_data (zmq_msg_t *msg);
// Return size of message data (in bytes).
ZMQ_EXPORT size_t zmq_msg_size (zmq_msg *msg);
ZMQ_EXPORT size_t zmq_msg_size (zmq_msg_t *msg);
// Returns type of the message.
ZMQ_EXPORT int zmq_msg_type (zmq_msg *msg);
ZMQ_EXPORT int zmq_msg_type (zmq_msg_t *msg);
// Initialise 0SOCKETS context. 'app_threads' specifies maximal number
// of application threads that can have open sockets at the same time.
......@@ -163,7 +163,7 @@ ZMQ_EXPORT int zmq_close (void *s);
// Sets an option on the socket.
// EINVAL - unknown option, a value with incorrect length or an invalid value.
ZMQ_EXPORT int zmq_setsockopt (void *s, int option_, void *optval_,
ZMQ_EXPORT int zmq_setsockopt (void *s, int option_, const void *optval_,
size_t optvallen_);
// Bind the socket to a particular address.
......@@ -182,7 +182,7 @@ ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
// Errors: EAGAIN - message cannot be sent at the moment (applies only to
// non-blocking send).
// ENOTSUP - function isn't supported by particular socket type.
ZMQ_EXPORT int zmq_send (void *s, zmq_msg *msg, int flags);
ZMQ_EXPORT int zmq_send (void *s, zmq_msg_t *msg, int flags);
// Flush the messages that were send using ZMQ_NOFLUSH flag down the stream.
//
......@@ -196,7 +196,7 @@ ZMQ_EXPORT int zmq_flush (void *s);
// Errors: EAGAIN - message cannot be received at the moment (applies only to
// non-blocking receive).
// ENOTSUP - function isn't supported by particular socket type.
ZMQ_EXPORT int zmq_recv (void *s, zmq_msg *msg, int flags);
ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);
#ifdef __cplusplus
}
......
......@@ -74,7 +74,7 @@ namespace zmq
// copied it - the behaviour is undefined. Don't change the body of the
// received message either - other threads may be accessing it in parallel.
class message_t : private zmq_msg
class message_t : private zmq_msg_t
{
friend class socket_t;
......@@ -139,7 +139,7 @@ namespace zmq
// of data after the operation.
inline void move_to (message_t *msg_)
{
int rc = zmq_msg_move (this, (zmq_msg*) msg_);
int rc = zmq_msg_move (this, (zmq_msg_t*) msg_);
assert (rc == 0);
}
......@@ -148,7 +148,7 @@ namespace zmq
// these get deallocated.
inline void copy_to (message_t *msg_)
{
int rc = zmq_msg_copy (this, (zmq_msg*) msg_);
int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_);
assert (rc == 0);
}
......@@ -230,9 +230,10 @@ namespace zmq
assert (rc == 0);
}
template <typename T> inline void setsockopt (int option_, T &value_)
inline void setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
int rc = zmq_setsockopt (ptr, option_, (void*) &value_, sizeof (T));
int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_);
assert (rc == 0);
}
......
......@@ -23,7 +23,7 @@ libzmq_la_SOURCES = \
i_poll_events.hpp \
i_signaler.hpp \
kqueue.hpp \
msg.hpp \
msg_content.hpp \
mutex.hpp \
object.hpp \
options.hpp \
......@@ -47,11 +47,12 @@ libzmq_la_SOURCES = \
ypollset.hpp \
yqueue.hpp \
zmq_connecter.hpp \
zmq_connecter_init.hpp \
zmq_decoder.hpp \
zmq_encoder.hpp \
zmq_engine.hpp \
zmq_init.hpp \
zmq_listener.hpp \
zmq_listener_init.hpp \
app_thread.cpp \
devpoll.cpp \
dispatcher.cpp \
......@@ -77,11 +78,12 @@ libzmq_la_SOURCES = \
ypollset.cpp \
zmq.cpp \
zmq_connecter.cpp \
zmq_connecter_init.cpp \
zmq_decoder.cpp \
zmq_encoder.cpp \
zmq_engine.cpp \
zmq_init.cpp \
zmq_listener.cpp
zmq_listener.cpp \
zmq_listener_init.cpp
libzmq_la_LDFLAGS = -version-info 0:0:0
libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@
......
......@@ -27,8 +27,8 @@ namespace zmq
struct i_inout
{
virtual bool read (::zmq_msg *msg_) = 0;
virtual bool write (::zmq_msg *msg_) = 0;
virtual bool read (::zmq_msg_t *msg_) = 0;
virtual bool write (::zmq_msg_t *msg_) = 0;
virtual void flush () = 0;
};
......
......@@ -26,8 +26,8 @@
#include "atomic_counter.hpp"
//namespace zmq
//{
namespace zmq
{
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
......@@ -36,7 +36,8 @@
// used to deallocate the data. If the buffer is actually shared (there
// are at least 2 references to it) refcount member contains number of
// references.
struct zmq_msg_content
struct msg_content_t
{
void *data;
size_t size;
......@@ -44,6 +45,6 @@
zmq::atomic_counter_t refcnt;
};
//}
}
#endif
......@@ -131,9 +131,8 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_)
{
// Let the object know that it cannot shut down till it gets this command.
destination_->inc_seqnum ();
// The assumption here is that command sequence number of the destination
// object was already incremented in find_session function.
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::attach;
......
......@@ -30,6 +30,10 @@ namespace zmq
class object_t
{
// Repository of sessions needs to use caller's send_* functions
// when creating new session. TODO: Get rid of this dependency.
friend class socket_base_t;
public:
object_t (class dispatcher_t *dispatcher_, int thread_slot_);
......
......@@ -47,6 +47,14 @@ void zmq::owned_t::process_plug ()
finalise_command ();
}
void zmq::owned_t::process_attach (zmq_engine_t *engine_)
{
// Keep track of how many commands were processed so far.
processed_seqnum++;
finalise_command ();
}
void zmq::owned_t::term ()
{
send_term_req (owner, this);
......@@ -65,8 +73,8 @@ void zmq::owned_t::finalise_command ()
// If termination request was already received and there are no more
// commands to wait for, terminate the object.
if (shutting_down && processed_seqnum == sent_seqnum.get ()) {
send_term_ack (owner);
process_unplug ();
send_term_ack (owner);
delete this;
}
}
......
......@@ -59,6 +59,10 @@ namespace zmq
// handler.
void process_plug ();
// It's vital that session invokes io_object_t::process_attach
// at the end of it's own attach handler.
void process_attach (class zmq_engine_t *engine_);
// io_object_t defines a new handler used to disconnect the object
// from the poller object. Implement the handlen in the derived
// classes to ensure sane cleanup.
......
......@@ -29,7 +29,7 @@ namespace zmq
{
// Message pipe.
class pipe_t : public ypipe_t <zmq_msg, false, message_pipe_granularity>
class pipe_t : public ypipe_t <zmq_msg_t, false, message_pipe_granularity>
{
};
......
......@@ -22,9 +22,10 @@
#include "err.hpp"
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
zmq_engine_t *engine_) :
const char *name_) :
owned_t (parent_, owner_),
engine (engine_)
engine (NULL),
name (name_)
{
}
......@@ -32,12 +33,12 @@ zmq::session_t::~session_t ()
{
}
bool zmq::session_t::read (::zmq_msg *msg_)
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
return false;
}
bool zmq::session_t::write (::zmq_msg *msg_)
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
return false;
}
......@@ -48,14 +49,34 @@ void zmq::session_t::flush ()
void zmq::session_t::process_plug ()
{
zmq_assert (engine);
engine->plug (this);
// Register the session with the socket.
bool ok = owner->register_session (name.c_str (), this);
// There's already a session with the specified identity.
// We should syslog it and drop the session. TODO
zmq_assert (ok);
owned_t::process_plug ();
}
void zmq::session_t::process_unplug ()
{
zmq_assert (engine);
engine->unplug ();
delete engine;
// Unregister the session from the socket.
bool ok = owner->unregister_session (name.c_str ());
zmq_assert (ok);
if (engine) {
engine->unplug ();
delete engine;
engine = NULL;
}
}
void zmq::session_t::process_attach (class zmq_engine_t *engine_)
{
zmq_assert (engine_);
engine = engine_;
engine->plug (this);
owned_t::process_attach (engine_);
}
......@@ -20,8 +20,11 @@
#ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__
#include <string>
#include "i_inout.hpp"
#include "owned.hpp"
#include "options.hpp"
namespace zmq
{
......@@ -30,24 +33,28 @@ namespace zmq
{
public:
session_t (object_t *parent_, socket_base_t *owner_,
class zmq_engine_t *engine_);
session_t (object_t *parent_, socket_base_t *owner_, const char *name_);
private:
~session_t ();
// i_inout interface implementation.
bool read (::zmq_msg *msg_);
bool write (::zmq_msg *msg_);
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
void flush ();
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
void process_attach (class zmq_engine_t *engine_);
class zmq_engine_t *engine;
// The name of the session. One that is used to register it with
// socket-level repository of sessions.
std::string name;
session_t (const session_t&);
void operator = (const session_t&);
};
......
......@@ -17,6 +17,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string>
#include <algorithm>
#include "../include/zmq.h"
......@@ -30,16 +31,20 @@
#include "session.hpp"
#include "config.hpp"
#include "owned.hpp"
#include "uuid.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
pending_term_acks (0),
app_thread (parent_)
app_thread (parent_),
shutting_down (false)
{
}
zmq::socket_base_t::~socket_base_t ()
{
shutting_down = true;
while (true) {
// On third pass of the loop there should be no more I/O objects
......@@ -64,10 +69,12 @@ zmq::socket_base_t::~socket_base_t ()
}
// Check whether there are no session leaks.
sessions_sync.lock ();
zmq_assert (sessions.empty ());
sessions_sync.unlock ();
}
int zmq::socket_base_t::setsockopt (int option_, void *optval_,
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
switch (option_) {
......@@ -113,11 +120,7 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
return 0;
case ZMQ_IDENTITY:
if (optvallen_ != sizeof (const char*)) {
errno = EINVAL;
return -1;
}
options.identity = (const char*) optval_;
options.identity.assign ((const char*) optval_, optvallen_);
return 0;
default:
......@@ -141,18 +144,34 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_)
{
// Generate a unique name for the session.
std::string session_name ("#");
session_name += uuid_t ().to_string ();
// Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, this, session_name.c_str ());
zmq_assert (session);
send_plug (session);
send_own (this, session);
// Create the connecter object. Supply it with the session name so that
// it can bind the new connection to the session once it is established.
zmq_connecter_t *connecter = new zmq_connecter_t (
choose_io_thread (options.affinity), this, options);
choose_io_thread (options.affinity), this, options,
session_name.c_str ());
int rc = connecter->set_address (addr_);
if (rc != 0)
if (rc != 0) {
delete connecter;
return -1;
}
send_plug (connecter);
send_own (this, connecter);
return 0;
}
int zmq::socket_base_t::send (struct zmq_msg *msg_, int flags_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
}
......@@ -162,7 +181,7 @@ int zmq::socket_base_t::flush ()
zmq_assert (false);
}
int zmq::socket_base_t::recv (struct zmq_msg *msg_, int flags_)
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
}
......@@ -174,35 +193,40 @@ int zmq::socket_base_t::close ()
return 0;
}
void zmq::socket_base_t::register_session (const char *name_,
bool zmq::socket_base_t::register_session (const char *name_,
session_t *session_)
{
sessions_sync.lock ();
bool inserted = sessions.insert (std::make_pair (name_, session_)).second;
zmq_assert (inserted);
bool registered = sessions.insert (std::make_pair (name_, session_)).second;
sessions_sync.unlock ();
return registered;
}
void zmq::socket_base_t::unregister_session (const char *name_)
bool zmq::socket_base_t::unregister_session (const char *name_)
{
sessions_sync.lock ();
sessions_t::iterator it = sessions.find (name_);
zmq_assert (it != sessions.end ());
bool unregistered = (it != sessions.end ());
sessions.erase (it);
sessions_sync.unlock ();
return unregistered;
}
zmq::session_t *zmq::socket_base_t::get_session (const char *name_)
zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
{
sessions_sync.lock ();
sessions_t::iterator it = sessions.find (name_);
session_t *session = NULL;
if (it != sessions.end ()) {
session = it->second;
session->inc_seqnum ();
}
if (it == sessions.end ()) {
sessions_sync.unlock ();
return NULL;
}
// Prepare the session for subsequent attach command.
it->second->inc_seqnum ();
sessions_sync.unlock ();
return session;
return it->second;
}
void zmq::socket_base_t::process_own (owned_t *object_)
......@@ -212,6 +236,11 @@ void zmq::socket_base_t::process_own (owned_t *object_)
void zmq::socket_base_t::process_term_req (owned_t *object_)
{
// When shutting down we can ignore termination requests from owned
// objects. They are going to be terminated anyway.
if (shutting_down)
return;
// If I/O object is well and alive ask it to terminate.
io_objects_t::iterator it = std::find (io_objects.begin (),
io_objects.end (), object_);
......
......@@ -40,23 +40,22 @@ namespace zmq
~socket_base_t ();
// Interface for communication with the API layer.
virtual int setsockopt (int option_, void *optval_, size_t optvallen_);
virtual int setsockopt (int option_, const void *optval_,
size_t optvallen_);
virtual int bind (const char *addr_);
virtual int connect (const char *addr_);
virtual int send (struct zmq_msg *msg_, int flags_);
virtual int send (struct zmq_msg_t *msg_, int flags_);
virtual int flush ();
virtual int recv (struct zmq_msg *msg_, int flags_);
virtual int recv (struct zmq_msg_t *msg_, int flags_);
virtual int close ();
// Functions that owned objects use to manipulate socket's list
// of existing sessions.
// Note that this functionality cannot be implemented via inter-thread
// The list of sessions cannot be accessed via inter-thread
// commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application
// thread to 0MQ.
void register_session (const char *name_, class session_t *session_);
void unregister_session (const char *name_);
class session_t *get_session (const char *name_);
bool register_session (const char *name_, class session_t *session_);
bool unregister_session (const char *name_);
class session_t *find_session (const char *name_);
private:
......@@ -80,10 +79,17 @@ namespace zmq
// Socket options.
options_t options;
// If true, socket is already shutting down. No new work should be
// started.
bool shutting_down;
// List of existing sessions. This list is never referenced from within
// the socket, instead it is used by I/O objects owned by the session.
// As those objects can live in different threads, the access is
// synchronised using 'sessions_sync' mutex.
// Local sessions are those named by the local instance of 0MQ.
// Remote sessions are the sessions who's identities are provided by
// the remote party.
typedef std::map <std::string, session_t*> sessions_t;
sessions_t sessions;
mutex_t sessions_sync;
......
......@@ -26,76 +26,79 @@
#include "socket_base.hpp"
#include "err.hpp"
#include "dispatcher.hpp"
#include "msg.hpp"
#include "msg_content.hpp"
int zmq_msg_init (zmq_msg *msg_)
int zmq_msg_init (zmq_msg_t *msg_)
{
msg_->content = (zmq_msg_content*) ZMQ_VSM;
msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
msg_->vsm_size = 0;
return 0;
}
int zmq_msg_init_size (zmq_msg *msg_, size_t size_)
int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
{
if (size_ <= ZMQ_MAX_VSM_SIZE) {
msg_->content = (zmq_msg_content*) ZMQ_VSM;
msg_->content = (zmq::msg_content_t*) ZMQ_VSM;
msg_->vsm_size = (uint16_t) size_;
}
else {
msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content) +
size_);
msg_->content =
(zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t) + size_);
if (!msg_->content) {
errno = ENOMEM;
return -1;
}
msg_->shared = 0;
msg_->content->data = (void*) (msg_->content + 1);
msg_->content->size = size_;
msg_->content->ffn = NULL;
new (&msg_->content->refcnt) zmq::atomic_counter_t ();
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
content->data = (void*) (content + 1);
content->size = size_;
content->ffn = NULL;
new (&content->refcnt) zmq::atomic_counter_t ();
}
return 0;
}
int zmq_msg_init_data (zmq_msg *msg_, void *data_, size_t size_,
int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
zmq_free_fn *ffn_)
{
msg_->shared = 0;
msg_->content = (zmq_msg_content*) malloc (sizeof (zmq_msg_content));
msg_->content = (zmq::msg_content_t*) malloc (sizeof (zmq::msg_content_t));
zmq_assert (msg_->content);
msg_->content->data = data_;
msg_->content->size = size_;
msg_->content->ffn = ffn_;
new (&msg_->content->refcnt) zmq::atomic_counter_t ();
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
content->data = data_;
content->size = size_;
content->ffn = ffn_;
new (&content->refcnt) zmq::atomic_counter_t ();
return 0;
}
int zmq_msg_close (zmq_msg *msg_)
int zmq_msg_close (zmq_msg_t *msg_)
{
// For VSMs and delimiters there are no resources to free
if (msg_->content == (zmq_msg_content*) ZMQ_DELIMITER ||
msg_->content == (zmq_msg_content*) ZMQ_VSM ||
msg_->content == (zmq_msg_content*) ZMQ_GAP)
// For VSMs and delimiters there are no resources to free.
if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER ||
msg_->content == (zmq::msg_content_t*) ZMQ_VSM ||
msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
return 0;
// If the content is not shared, or if it is shared and the reference
// If the content is not shared, or if it is shared and the reference.
// count has dropped to zero, deallocate it.
if (!msg_->shared || !msg_->content->refcnt.sub (1)) {
zmq::msg_content_t *content = (zmq::msg_content_t*) msg_->content;
if (!msg_->shared || !content->refcnt.sub (1)) {
// We used "placement new" operator to initialize the reference
// We used "placement new" operator to initialize the reference.
// counter so we call its destructor now.
msg_->content->refcnt.~atomic_counter_t ();
content->refcnt.~atomic_counter_t ();
if (msg_->content->ffn)
msg_->content->ffn (msg_->content->data);
free (msg_->content);
if (content->ffn)
content->ffn (content->data);
free (content);
}
return 0;
}
int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_)
int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
{
zmq_msg_close (dest_);
*dest_ = *src_;
......@@ -103,23 +106,24 @@ int zmq_msg_move (zmq_msg *dest_, zmq_msg *src_)
return 0;
}
int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_)
int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
{
zmq_msg_close (dest_);
// VSMs and delimiters require no special handling.
if (src_->content !=
(zmq_msg_content*) ZMQ_DELIMITER &&
src_->content != (zmq_msg_content*) ZMQ_VSM &&
src_->content != (zmq_msg_content*) ZMQ_GAP) {
(zmq::msg_content_t*) ZMQ_DELIMITER &&
src_->content != (zmq::msg_content_t*) ZMQ_VSM &&
src_->content != (zmq::msg_content_t*) ZMQ_GAP) {
// One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2.
zmq::msg_content_t *content = (zmq::msg_content_t*) src_->content;
if (src_->shared)
src_->content->refcnt.add (1);
content->refcnt.add (1);
else {
src_->shared = true;
src_->content->refcnt.set (2);
content->refcnt.set (2);
}
}
......@@ -127,32 +131,34 @@ int zmq_msg_copy (zmq_msg *dest_, zmq_msg *src_)
return 0;
}
void *zmq_msg_data (zmq_msg *msg_)
void *zmq_msg_data (zmq_msg_t *msg_)
{
if (msg_->content == (zmq_msg_content*) ZMQ_VSM)
if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
return msg_->vsm_data;
if (msg_->content ==
(zmq_msg_content*) ZMQ_DELIMITER ||
msg_->content == (zmq_msg_content*) ZMQ_GAP)
(zmq::msg_content_t*) ZMQ_DELIMITER ||
msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
return NULL;
return msg_->content->data;
return ((zmq::msg_content_t*) msg_->content)->data;
}
size_t zmq_msg_size (zmq_msg *msg_)
size_t zmq_msg_size (zmq_msg_t *msg_)
{
if (msg_->content == (zmq_msg_content*) ZMQ_VSM)
if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
return msg_->vsm_size;
if (msg_->content ==
(zmq_msg_content*) ZMQ_DELIMITER ||
msg_->content == (zmq_msg_content*) ZMQ_GAP)
(zmq::msg_content_t*) ZMQ_DELIMITER ||
msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
return 0;
return msg_->content->size;
return ((zmq::msg_content_t*) msg_->content)->size;
}
int zmq_msg_type (zmq_msg *msg_)
int zmq_msg_type (zmq_msg_t *msg_)
{
// If it's a genuine message, return 0.
if (msg_->content >= (zmq_msg_content*) ZMQ_VSM)
if (msg_->content >= (zmq::msg_content_t*) ZMQ_VSM)
return 0;
// Trick the compiler to believe that content is an integer.
......@@ -192,7 +198,8 @@ int zmq_close (void *s_)
return 0;
}
int zmq_setsockopt (void *s_, int option_, void *optval_, size_t optvallen_)
int zmq_setsockopt (void *s_, int option_, const void *optval_,
size_t optvallen_)
{
return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
optvallen_));
......@@ -208,7 +215,7 @@ int zmq_connect (void *s_, const char *addr_)
return (((zmq::socket_base_t*) s_)->connect (addr_));
}
int zmq_send (void *s_, zmq_msg *msg_, int flags_)
int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
{
return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
}
......@@ -218,7 +225,7 @@ int zmq_flush (void *s_)
return (((zmq::socket_base_t*) s_)->flush ());
}
int zmq_recv (void *s_, zmq_msg *msg_, int flags_)
int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
{
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
}
......@@ -18,16 +18,18 @@
*/
#include "zmq_connecter.hpp"
#include "zmq_init.hpp"
#include "zmq_connecter_init.hpp"
#include "io_thread.hpp"
#include "err.hpp"
zmq::zmq_connecter_t::zmq_connecter_t (io_thread_t *parent_,
socket_base_t *owner_, const options_t &options_) :
socket_base_t *owner_, const options_t &options_,
const char *session_name_) :
owned_t (parent_, owner_),
io_object_t (parent_),
handle_valid (false),
options (options_)
options (options_),
session_name (session_name_)
{
}
......@@ -76,7 +78,8 @@ void zmq::zmq_connecter_t::out_event ()
// Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, true, options);
zmq_connecter_init_t *init = new zmq_connecter_init_t (io_thread, owner,
fd, options, session_name.c_str ());
zmq_assert (init);
send_plug (init);
send_own (owner, init);
......
......@@ -20,6 +20,8 @@
#ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
#define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
#include <string>
#include "owned.hpp"
#include "io_object.hpp"
#include "tcp_connecter.hpp"
......@@ -34,15 +36,14 @@ namespace zmq
public:
zmq_connecter_t (class io_thread_t *parent_, socket_base_t *owner_,
const options_t &options_);
const options_t &options_, const char *session_name_);
~zmq_connecter_t ();
// Set IP address to connect to.
int set_address (const char *addr_);
private:
~zmq_connecter_t ();
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
......@@ -68,6 +69,9 @@ namespace zmq
// Associated socket options.
options_t options;
// Name of the session associated with the connecter.
std::string session_name;
zmq_connecter_t (const zmq_connecter_t&);
void operator = (const zmq_connecter_t&);
};
......
......@@ -17,96 +17,78 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "zmq_init.hpp"
#include "zmq_connecter_init.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, bool connected_, const options_t &options_) :
zmq::zmq_connecter_init_t::zmq_connecter_init_t (io_thread_t *parent_,
socket_base_t *owner_, fd_t fd_, const options_t &options_,
const char *session_name_) :
owned_t (parent_, owner_),
connected (connected_),
options (options_)
options (options_),
session_name (session_name_)
{
// Create associated engine object.
engine = new zmq_engine_t (parent_, fd_);
zmq_assert (engine);
}
zmq::zmq_init_t::~zmq_init_t ()
zmq::zmq_connecter_init_t::~zmq_connecter_init_t ()
{
if (engine)
delete engine;
}
bool zmq::zmq_init_t::read (::zmq_msg *msg_)
bool zmq::zmq_connecter_init_t::read (::zmq_msg_t *msg_)
{
// On the listening side, no initialisation data are sent to the peer.
if (!connected)
return false;
// Send identity.
int rc = zmq_msg_init_size (msg_, options.identity.size ());
zmq_assert (rc == 0);
memcpy (zmq_msg_data (msg_), options.identity.c_str (),
options.identity.size ());
// Initialisation is done.
create_session ();
// Initialisation is done at this point. Disconnect the engine from
// the init object.
engine->unplug ();
// Find the session associated with this connecter. If it doesn't exist
// drop the newly created connection. If it does, attach it to the
// connection.
session_t *session = owner->find_session (session_name.c_str ());
if (!session) {
// TODO
zmq_assert (false);
}
send_attach (session, engine);
engine = NULL;
// Destroy the init object.
term ();
return true;
}
bool zmq::zmq_init_t::write (::zmq_msg *msg_)
bool zmq::zmq_connecter_init_t::write (::zmq_msg_t *msg_)
{
// On the connecting side no initialisation data are expected.
if (connected)
return false;
// Retreieve the identity.
options.identity = std::string ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
// Initialisation is done.
create_session ();
return true;
return false;
}
void zmq::zmq_init_t::flush ()
void zmq::zmq_connecter_init_t::flush ()
{
// No need to do anything. zmq_init_t does no batching of messages.
// Each message is processed immediately on write.
// We are not expecting any messages. No point in flushing.
zmq_assert (false);
}
void zmq::zmq_init_t::process_plug ()
void zmq::zmq_connecter_init_t::process_plug ()
{
zmq_assert (engine);
engine->plug (this);
owned_t::process_plug ();
}
void zmq::zmq_init_t::process_unplug ()
void zmq::zmq_connecter_init_t::process_unplug ()
{
if (engine)
engine->unplug ();
}
void zmq::zmq_init_t::create_session ()
{
// Disconnect engine from the init object.
engine->unplug ();
// Create the session instance.
io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, owner, engine);
zmq_assert (session);
engine = NULL;
// Pass session/engine pair to a chosen I/O thread.
send_plug (session);
send_own (owner, session);
// Destroy the init object.
term ();
}
......@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#ifndef __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_CONNECTER_INIT_HPP_INCLUDED__
#include <string>
......@@ -33,48 +33,41 @@ namespace zmq
{
// The class handles initialisation phase of native 0MQ wire-level
// protocol. Currently it can be used to handle both sides of the
// connection. If it grows to complex, we can separate the two into
// distinct classes.
// protocol on the connecting side of the connection.
class zmq_init_t : public owned_t, public i_inout
class zmq_connecter_init_t : public owned_t, public i_inout
{
public:
// Set 'connected' to true if the connection was created by 'connect'
// function. If it was accepted from a listening socket, set it to
// false.
zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_, fd_t fd_,
bool connected_, const options_t &options);
~zmq_init_t ();
zmq_connecter_init_t (class io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, const options_t &options, const char *session_name_);
~zmq_connecter_init_t ();
private:
// i_inout interface implementation.
bool read (::zmq_msg *msg_);
bool write (::zmq_msg *msg_);
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
void flush ();
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
void create_session ();
// Engine is created by zmq_init_t object. Once the initialisation
// phase is over it is passed to a session object, possibly running
// in a different I/O thread.
// Engine is created by zmq_connecter_init_t object. Once the
// initialisation phase is over it is passed to a session object,
// possibly running in a different I/O thread.
zmq_engine_t *engine;
// If true, we are on the connecting side. If false, we are on the
// listening side.
bool connected;
// Associated socket options.
options_t options;
zmq_init_t (const zmq_init_t&);
void operator = (const zmq_init_t&);
// Name of the session to bind new connection to. Makes sense only
// when 'connected' is true.
std::string session_name;
zmq_connecter_init_t (const zmq_connecter_init_t&);
void operator = (const zmq_connecter_init_t&);
};
}
......
......@@ -45,7 +45,7 @@ namespace zmq
struct i_inout *destination;
unsigned char tmpbuf [8];
::zmq_msg in_progress;
::zmq_msg_t in_progress;
zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&);
......
......@@ -43,7 +43,7 @@ namespace zmq
bool message_ready ();
struct i_inout *source;
::zmq_msg in_progress;
::zmq_msg_t in_progress;
unsigned char tmpbuf [9];
zmq_encoder_t (const zmq_encoder_t&);
......
......@@ -18,7 +18,7 @@
*/
#include "zmq_listener.hpp"
#include "zmq_init.hpp"
#include "zmq_listener_init.hpp"
#include "io_thread.hpp"
#include "err.hpp"
......@@ -68,7 +68,8 @@ void zmq::zmq_listener_t::in_event ()
// Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_init_t *init = new zmq_init_t (io_thread, owner, fd, false, options);
zmq_listener_init_t *init = new zmq_listener_init_t (io_thread, owner,
fd, options);
zmq_assert (init);
send_plug (init);
send_own (owner, init);
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string>
#include "zmq_listener_init.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "err.hpp"
zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
socket_base_t *owner_, fd_t fd_, const options_t &options_) :
owned_t (parent_, owner_),
options (options_)
{
// Create associated engine object.
engine = new zmq_engine_t (parent_, fd_);
zmq_assert (engine);
}
zmq::zmq_listener_init_t::~zmq_listener_init_t ()
{
if (engine)
delete engine;
}
bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_)
{
return false;
}
bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
{
// Retreieve the remote identity. We'll use it as a local session name.
std::string session_name = std::string ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
// Initialisation is done. Disconnect the engine from the init object.
engine->unplug ();
// Have a look whether the session already exists. If it does, attach it
// to the engine. If it doesn't create it first.
session_t *session = owner->find_session (session_name.c_str ());
if (!session) {
io_thread_t *io_thread = choose_io_thread (options.affinity);
session = new session_t (io_thread, owner, session_name.c_str ());
zmq_assert (session);
send_plug (session);
send_own (owner, session);
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
}
send_attach (session, engine);
engine = NULL;
// Destroy the init object.
term ();
return true;
}
void zmq::zmq_listener_init_t::flush ()
{
// No need to do anything. zmq_listener_init_t does no batching
// of messages. Each message is processed immediately on write.
}
void zmq::zmq_listener_init_t::process_plug ()
{
zmq_assert (engine);
engine->plug (this);
owned_t::process_plug ();
}
void zmq::zmq_listener_init_t::process_unplug ()
{
if (engine)
engine->unplug ();
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_LISTENER_INIT_HPP_INCLUDED__
#include <string>
#include "i_inout.hpp"
#include "owned.hpp"
#include "zmq_engine.hpp"
#include "stdint.hpp"
#include "fd.hpp"
#include "options.hpp"
namespace zmq
{
// The class handles initialisation phase of native 0MQ wire-level
// protocol on the listening side of the connection.
class zmq_listener_init_t : public owned_t, public i_inout
{
public:
zmq_listener_init_t (class io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, const options_t &options);
~zmq_listener_init_t ();
private:
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
void flush ();
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
// Engine is created by zmq_listener_init_t object. Once the
// initialisation phase is over it is passed to a session object,
// possibly running in a different I/O thread.
zmq_engine_t *engine;
// Associated socket options.
options_t options;
zmq_listener_init_t (const zmq_listener_init_t&);
void operator = (const zmq_listener_init_t&);
};
}
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment