Commit f5ce81f2 authored by Martin Sustrik's avatar Martin Sustrik

Multi-hop REQ/REP, part VIII., new blob_t type used for holding identity

parent cdc2efe9
......@@ -57,6 +57,7 @@ libzmq_la_SOURCES = app_thread.hpp \
atomic_bitmap.hpp \
atomic_counter.hpp \
atomic_ptr.hpp \
blob.hpp \
command.hpp \
config.hpp \
decoder.hpp \
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_BLOB_HPP_INCLUDED__
#define __ZMQ_BLOB_HPP_INCLUDED__
#include <string>
namespace zmq
{
// Object to hold dynamically allocated opaque binary data.
typedef std::basic_string <unsigned char> blob_t;
}
#endif
......@@ -22,6 +22,8 @@
#include <stddef.h>
#include "blob.hpp"
namespace zmq
{
......@@ -42,8 +44,7 @@ namespace zmq
// Start tracing the message route. Engine should add the identity
// supplied to all inbound messages and trim identity from all the
// outbound messages.
virtual void traceroute (unsigned char *identity_,
size_t identity_size_) = 0;
virtual void traceroute (const blob_t &identity_) = 0;
};
}
......
......@@ -83,8 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_)
case command_t::attach:
process_attach (cmd_.args.attach.engine,
cmd_.args.attach.peer_identity_size,
cmd_.args.attach.peer_identity);
blob_t (cmd_.args.attach.peer_identity,
cmd_.args.attach.peer_identity_size));
process_seqnum ();
break;
......@@ -184,8 +184,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
}
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
unsigned char peer_identity_size_, unsigned char *peer_identity_,
bool inc_seqnum_)
const blob_t &peer_identity_, bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
......@@ -194,17 +193,17 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd.destination = destination_;
cmd.type = command_t::attach;
cmd.args.attach.engine = engine_;
if (!peer_identity_size_) {
if (peer_identity_.empty ()) {
cmd.args.attach.peer_identity_size = 0;
cmd.args.attach.peer_identity = NULL;
}
else {
cmd.args.attach.peer_identity_size = peer_identity_size_;
cmd.args.attach.peer_identity_size = peer_identity_.size ();
cmd.args.attach.peer_identity =
(unsigned char*) malloc (peer_identity_size_);
(unsigned char*) malloc (peer_identity_.size ());
zmq_assert (cmd.args.attach.peer_identity_size);
memcpy (cmd.args.attach.peer_identity, peer_identity_,
peer_identity_size_);
memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
peer_identity_.size ());
}
send_command (cmd);
}
......@@ -289,7 +288,7 @@ void zmq::object_t::process_own (owned_t *object_)
}
void zmq::object_t::process_attach (i_engine *engine_,
unsigned char peer_identity_size_, unsigned char *peer_identity_)
const blob_t &peer_identity_)
{
zmq_assert (false);
}
......
......@@ -21,6 +21,7 @@
#define __ZMQ_OBJECT_HPP_INCLUDED__
#include "stdint.hpp"
#include "blob.hpp"
namespace zmq
{
......@@ -64,8 +65,8 @@ namespace zmq
void send_own (class socket_base_t *destination_,
class owned_t *object_);
void send_attach (class session_t *destination_,
struct i_engine *engine_, unsigned char peer_identity_size_,
unsigned char *peer_identity_, bool inc_seqnum_ = true);
struct i_engine *engine_, const blob_t &peer_identity_,
bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_,
bool inc_seqnum_ = true);
......@@ -83,7 +84,7 @@ namespace zmq
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
virtual void process_attach (struct i_engine *engine_,
unsigned char peer_identity_size_, unsigned char *peer_identity_);
const blob_t &peer_identity_);
virtual void process_bind (class reader_t *in_pipe_,
class writer_t *out_pipe_);
virtual void process_revive ();
......
......@@ -88,8 +88,7 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
void zmq::pgm_receiver_t::traceroute (unsigned char *identity_,
size_t identity_size_)
void zmq::pgm_receiver_t::traceroute (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
......
......@@ -54,7 +54,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
void traceroute (unsigned char *identity_, size_t identity_size_);
void traceroute (const blob_t &identity_);
// i_poll_events interface implementation.
void in_event ();
......
......@@ -102,8 +102,7 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
void zmq::pgm_sender_t::traceroute (unsigned char *identity_,
size_t identity_size_)
void zmq::pgm_sender_t::traceroute (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
......
......@@ -52,7 +52,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
void traceroute (unsigned char *identity_, size_t identity_size_);
void traceroute (const blob_t &identity_);
// i_poll_events interface implementation.
void in_event ();
......
......@@ -40,24 +40,18 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
}
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_, unsigned char peer_identity_size_,
unsigned char *peer_identity_) :
const options_t &options_, const blob_t &peer_identity_) :
owned_t (parent_, owner_),
in_pipe (NULL),
active (true),
out_pipe (NULL),
engine (NULL),
ordinal (0),
peer_identity (peer_identity_),
options (options_)
{
if (!peer_identity_size_)
// If peer identity is not supplied, leave it empty.
if (peer_identity_size_) {
peer_identity.assign ((char*) peer_identity_, peer_identity_size_);
if (!owner->register_session (peer_identity_size_, peer_identity_,
this)) {
if (!peer_identity.empty ()) {
if (!owner->register_session (peer_identity, this)) {
// TODO: There's already a session with the specified
// identity. We should presumably syslog it and drop the
......@@ -180,8 +174,7 @@ void zmq::session_t::process_unplug ()
if (ordinal)
owner->unregister_session (ordinal);
else if (!peer_identity.empty ())
owner->unregister_session ((unsigned char) peer_identity.size (),
(unsigned char*) peer_identity.data ());
owner->unregister_session (peer_identity);
// Ask associated pipes to terminate.
if (in_pipe) {
......@@ -201,26 +194,23 @@ void zmq::session_t::process_unplug ()
}
void zmq::session_t::process_attach (i_engine *engine_,
unsigned char peer_identity_size_, unsigned char *peer_identity_)
const blob_t &peer_identity_)
{
if (!peer_identity.empty ()) {
// If we already know the peer name do nothing, just check whether
// it haven't changed.
zmq_assert (peer_identity.size () == peer_identity_size_);
zmq_assert (memcmp (peer_identity.data (), peer_identity_,
peer_identity_size_) == 0);
zmq_assert (peer_identity == peer_identity_);
}
else if (peer_identity_size_) {
else if (!peer_identity_.empty ()) {
// Remember the peer identity.
peer_identity.assign ((char*) peer_identity_, peer_identity_size_);
// Store the peer identity.
peer_identity = peer_identity_;
// If the session is not registered with the ordinal, let's register
// it using the peer name.
if (!ordinal) {
if (!owner->register_session (peer_identity_size_, peer_identity_,
this)) {
if (!owner->register_session (peer_identity, this)) {
// TODO: There's already a session with the specified
// identity. We should presumably syslog it and drop the
......
......@@ -20,12 +20,11 @@
#ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__
#include <string>
#include "i_inout.hpp"
#include "i_endpoint.hpp"
#include "owned.hpp"
#include "options.hpp"
#include "blob.hpp"
namespace zmq
{
......@@ -38,11 +37,9 @@ namespace zmq
session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_);
// Creates named session. If name is NULL, transient session with
// auto-generated name is created.
// Creates named session.
session_t (object_t *parent_, socket_base_t *owner_,
const options_t &options_, unsigned char peer_identity_size_,
unsigned char *peer_identity_);
const options_t &options_, const blob_t &peer_identity_);
// i_inout interface implementation.
bool read (::zmq_msg_t *msg_);
......@@ -68,7 +65,7 @@ namespace zmq
void process_plug ();
void process_unplug ();
void process_attach (struct i_engine *engine_,
unsigned char peer_identity_size_, unsigned char *peer_identity_);
const blob_t &peer_identity_);
// Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe;
......@@ -87,7 +84,7 @@ namespace zmq
uint64_t ordinal;
// Identity of the peer.
std::string peer_identity;
blob_t peer_identity;
// Inherited socket options.
options_t options;
......
......@@ -267,7 +267,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
send_attach (session, pgm_sender, 0, NULL);
send_attach (session, pgm_sender, blob_t ());
}
else if (options.requires_in) {
......@@ -282,7 +282,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1;
}
send_attach (session, pgm_receiver, 0, NULL);
send_attach (session, pgm_receiver, blob_t ());
}
else
zmq_assert (false);
......@@ -454,33 +454,29 @@ bool zmq::socket_base_t::has_out ()
return xhas_out ();
}
bool zmq::socket_base_t::register_session (unsigned char peer_identity_size_,
unsigned char *peer_identity_, session_t *session_)
bool zmq::socket_base_t::register_session (const blob_t &peer_identity_,
session_t *session_)
{
sessions_sync.lock ();
bool registered = named_sessions.insert (std::make_pair (std::string (
(char*) peer_identity_, peer_identity_size_), session_)).second;
bool registered = named_sessions.insert (
std::make_pair (peer_identity_, session_)).second;
sessions_sync.unlock ();
return registered;
}
void zmq::socket_base_t::unregister_session (unsigned char peer_identity_size_,
unsigned char *peer_identity_)
void zmq::socket_base_t::unregister_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
named_sessions_t::iterator it = named_sessions.find (std::string (
(char*) peer_identity_, peer_identity_size_));
named_sessions_t::iterator it = named_sessions.find (peer_identity_);
zmq_assert (it != named_sessions.end ());
named_sessions.erase (it);
sessions_sync.unlock ();
}
zmq::session_t *zmq::socket_base_t::find_session (
unsigned char peer_identity_size_, unsigned char *peer_identity_)
zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
{
sessions_sync.lock ();
named_sessions_t::iterator it = named_sessions.find (std::string (
(char*) peer_identity_, peer_identity_size_));
named_sessions_t::iterator it = named_sessions.find (peer_identity_);
if (it == named_sessions.end ()) {
sessions_sync.unlock ();
return NULL;
......
......@@ -23,7 +23,6 @@
#include <set>
#include <map>
#include <vector>
#include <string>
#include "../bindings/c/zmq.h"
......@@ -35,6 +34,7 @@
#include "stdint.hpp"
#include "atomic_counter.hpp"
#include "stdint.hpp"
#include "blob.hpp"
namespace zmq
{
......@@ -78,12 +78,10 @@ namespace zmq
// There are two distinct types of sessions: those identified by name
// and those identified by ordinal number. Thus two sets of session
// management functions.
bool register_session (unsigned char peer_identity_size_,
unsigned char *peer_identity_, class session_t *session_);
void unregister_session (unsigned char peer_identity_size_,
unsigned char *peer_identity_);
class session_t *find_session (unsigned char peer_identity_size_,
unsigned char *peer_identity_);
bool register_session (const blob_t &peer_identity_,
class session_t *session_);
void unregister_session (const blob_t &peer_identity_);
class session_t *find_session (const blob_t &peer_identity_);
uint64_t register_session (class session_t *session_);
void unregister_session (uint64_t ordinal_);
class session_t *find_session (uint64_t ordinal_);
......@@ -158,7 +156,7 @@ namespace zmq
// within the socket, instead they are used by I/O objects owned by
// the socket. As those objects can live in different threads,
// the access is synchronised by mutex.
typedef std::map <std::string, session_t*> named_sessions_t;
typedef std::map <blob_t, session_t*> named_sessions_t;
named_sessions_t named_sessions;
typedef std::map <uint64_t, session_t*> unnamed_sessions_t;
unnamed_sessions_t unnamed_sessions;
......
......@@ -27,9 +27,7 @@
zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
decoder_t <zmq_decoder_t> (bufsize_),
destination (NULL),
prefix (NULL),
prefix_size (0)
destination (NULL)
{
zmq_msg_init (&in_progress);
......@@ -39,9 +37,6 @@ zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
zmq::zmq_decoder_t::~zmq_decoder_t ()
{
if (prefix)
free (prefix);
zmq_msg_close (&in_progress);
}
......@@ -50,13 +45,9 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
destination = destination_;
}
void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_,
size_t prefix_size_)
void zmq::zmq_decoder_t::add_prefix (const blob_t &prefix_)
{
prefix = malloc (prefix_size_);
zmq_assert (prefix);
memcpy (prefix, prefix_, prefix_size_);
prefix_size = prefix_size_;
prefix = prefix_;
}
bool zmq::zmq_decoder_t::one_byte_size_ready ()
......@@ -72,15 +63,16 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, prefix_size + *tmpbuf);
int rc = zmq_msg_init_size (&in_progress, prefix.size () + *tmpbuf);
errno_assert (rc == 0);
// Fill in the message prefix if any.
if (prefix)
memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
if (!prefix.empty ())
memcpy (zmq_msg_data (&in_progress), prefix.data (),
prefix.size ());
next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size,
*tmpbuf, &zmq_decoder_t::message_ready);
next_step ((unsigned char*) zmq_msg_data (&in_progress) +
prefix.size (), *tmpbuf, &zmq_decoder_t::message_ready);
}
return true;
}
......@@ -95,14 +87,14 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, prefix_size + size);
int rc = zmq_msg_init_size (&in_progress, prefix.size () + size);
errno_assert (rc == 0);
// Fill in the message prefix if any.
if (prefix)
memcpy (zmq_msg_data (&in_progress), prefix, prefix_size);
if (!prefix.empty ())
memcpy (zmq_msg_data (&in_progress), prefix.data (), prefix.size ());
next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix_size ,
next_step ((unsigned char*) zmq_msg_data (&in_progress) + prefix.size (),
size, &zmq_decoder_t::message_ready);
return true;
}
......
......@@ -23,6 +23,7 @@
#include "../bindings/c/zmq.h"
#include "decoder.hpp"
#include "blob.hpp"
namespace zmq
{
......@@ -41,7 +42,7 @@ namespace zmq
// Once called, all decoded messages will be prefixed by the specified
// prefix.
void add_prefix (unsigned char *prefix_, size_t prefix_size_);
void add_prefix (const blob_t &prefix_);
private:
......@@ -53,8 +54,7 @@ namespace zmq
unsigned char tmpbuf [8];
::zmq_msg_t in_progress;
void *prefix;
size_t prefix_size;
blob_t prefix;
zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&);
......
......@@ -160,11 +160,10 @@ void zmq::zmq_engine_t::revive ()
out_event ();
}
void zmq::zmq_engine_t::traceroute (unsigned char *identity_,
size_t identity_size_)
void zmq::zmq_engine_t::traceroute (const blob_t &identity_)
{
encoder.trim_prefix ();
decoder.add_prefix (identity_, identity_size_);
decoder.add_prefix (identity_);
}
void zmq::zmq_engine_t::error ()
......
......@@ -47,7 +47,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
void traceroute (unsigned char *identity_, size_t identity_size_);
void traceroute (const blob_t &identity_);
// i_poll_events interface implementation.
void in_event ();
......
......@@ -72,15 +72,14 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
return false;
// Retreieve the remote identity.
peer_identity.assign ((const char*) zmq_msg_data (msg_),
peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
received = true;
// Once the initial handshaking is over, XREP sockets should start
// tracerouting individual messages.
if (options.traceroute)
engine->traceroute ((unsigned char*) peer_identity.data (),
peer_identity.size ());
engine->traceroute (peer_identity);
return true;
}
......@@ -164,14 +163,11 @@ void zmq::zmq_init_t::finalise ()
// If the peer has a unique name, find the associated session. If it
// doesn't exist, create it.
else if (!peer_identity.empty ()) {
session = owner->find_session (
(unsigned char) peer_identity.size (),
(unsigned char*) peer_identity.data ());
session = owner->find_session (peer_identity);
if (!session) {
session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options,
(unsigned char) peer_identity.size (),
(unsigned char*) peer_identity.c_str ());
peer_identity);
zmq_assert (session);
send_plug (session);
send_own (owner, session);
......@@ -185,7 +181,7 @@ void zmq::zmq_init_t::finalise ()
// transient session.
else {
session = new (std::nothrow) session_t (
choose_io_thread (options.affinity), owner, options, 0, NULL);
choose_io_thread (options.affinity), owner, options, blob_t ());
zmq_assert (session);
send_plug (session);
send_own (owner, session);
......@@ -195,8 +191,7 @@ void zmq::zmq_init_t::finalise ()
}
// No need to increment seqnum as it was already incremented above.
send_attach (session, engine, (unsigned char) peer_identity.size (),
(unsigned char*) peer_identity.data (), false);
send_attach (session, engine, peer_identity, false);
// Destroy the init object.
engine = NULL;
......
......@@ -20,8 +20,6 @@
#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#include <string>
#include "i_inout.hpp"
#include "i_engine.hpp"
#include "owned.hpp"
......@@ -29,6 +27,7 @@
#include "stdint.hpp"
#include "options.hpp"
#include "stdint.hpp"
#include "blob.hpp"
namespace zmq
{
......@@ -72,7 +71,7 @@ namespace zmq
bool received;
// Identity of the peer socket.
std::string peer_identity;
blob_t peer_identity;
// TCP connecter creates session before the name of the peer is known.
// Thus we know only its ordinal number.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment