Commit dfdaff5e authored by Martin Sustrik's avatar Martin Sustrik

XREP-style prefixing/trimming messages removed

parent cbaf1097
...@@ -22,8 +22,6 @@ ...@@ -22,8 +22,6 @@
#include <stddef.h> #include <stddef.h>
#include "blob.hpp"
namespace zmq namespace zmq
{ {
...@@ -41,13 +39,9 @@ namespace zmq ...@@ -41,13 +39,9 @@ namespace zmq
// are messages to send available. // are messages to send available.
virtual void revive () = 0; virtual void revive () = 0;
// This method is called by the session to signalise that more
// messages can be written to the pipe.
virtual void resume_input () = 0; virtual void resume_input () = 0;
// Engine should add the prefix supplied to all inbound messages.
virtual void add_prefix (const blob_t &identity_) = 0;
// Engine should trim prefix from all the outbound messages.
virtual void trim_prefix () = 0;
}; };
} }
......
...@@ -34,8 +34,7 @@ zmq::options_t::options_t () : ...@@ -34,8 +34,7 @@ zmq::options_t::options_t () :
rcvbuf (0), rcvbuf (0),
requires_in (false), requires_in (false),
requires_out (false), requires_out (false),
immediate_connect (true), immediate_connect (true)
traceroute (false)
{ {
} }
......
...@@ -61,9 +61,6 @@ namespace zmq ...@@ -61,9 +61,6 @@ namespace zmq
// is not aware of the peer's identity, however, it is able to send // is not aware of the peer's identity, however, it is able to send
// messages straight away. // messages straight away.
bool immediate_connect; bool immediate_connect;
// If true, socket requires tracerouting the messages.
bool traceroute;
}; };
} }
......
...@@ -121,18 +121,6 @@ void zmq::pgm_receiver_t::resume_input () ...@@ -121,18 +121,6 @@ void zmq::pgm_receiver_t::resume_input ()
in_event (); in_event ();
} }
void zmq::pgm_receiver_t::add_prefix (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
}
void zmq::pgm_receiver_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
}
void zmq::pgm_receiver_t::in_event () void zmq::pgm_receiver_t::in_event ()
{ {
// Read data from the underlying pgm_socket. // Read data from the underlying pgm_socket.
......
...@@ -55,8 +55,6 @@ namespace zmq ...@@ -55,8 +55,6 @@ namespace zmq
void unplug (); void unplug ();
void revive (); void revive ();
void resume_input (); void resume_input ();
void add_prefix (const blob_t &identity_);
void trim_prefix ();
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
......
...@@ -107,18 +107,6 @@ void zmq::pgm_sender_t::resume_input () ...@@ -107,18 +107,6 @@ void zmq::pgm_sender_t::resume_input ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::pgm_sender_t::add_prefix (const blob_t &identity_)
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
}
void zmq::pgm_sender_t::trim_prefix ()
{
// No need for tracerouting functionality in PGM socket at the moment.
zmq_assert (false);
}
zmq::pgm_sender_t::~pgm_sender_t () zmq::pgm_sender_t::~pgm_sender_t ()
{ {
if (out_buffer) { if (out_buffer) {
......
...@@ -53,8 +53,6 @@ namespace zmq ...@@ -53,8 +53,6 @@ namespace zmq
void unplug (); void unplug ();
void revive (); void revive ();
void resume_input (); void resume_input ();
void add_prefix (const blob_t &identity_);
void trim_prefix ();
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
......
...@@ -264,9 +264,4 @@ void zmq::session_t::process_attach (i_engine *engine_, ...@@ -264,9 +264,4 @@ void zmq::session_t::process_attach (i_engine *engine_,
zmq_assert (engine_); zmq_assert (engine_);
engine = engine_; engine = engine_;
engine->plug (this); engine->plug (this);
// Once the initial handshaking is over tracerouting should trim prefixes
// from outbound messages.
if (options.traceroute)
engine->trim_prefix ();
} }
...@@ -33,9 +33,8 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) : ...@@ -33,9 +33,8 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
// That way we are aware of the peer's identity when binding to the pipes. // That way we are aware of the peer's identity when binding to the pipes.
options.immediate_connect = false; options.immediate_connect = false;
// XREP socket adds identity to inbound messages and strips identity // XREP is unfunctional at the moment. Crash here!
// from the outbound messages. zmq_assert (false);
options.traceroute = true;
} }
zmq::xrep_t::~xrep_t () zmq::xrep_t::~xrep_t ()
......
...@@ -45,11 +45,6 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_) ...@@ -45,11 +45,6 @@ void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
destination = destination_; destination = destination_;
} }
void zmq::zmq_decoder_t::add_prefix (const blob_t &prefix_)
{
prefix = prefix_;
}
bool zmq::zmq_decoder_t::one_byte_size_ready () bool zmq::zmq_decoder_t::one_byte_size_ready ()
{ {
// First byte of size is read. If it is 0xff read 8-byte size. // First byte of size is read. If it is 0xff read 8-byte size.
...@@ -64,19 +59,8 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () ...@@ -64,19 +59,8 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// in_progress is initialised at this point so in theory we should // in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte // close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised... // message and thus we can treat it as uninitialised...
if (prefix.empty ()) { int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);
int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1); errno_assert (rc == 0);
errno_assert (rc == 0);
}
else {
int rc = zmq_msg_init_size (&in_progress,
1 + prefix.size () + *tmpbuf - 1);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
*data = (unsigned char) prefix.size ();
memcpy (data + 1, prefix.data (), *data);
}
next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready); next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready);
} }
return true; return true;
...@@ -93,18 +77,8 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () ...@@ -93,18 +77,8 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// in_progress is initialised at this point so in theory we should // in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte // close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised... // message and thus we can treat it as uninitialised...
if (prefix.empty ()) { int rc = zmq_msg_init_size (&in_progress, size - 1);
int rc = zmq_msg_init_size (&in_progress, size - 1); errno_assert (rc == 0);
errno_assert (rc == 0);
}
else {
int rc = zmq_msg_init_size (&in_progress,
1 + prefix.size () + size - 1);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) zmq_msg_data (&in_progress);
*data = (unsigned char) prefix.size ();
memcpy (data + 1, prefix.data (), *data);
}
next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready); next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready);
return true; return true;
...@@ -115,17 +89,9 @@ bool zmq::zmq_decoder_t::flags_ready () ...@@ -115,17 +89,9 @@ bool zmq::zmq_decoder_t::flags_ready ()
// Store the flags from the wire into the message structure. // Store the flags from the wire into the message structure.
in_progress.flags = tmpbuf [0]; in_progress.flags = tmpbuf [0];
if (prefix.empty ()) { next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), &zmq_decoder_t::message_ready);
&zmq_decoder_t::message_ready);
}
else {
next_step ((unsigned char*) zmq_msg_data (&in_progress) +
prefix.size () + 1,
zmq_msg_size (&in_progress) - prefix.size () - 1,
&zmq_decoder_t::message_ready);
}
return true; return true;
} }
......
...@@ -33,17 +33,11 @@ namespace zmq ...@@ -33,17 +33,11 @@ namespace zmq
{ {
public: public:
// If prefix is not NULL, it will be glued to the beginning of every
// decoded message.
zmq_decoder_t (size_t bufsize_); zmq_decoder_t (size_t bufsize_);
~zmq_decoder_t (); ~zmq_decoder_t ();
void set_inout (struct i_inout *destination_); void set_inout (struct i_inout *destination_);
// Once called, all decoded messages will be prefixed by the specified
// prefix.
void add_prefix (const blob_t &prefix_);
private: private:
bool one_byte_size_ready (); bool one_byte_size_ready ();
...@@ -55,8 +49,6 @@ namespace zmq ...@@ -55,8 +49,6 @@ namespace zmq
unsigned char tmpbuf [8]; unsigned char tmpbuf [8];
::zmq_msg_t in_progress; ::zmq_msg_t in_progress;
blob_t prefix;
zmq_decoder_t (const zmq_decoder_t&); zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&); void operator = (const zmq_decoder_t&);
}; };
......
...@@ -23,8 +23,7 @@ ...@@ -23,8 +23,7 @@
zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) : zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) :
encoder_t <zmq_encoder_t> (bufsize_), encoder_t <zmq_encoder_t> (bufsize_),
source (NULL), source (NULL)
trim (false)
{ {
zmq_msg_init (&in_progress); zmq_msg_init (&in_progress);
...@@ -42,25 +41,11 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_) ...@@ -42,25 +41,11 @@ void zmq::zmq_encoder_t::set_inout (i_inout *source_)
source = source_; source = source_;
} }
void zmq::zmq_encoder_t::trim_prefix ()
{
trim = true;
}
bool zmq::zmq_encoder_t::size_ready () bool zmq::zmq_encoder_t::size_ready ()
{ {
// Write message body into the buffer. // Write message body into the buffer.
if (!trim) { next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), &zmq_encoder_t::message_ready, false);
&zmq_encoder_t::message_ready, false);
}
else {
size_t prefix_size = *(unsigned char*) zmq_msg_data (&in_progress);
next_step (
(unsigned char*) zmq_msg_data (&in_progress) + prefix_size + 1,
zmq_msg_size (&in_progress) - prefix_size - 1,
&zmq_encoder_t::message_ready, false);
}
return true; return true;
} }
...@@ -78,16 +63,8 @@ bool zmq::zmq_encoder_t::message_ready () ...@@ -78,16 +63,8 @@ bool zmq::zmq_encoder_t::message_ready ()
return false; return false;
} }
// Get the message size. If the prefix is not to be sent, adjust the // Get the message size.
// size accordingly.
size_t size = zmq_msg_size (&in_progress); size_t size = zmq_msg_size (&in_progress);
if (trim) {
zmq_assert (size);
size_t prefix_size =
(*(unsigned char*) zmq_msg_data (&in_progress)) + 1;
zmq_assert (prefix_size <= size);
size -= prefix_size;
}
// Account for the 'flags' byte. // Account for the 'flags' byte.
size++; size++;
......
...@@ -37,10 +37,6 @@ namespace zmq ...@@ -37,10 +37,6 @@ namespace zmq
void set_inout (struct i_inout *source_); void set_inout (struct i_inout *source_);
// Once called, encoder will start trimming frefixes from outbound
// messages.
void trim_prefix ();
private: private:
bool size_ready (); bool size_ready ();
...@@ -50,8 +46,6 @@ namespace zmq ...@@ -50,8 +46,6 @@ namespace zmq
::zmq_msg_t in_progress; ::zmq_msg_t in_progress;
unsigned char tmpbuf [10]; unsigned char tmpbuf [10];
bool trim;
zmq_encoder_t (const zmq_encoder_t&); zmq_encoder_t (const zmq_encoder_t&);
void operator = (const zmq_encoder_t&); void operator = (const zmq_encoder_t&);
}; };
......
...@@ -169,16 +169,6 @@ void zmq::zmq_engine_t::resume_input () ...@@ -169,16 +169,6 @@ void zmq::zmq_engine_t::resume_input ()
in_event (); in_event ();
} }
void zmq::zmq_engine_t::add_prefix (const blob_t &identity_)
{
decoder.add_prefix (identity_);
}
void zmq::zmq_engine_t::trim_prefix ()
{
encoder.trim_prefix ();
}
void zmq::zmq_engine_t::error () void zmq::zmq_engine_t::error ()
{ {
zmq_assert (inout); zmq_assert (inout);
......
...@@ -48,8 +48,6 @@ namespace zmq ...@@ -48,8 +48,6 @@ namespace zmq
void unplug (); void unplug ();
void revive (); void revive ();
void resume_input (); void resume_input ();
void add_prefix (const blob_t &identity_);
void trim_prefix ();
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
......
...@@ -85,8 +85,7 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) ...@@ -85,8 +85,7 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_), peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
zmq_msg_size (msg_)); zmq_msg_size (msg_));
} }
if (options.traceroute)
engine->add_prefix (peer_identity);
received = true; received = true;
return true; return true;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment