Commit a1bb5e83 authored by Martin Hurton's avatar Martin Hurton

Simplify ZMQ_STREAM socket implementation

parent d8f13760
......@@ -34,7 +34,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_peer_id (generate_random ())
{
options.type = ZMQ_STREAM;
options.recv_identity = true;
options.raw_sock = true;
prefetched_id.init ();
......@@ -43,7 +42,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
zmq::stream_t::~stream_t ()
{
zmq_assert (anonymous_pipes.empty ());;
zmq_assert (outpipes.empty ());
prefetched_id.close ();
prefetched_msg.close ();
......@@ -56,40 +54,23 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
zmq_assert (pipe_);
bool identity_ok = identify_peer (pipe_);
if (identity_ok)
fq.attach (pipe_);
else
anonymous_pipes.insert (pipe_);
identify_peer (pipe_);
fq.attach (pipe_);
}
void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
{
std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
if (it != anonymous_pipes.end ())
anonymous_pipes.erase (it);
else {
outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
zmq_assert (it != outpipes.end ());
outpipes.erase (it);
fq.pipe_terminated (pipe_);
if (pipe_ == current_out)
current_out = NULL;
}
outpipes_t::iterator it = outpipes.find (pipe_->get_identity ());
zmq_assert (it != outpipes.end ());
outpipes.erase (it);
fq.pipe_terminated (pipe_);
if (pipe_ == current_out)
current_out = NULL;
}
void zmq::stream_t::xread_activated (pipe_t *pipe_)
{
std::set <pipe_t*>::iterator it = anonymous_pipes.find (pipe_);
if (it == anonymous_pipes.end ())
fq.activated (pipe_);
else {
bool identity_ok = identify_peer (pipe_);
if (identity_ok) {
anonymous_pipes.erase (it);
fq.attach (pipe_);
}
}
fq.activated (pipe_);
}
void zmq::stream_t::xwrite_activated (pipe_t *pipe_)
......@@ -147,7 +128,7 @@ int zmq::stream_t::xsend (msg_t *msg_)
return 0;
}
// Ignore the MORE flag
// Ignore the MORE flag
msg_->reset_flags (msg_t::more);
// Check whether this is the last part of the message.
......@@ -159,7 +140,7 @@ int zmq::stream_t::xsend (msg_t *msg_)
// Close the remote connection if user has asked to do so
// by sending zero length message.
// Pending messages in the pipe will be dropped (on receiving term- ack)
if (msg_->size() == 0) {
if (msg_->size () == 0) {
current_out->terminate (false);
int rc = msg_->close ();
errno_assert (rc == 0);
......@@ -206,14 +187,6 @@ int zmq::stream_t::xrecv (msg_t *msg_)
pipe_t *pipe = NULL;
int rc = fq.recvpipe (msg_, &pipe);
// It's possible that we receive peer's identity. That happens
// after reconnection. The current implementation assumes that
// the peer always uses the same identity.
// TODO: handle the situation when the peer changes its identity.
while (rc == 0 && msg_->is_identity ())
rc = fq.recvpipe (msg_, &pipe);
if (rc != 0)
return -1;
......@@ -241,16 +214,6 @@ int zmq::stream_t::xrecv (msg_t *msg_)
return 0;
}
int zmq::stream_t::rollback (void)
{
if (current_out) {
current_out->rollback ();
current_out = NULL;
more_out = false;
}
return 0;
}
bool zmq::stream_t::xhas_in ()
{
// If we are in the middle of reading the messages, there are
......@@ -266,13 +229,6 @@ bool zmq::stream_t::xhas_in ()
// The message, if read, is kept in the pre-fetch buffer.
pipe_t *pipe = NULL;
int rc = fq.recvpipe (&prefetched_msg, &pipe);
// It's possible that we receive peer's identity. That happens
// after reconnection. The current implementation assumes that
// the peer always uses the same identity.
while (rc == 0 && prefetched_msg.is_identity ())
rc = fq.recvpipe (&prefetched_msg, &pipe);
if (rc != 0)
return false;
......@@ -292,34 +248,29 @@ bool zmq::stream_t::xhas_in ()
bool zmq::stream_t::xhas_out ()
{
// In theory, ROUTER socket is always ready for writing. Whether actual
// In theory, STREAM socket is always ready for writing. Whether actual
// attempt to write succeeds depends on which pipe the message is going
// to be routed to.
return true;
}
bool zmq::stream_t::identify_peer (pipe_t *pipe_)
void zmq::stream_t::identify_peer (pipe_t *pipe_)
{
blob_t identity;
bool ok;
// Always assign identity for raw-socket
unsigned char buffer [5];
buffer [0] = 0;
put_uint32 (buffer + 1, next_peer_id++);
identity = blob_t (buffer, sizeof buffer);
unsigned int i = 0; // Store identity to allow use of raw socket as client
for (blob_t::iterator it = identity.begin(); it != identity.end(); it++)
options.identity[i++] = *it;
options.identity_size = i;
blob_t identity = blob_t (buffer, sizeof buffer);
memcpy (options.identity, identity.data (), identity.size ());
options.identity_size = identity.size ();
pipe_->set_identity (identity);
// Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true};
ok = outpipes.insert (outpipes_t::value_type (identity, outpipe)).second;
const bool ok = outpipes.insert (
outpipes_t::value_type (identity, outpipe)).second;
zmq_assert (ok);
return true;
}
zmq::stream_session_t::stream_session_t (io_thread_t *io_thread_, bool connect_,
......
......@@ -30,7 +30,7 @@ namespace zmq
class ctx_t;
class pipe_t;
class stream_t :
class stream_t :
public socket_base_t
{
public:
......@@ -48,14 +48,9 @@ namespace zmq
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
protected:
// Rollback any message parts that were sent but not yet flushed.
int rollback ();
private:
// Receive peer id and update lookup map
bool identify_peer (pipe_t *pipe_);
// Generate peer's id and update lookup map
void identify_peer (pipe_t *pipe_);
// Fair queueing object for inbound pipes.
fq_t fq;
......@@ -82,9 +77,6 @@ namespace zmq
bool active;
};
// We keep a set of pipes that have not been identified yet.
std::set <pipe_t*> anonymous_pipes;
// Outbound pipes indexed by the peer IDs.
typedef std::map <blob_t, outpipe_t> outpipes_t;
outpipes_t outpipes;
......@@ -99,10 +91,6 @@ namespace zmq
// algorithm. This value is the next ID to use (if not used already).
uint32_t next_peer_id;
// If true, report EAGAIN to the caller instead of silently dropping
// the message targeting an unknown peer.
bool mandatory;
stream_t (const stream_t&);
const stream_t &operator = (const stream_t&);
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment