Commit 2a41c8d7 authored by Martin Hurton's avatar Martin Hurton

Simplify initial handshaking

parent 3f6148ab
...@@ -59,7 +59,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons ...@@ -59,7 +59,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
encoder (NULL), encoder (NULL),
handshaking (true), handshaking (true),
greeting_bytes_read (0), greeting_bytes_read (0),
greeting_size (0),
session (NULL), session (NULL),
options (options_), options (options_),
endpoint (endpoint_), endpoint (endpoint_),
...@@ -132,13 +131,6 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -132,13 +131,6 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
io_object_t::plug (io_thread_); io_object_t::plug (io_thread_);
handle = add_fd (s); handle = add_fd (s);
// We need to detect whether our peer is using the versioned
// protocol. The detection is done in two steps. First, we read
// first two bytes and check if the long format of length is in use.
// If so, we receive and check the 'flags' field. If the rightmost bit
// is 1, the peer is using versioned protocol.
greeting_size = 2;
// Send the 'length' and 'flags' fields of the identity message. // Send the 'length' and 'flags' fields of the identity message.
// The 'length' field is encoded in the long format. // The 'length' field is encoded in the long format.
outpos = greeting_output_buffer; outpos = greeting_output_buffer;
...@@ -311,90 +303,52 @@ void zmq::stream_engine_t::activate_in () ...@@ -311,90 +303,52 @@ void zmq::stream_engine_t::activate_in ()
in_event (); in_event ();
} }
int zmq::stream_engine_t::receive_greeting () bool zmq::stream_engine_t::handshake ()
{ {
zmq_assert (handshaking);
zmq_assert (greeting_bytes_read < greeting_size); zmq_assert (greeting_bytes_read < greeting_size);
// Receive the greeting.
while (greeting_bytes_read < greeting_size) { while (greeting_bytes_read < greeting_size) {
const int n = read (greeting + greeting_bytes_read, const int n = read (greeting + greeting_bytes_read,
greeting_size - greeting_bytes_read); greeting_size - greeting_bytes_read);
if (n == -1) if (n == -1) {
return -1; error ();
return false;
}
if (n == 0) if (n == 0)
return 0; return false;
greeting_bytes_read += n; greeting_bytes_read += n;
if (greeting_bytes_read < greeting_size) // We have received at least one byte from the peer.
continue;
if (greeting_size == 2) {
// We have received the first two bytes from the peer.
// If the first byte is not 0xff, we know that the // If the first byte is not 0xff, we know that the
// peer is using unversioned protocol. // peer is using unversioned protocol.
if (greeting [0] != 0xff) if (greeting [0] != 0xff)
break; break;
// This may still be a long identity message (either if (greeting_bytes_read < 10)
// 254 or 255 bytes long). We need to receive 8 more continue;
// bytes so we can inspect the potential 'flags' field.
greeting_size = 10; // Inspect the right-most bit of the 10th byte (which coincides
}
else
if (greeting_size == 10) {
// Inspect the rightmost bit of the 10th byte (which coincides
// with the 'flags' field if a regular message was sent). // with the 'flags' field if a regular message was sent).
// Zero indicates this is a header of identity message // Zero indicates this is a header of identity message
// (i.e. the peer is using the unversioned protocol). // (i.e. the peer is using the unversioned protocol).
if (!(greeting [9] & 0x01)) if (!(greeting [9] & 0x01))
break; break;
// This is truly a handshake and we can now send the rest of // The peer is using versioned protocol.
// the greeting message out. // Send the rest of the greeting, if necessary.
if (outpos + outsize != greeting_output_buffer + greeting_size) {
if (outsize == 0) if (outsize == 0)
set_pollout (handle); set_pollout (handle);
zmq_assert (outpos != NULL);
outpos [outsize++] = 1; // Protocol version outpos [outsize++] = 1; // Protocol version
outpos [outsize++] = 1; // Remaining length (1 byte for v1)
outpos [outsize++] = options.type; // Socket type outpos [outsize++] = options.type; // Socket type
// Read the 'version' and 'remaining_length' fields.
greeting_size = 12;
}
else
if (greeting_size == 12) {
// We have received the greeting message up to
// the 'remaining_length' field. Receive the remaining
// bytes of the greeting.
greeting_size += greeting [11];
} }
} }
return 0; // Position of the version field in the greeting.
}
bool zmq::stream_engine_t::handshake ()
{
zmq_assert (handshaking);
zmq_assert (greeting_bytes_read < greeting_size);
int rc = receive_greeting ();
if (rc == -1) {
error ();
return false;
}
if (greeting_bytes_read < greeting_size)
return false;
// We have received either a header of identity message
// or the whole greeting.
zmq_assert (greeting [0] != 0xff || greeting_bytes_read >= 10);
// POsition of the version field in the greeting.
const size_t version_pos = 10; const size_t version_pos = 10;
// Is the peer using the unversioned protocol? // Is the peer using the unversioned protocol?
......
...@@ -91,15 +91,9 @@ namespace zmq ...@@ -91,15 +91,9 @@ namespace zmq
// Underlying socket. // Underlying socket.
fd_t s; fd_t s;
// Maximum size of a greeting message: // Size of the greeting message:
// preamble (10 bytes) + version (1 byte) + remaining_length (1 byte) + // Preamble (10 bytes) + version (1 byte) + socket type (1 byte).
// up to 255 remaining bytes. const static size_t greeting_size = 12;
const static size_t maximum_greeting_size = 10 + 1 + 1 + 255;
// Size of v1 greeting message:
// preamble (10 bytes) + version (1 byte) + remaining_length (1 byte) +
// socket_type (1)
const static size_t v1_greeting_size = 10 + 1 + 1 + 1;
handle_t handle; handle_t handle;
...@@ -119,18 +113,15 @@ namespace zmq ...@@ -119,18 +113,15 @@ namespace zmq
// The receive buffer holding the greeting message // The receive buffer holding the greeting message
// that we are receiving from the peer. // that we are receiving from the peer.
unsigned char greeting [maximum_greeting_size]; unsigned char greeting [greeting_size];
// The number of bytes of the greeting message that // The number of bytes of the greeting message that
// we have already received. // we have already received.
unsigned int greeting_bytes_read; unsigned int greeting_bytes_read;
// The size of the greeting message.
unsigned int greeting_size;
// The send buffer holding the greeting message // The send buffer holding the greeting message
// that we are sending to the peer. // that we are sending to the peer.
unsigned char greeting_output_buffer [v1_greeting_size]; unsigned char greeting_output_buffer [greeting_size];
// The session this engine is attached to. // The session this engine is attached to.
zmq::session_base_t *session; zmq::session_base_t *session;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment