Commit 4b52cf94 authored by Martin Sustrik's avatar Martin Sustrik

TCP and IPC connection initiation allow for multiple properties

So far the only property passed on connection initiation was
identity. The mechanism was now made extensible. Additional
properties are needed to introduce functionality such as
checking the peer's socket type, "subports" etc.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 8203c4db
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include "session.hpp" #include "session.hpp"
#include "uuid.hpp" #include "uuid.hpp"
#include "blob.hpp" #include "blob.hpp"
#include "wire.hpp"
#include "err.hpp" #include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
...@@ -36,7 +37,6 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, ...@@ -36,7 +37,6 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
const options_t &options_) : const options_t &options_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
ephemeral_engine (NULL), ephemeral_engine (NULL),
sent (false),
received (false), received (false),
socket (socket_), socket (socket_),
session (session_), session (session_),
...@@ -45,26 +45,61 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, ...@@ -45,26 +45,61 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
// Create the engine object for this connection. // Create the engine object for this connection.
engine = new (std::nothrow) zmq_engine_t (fd_, options); engine = new (std::nothrow) zmq_engine_t (fd_, options);
alloc_assert (engine); alloc_assert (engine);
// Generate an unique identity.
unsigned char identity [uuid_t::uuid_blob_len + 1];
identity [0] = 0;
memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
// Create a list of props to send.
zmq_msg_t msg;
int rc = zmq_msg_init_size (&msg, 4);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) zmq_msg_data (&msg);
put_uint16 (data, prop_type);
put_uint16 (data + 2, options.type);
msg.flags |= ZMQ_MSG_MORE;
to_send.push_back (msg);
if (!options.identity.empty ()) {
rc = zmq_msg_init_size (&msg, 2 + options.identity.size ());
errno_assert (rc == 0);
data = (unsigned char*) zmq_msg_data (&msg);
put_uint16 (data, prop_identity);
memcpy (data + 2, options.identity.data (), options.identity.size ());
msg.flags |= ZMQ_MSG_MORE;
to_send.push_back (msg);
}
// Remove the MORE flag from the last prop.
to_send.back ().flags &= ~ZMQ_MSG_MORE;
} }
zmq::zmq_init_t::~zmq_init_t () zmq::zmq_init_t::~zmq_init_t ()
{ {
if (engine) if (engine)
engine->terminate (); engine->terminate ();
// If there are unsent props still queued deallocate them.
for (to_send_t::iterator it = to_send.begin (); it != to_send.end ();
++it) {
int rc = zmq_msg_close (&(*it));
errno_assert (rc == 0);
}
to_send.clear ();
} }
bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
{ {
// If the identity was already sent, do nothing. // If the identity was already sent, do nothing.
if (sent) if (to_send.empty ())
return false; return false;
// Send the identity. // Pass next property to the engine.
int rc = zmq_msg_init_size (msg_, options.identity.size ()); *msg_ = to_send.front ();
zmq_assert (rc == 0); to_send.erase (to_send.begin ());
memcpy (zmq_msg_data (msg_), options.identity.c_str (),
options.identity.size ());
sent = true;
// Try finalize initialization. // Try finalize initialization.
finalise_initialisation (); finalise_initialisation ();
...@@ -79,24 +114,35 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) ...@@ -79,24 +114,35 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
if (received) if (received)
return false; return false;
// Retreieve the remote identity. If it's empty, generate a unique name. size_t size = zmq_msg_size (msg_);
if (!zmq_msg_size (msg_)) { unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
unsigned char identity [uuid_t::uuid_blob_len + 1];
identity [0] = 0; // There should be at least property type in the message.
memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len); zmq_assert (size >= 2);
peer_identity.assign (identity, uuid_t::uuid_blob_len + 1); uint16_t prop = get_uint16 (data);
switch (prop) {
case prop_type:
{
zmq_assert (size == 4);
// TODO: Check whether the type is OK.
// uint16_t type = get_uint16 (data + 2);
// ...
break;
};
case prop_identity:
{
peer_identity.assign (data + 2, size - 2);
break;
} }
else { default:
peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_), zmq_assert (false);
zmq_msg_size (msg_));
} }
int rc = zmq_msg_close (msg_);
zmq_assert (rc == 0);
if (!(msg_->flags & ZMQ_MSG_MORE)) {
received = true; received = true;
// Try finalize initialization.
finalise_initialisation (); finalise_initialisation ();
}
return true; return true;
} }
...@@ -142,7 +188,7 @@ void zmq::zmq_init_t::process_unplug () ...@@ -142,7 +188,7 @@ void zmq::zmq_init_t::process_unplug ()
void zmq::zmq_init_t::finalise_initialisation () void zmq::zmq_init_t::finalise_initialisation ()
{ {
// Unplug and prepare to dispatch engine. // Unplug and prepare to dispatch engine.
if (sent && received) { if (to_send.empty () && received) {
ephemeral_engine = engine; ephemeral_engine = engine;
engine = NULL; engine = NULL;
ephemeral_engine->unplug (); ephemeral_engine->unplug ();
...@@ -152,7 +198,7 @@ void zmq::zmq_init_t::finalise_initialisation () ...@@ -152,7 +198,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
void zmq::zmq_init_t::dispatch_engine () void zmq::zmq_init_t::dispatch_engine ()
{ {
if (sent && received) { if (to_send.empty () && received) {
// Engine must be detached. // Engine must be detached.
zmq_assert (!engine); zmq_assert (!engine);
......
...@@ -21,12 +21,15 @@ ...@@ -21,12 +21,15 @@
#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__ #ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__ #define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#include <vector>
#include "../include/zmq.h"
#include "i_inout.hpp" #include "i_inout.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
#include "own.hpp" #include "own.hpp"
#include "fd.hpp" #include "fd.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "stdint.hpp"
#include "blob.hpp" #include "blob.hpp"
namespace zmq namespace zmq
...@@ -44,6 +47,13 @@ namespace zmq ...@@ -44,6 +47,13 @@ namespace zmq
private: private:
// Peer property IDs.
enum prop_t
{
prop_type = 1,
prop_identity = 2
};
void finalise_initialisation (); void finalise_initialisation ();
void dispatch_engine (); void dispatch_engine ();
...@@ -63,8 +73,10 @@ namespace zmq ...@@ -63,8 +73,10 @@ namespace zmq
// Detached transient engine. // Detached transient engine.
i_engine *ephemeral_engine; i_engine *ephemeral_engine;
// True if our own identity was already sent to the peer. // List of messages to send to the peer during the connection
bool sent; // initiation phase.
typedef std::vector < ::zmq_msg_t> to_send_t;
to_send_t to_send;
// True if peer's identity was already received. // True if peer's identity was already received.
bool received; bool received;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment