Commit 36a57637 authored by Martin Sustrik's avatar Martin Sustrik

Multi-hop REQ/REP, part I., tracerouting switched on on XREP socket

parent 7b4cf2a4
......@@ -20,6 +20,8 @@
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
#define __ZMQ_I_ENGINE_HPP_INCLUDED__
#include <stddef.h>
namespace zmq
{
......@@ -36,6 +38,12 @@ namespace zmq
// This method is called by the session to signalise that there
// are messages to send available.
virtual void revive () = 0;
// Start tracing the message route. Engine should add the identity
// supplied to all inbound messages and trim identity from all the
// outbound messages.
virtual void traceroute (unsigned char *identity_,
size_t identity_size_) = 0;
};
}
......
......@@ -88,6 +88,13 @@ void zmq::pgm_receiver_t::revive ()
zmq_assert (false);
}
void zmq::pgm_receiver_t::traceroute (unsigned char *identity_,
size_t identity_size_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
}
void zmq::pgm_receiver_t::in_event ()
{
// Read data from the underlying pgm_socket.
......@@ -151,7 +158,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.joined = true;
// Create and connect decoder for the peer.
it->second.decoder = new (std::nothrow) zmq_decoder_t (0, NULL, 0);
it->second.decoder = new (std::nothrow) zmq_decoder_t (0);
it->second.decoder->set_inout (inout);
}
......
......@@ -54,6 +54,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
void traceroute (unsigned char *identity_, size_t identity_size_);
// i_poll_events interface implementation.
void in_event ();
......
......@@ -36,7 +36,7 @@
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_) :
io_object_t (parent_),
encoder (0, false),
encoder (0),
pgm_socket (false, options_),
options (options_),
out_buffer (NULL),
......@@ -102,6 +102,13 @@ void zmq::pgm_sender_t::revive ()
out_event ();
}
void zmq::pgm_sender_t::traceroute (unsigned char *identity_,
size_t identity_size_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
}
zmq::pgm_sender_t::~pgm_sender_t ()
{
if (out_buffer) {
......
......@@ -52,6 +52,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
void traceroute (unsigned char *identity_, size_t identity_size_);
// i_poll_events interface implementation.
void in_event ();
......
......@@ -25,24 +25,14 @@
#include "wire.hpp"
#include "err.hpp"
zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_,
void *prefix_, size_t prefix_size_) :
zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) :
decoder_t <zmq_decoder_t> (bufsize_),
destination (NULL)
destination (NULL),
prefix (NULL),
prefix_size (0)
{
zmq_msg_init (&in_progress);
if (!prefix_) {
prefix = NULL;
prefix_size = 0;
}
else {
prefix = malloc (prefix_size_);
zmq_assert (prefix);
memcpy (prefix, prefix_, prefix_size_);
prefix_size = prefix_size_;
}
// At the beginning, read one byte and go to one_byte_size_ready state.
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
}
......@@ -60,6 +50,15 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
destination = destination_;
}
void zmq::zmq_decoder_t::add_prefix (unsigned char *prefix_,
size_t prefix_size_)
{
prefix = malloc (prefix_size_);
zmq_assert (prefix);
memcpy (prefix, prefix_, prefix_size_);
prefix_size = prefix_size_;
}
bool zmq::zmq_decoder_t::one_byte_size_ready ()
{
// First byte of size is read. If it is 0xff read 8-byte size.
......
......@@ -34,11 +34,15 @@ namespace zmq
// If prefix is not NULL, it will be glued to the beginning of every
// decoded message.
zmq_decoder_t (size_t bufsize_, void *prefix_, size_t prefix_size_);
zmq_decoder_t (size_t bufsize_);
~zmq_decoder_t ();
void set_inout (struct i_inout *destination_);
// Once called, all decoded messages will be prefixed by the specified
// prefix.
void add_prefix (unsigned char *prefix_, size_t prefix_size_);
private:
bool one_byte_size_ready ();
......
......@@ -21,10 +21,10 @@
#include "i_inout.hpp"
#include "wire.hpp"
zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_, bool trim_prefix_) :
zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) :
encoder_t <zmq_encoder_t> (bufsize_),
source (NULL),
trim_prefix (trim_prefix_)
trim (false)
{
zmq_msg_init (&in_progress);
......@@ -42,10 +42,15 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_)
source = source_;
}
void zmq::zmq_encoder_t::trim_prefix ()
{
trim = true;
}
bool zmq::zmq_encoder_t::size_ready ()
{
// Write message body into the buffer.
if (!trim_prefix) {
if (!trim) {
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
&zmq_encoder_t::message_ready, false);
}
......@@ -75,7 +80,7 @@ bool zmq::zmq_encoder_t::message_ready ()
// Get the message size. If the prefix is not to be sent, adjust the
// size accordingly.
size_t size = zmq_msg_size (&in_progress);
if (trim_prefix)
if (trim)
size -= *(unsigned char*) zmq_msg_data (&in_progress);
// For messages less than 255 bytes long, write one byte of message size.
......
......@@ -32,11 +32,15 @@ namespace zmq
{
public:
zmq_encoder_t (size_t bufsize_, bool trim_prefix_);
zmq_encoder_t (size_t bufsize_);
~zmq_encoder_t ();
void set_inout (struct i_inout *source_);
// Once called, encoder will start trimming frefixes from outbound
// messages.
void trim_prefix ();
private:
bool size_ready ();
......@@ -46,7 +50,7 @@ namespace zmq
::zmq_msg_t in_progress;
unsigned char tmpbuf [9];
bool trim_prefix;
bool trim;
zmq_encoder_t (const zmq_encoder_t&);
void operator = (const zmq_encoder_t&);
......
......@@ -32,10 +32,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
io_object_t (parent_),
inpos (NULL),
insize (0),
decoder (in_batch_size, NULL, 0),
decoder (in_batch_size),
outpos (NULL),
outsize (0),
encoder (out_batch_size, false),
encoder (out_batch_size),
inout (NULL),
options (options_),
reconnect (reconnect_)
......@@ -160,6 +160,13 @@ void zmq::zmq_engine_t::revive ()
out_event ();
}
void zmq::zmq_engine_t::traceroute (unsigned char *identity_,
size_t identity_size_)
{
encoder.trim_prefix ();
decoder.add_prefix (identity_, identity_size_);
}
void zmq::zmq_engine_t::error ()
{
zmq_assert (inout);
......
......@@ -47,6 +47,7 @@ namespace zmq
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
void traceroute (unsigned char *identity_, size_t identity_size_);
// i_poll_events interface implementation.
void in_event ();
......
......@@ -76,6 +76,12 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
zmq_msg_size (msg_));
received = true;
// Once the initial handshaking is over, XREP sockets should start
// tracerouting individual messages.
if (options.type == ZMQ_XREP)
engine->traceroute ((unsigned char*) peer_identity.data (),
peer_identity.size ());
return true;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment