Commit 2e78e485 authored by Martin Sustrik's avatar Martin Sustrik

Multi-hop REQ/REP, part V., peer identity is passed from init object to session

parent d8430f4b
...@@ -17,8 +17,18 @@ ...@@ -17,8 +17,18 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <stdlib.h>
#include "command.hpp" #include "command.hpp"
void zmq::deallocate_command (command_t *cmd_) void zmq::deallocate_command (command_t *cmd_)
{ {
switch (cmd_->type) {
case command_t::attach:
if (cmd_->args.attach.peer_identity)
free (cmd_->args.attach.peer_identity);
break;
default:
/* noop */;
}
} }
...@@ -66,6 +66,8 @@ namespace zmq ...@@ -66,6 +66,8 @@ namespace zmq
// Attach the engine to the session. // Attach the engine to the session.
struct { struct {
struct i_engine *engine; struct i_engine *engine;
unsigned char peer_identity_size;
unsigned char *peer_identity;
} attach; } attach;
// Sent from session to socket to establish pipe(s) between them. // Sent from session to socket to establish pipe(s) between them.
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <string.h>
#include "object.hpp" #include "object.hpp"
#include "dispatcher.hpp" #include "dispatcher.hpp"
#include "err.hpp" #include "err.hpp"
...@@ -80,7 +82,9 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -80,7 +82,9 @@ void zmq::object_t::process_command (command_t &cmd_)
break; break;
case command_t::attach: case command_t::attach:
process_attach (cmd_.args.attach.engine); process_attach (cmd_.args.attach.engine,
cmd_.args.attach.peer_identity_size,
cmd_.args.attach.peer_identity);
process_seqnum (); process_seqnum ();
break; break;
...@@ -180,6 +184,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_) ...@@ -180,6 +184,7 @@ void zmq::object_t::send_own (socket_base_t *destination_, owned_t *object_)
} }
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
unsigned char peer_identity_size_, unsigned char *peer_identity_,
bool inc_seqnum_) bool inc_seqnum_)
{ {
if (inc_seqnum_) if (inc_seqnum_)
...@@ -189,6 +194,18 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, ...@@ -189,6 +194,18 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::attach; cmd.type = command_t::attach;
cmd.args.attach.engine = engine_; cmd.args.attach.engine = engine_;
if (!peer_identity_size_) {
cmd.args.attach.peer_identity_size = 0;
cmd.args.attach.peer_identity = NULL;
}
else {
cmd.args.attach.peer_identity_size = peer_identity_size_;
cmd.args.attach.peer_identity =
(unsigned char*) malloc (peer_identity_size_);
zmq_assert (cmd.args.attach.peer_identity_size);
memcpy (cmd.args.attach.peer_identity, peer_identity_,
peer_identity_size_);
}
send_command (cmd); send_command (cmd);
} }
...@@ -271,7 +288,8 @@ void zmq::object_t::process_own (owned_t *object_) ...@@ -271,7 +288,8 @@ void zmq::object_t::process_own (owned_t *object_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_attach (i_engine *engine_) void zmq::object_t::process_attach (i_engine *engine_,
unsigned char peer_identity_size_, unsigned char *peer_identity_)
{ {
zmq_assert (false); zmq_assert (false);
} }
......
...@@ -64,7 +64,8 @@ namespace zmq ...@@ -64,7 +64,8 @@ namespace zmq
void send_own (class socket_base_t *destination_, void send_own (class socket_base_t *destination_,
class owned_t *object_); class owned_t *object_);
void send_attach (class session_t *destination_, void send_attach (class session_t *destination_,
struct i_engine *engine_, bool inc_seqnum_ = true); struct i_engine *engine_, unsigned char peer_identity_size_,
unsigned char *peer_identity_, bool inc_seqnum_ = true);
void send_bind (class socket_base_t *destination_, void send_bind (class socket_base_t *destination_,
class reader_t *in_pipe_, class writer_t *out_pipe_, class reader_t *in_pipe_, class writer_t *out_pipe_,
bool inc_seqnum_ = true); bool inc_seqnum_ = true);
...@@ -81,7 +82,8 @@ namespace zmq ...@@ -81,7 +82,8 @@ namespace zmq
virtual void process_stop (); virtual void process_stop ();
virtual void process_plug (); virtual void process_plug ();
virtual void process_own (class owned_t *object_); virtual void process_own (class owned_t *object_);
virtual void process_attach (struct i_engine *engine_); virtual void process_attach (struct i_engine *engine_,
unsigned char peer_identity_size_, unsigned char *peer_identity_);
virtual void process_bind (class reader_t *in_pipe_, virtual void process_bind (class reader_t *in_pipe_,
class writer_t *out_pipe_); class writer_t *out_pipe_);
virtual void process_revive (); virtual void process_revive ();
......
...@@ -232,7 +232,8 @@ void zmq::session_t::process_unplug () ...@@ -232,7 +232,8 @@ void zmq::session_t::process_unplug ()
} }
} }
void zmq::session_t::process_attach (i_engine *engine_) void zmq::session_t::process_attach (i_engine *engine_,
unsigned char peer_identity_size_, unsigned char *peer_identity_)
{ {
zmq_assert (!engine); zmq_assert (!engine);
zmq_assert (engine_); zmq_assert (engine_);
......
...@@ -66,7 +66,8 @@ namespace zmq ...@@ -66,7 +66,8 @@ namespace zmq
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
void process_unplug (); void process_unplug ();
void process_attach (struct i_engine *engine_); void process_attach (struct i_engine *engine_,
unsigned char peer_identity_size_, unsigned char *peer_identity_);
// Inbound pipe, i.e. one the session is getting messages from. // Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe; class reader_t *in_pipe;
......
...@@ -192,7 +192,8 @@ void zmq::zmq_init_t::finalise () ...@@ -192,7 +192,8 @@ void zmq::zmq_init_t::finalise ()
} }
// No need to increment seqnum as it was laready incremented above. // No need to increment seqnum as it was laready incremented above.
send_attach (session, engine, false); send_attach (session, engine, (unsigned char) peer_identity.size (),
(unsigned char*) peer_identity.data (), false);
// Destroy the init object. // Destroy the init object.
engine = NULL; engine = NULL;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment