Commit c8e8f2a2 authored by Martin Sustrik's avatar Martin Sustrik

ZMQ_IDENTITY socket option removed

This patch simplifies the whole codebase significantly,
including dropping depedency on libuuid.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent ba67eff1
......@@ -90,8 +90,6 @@ case "${host_os}" in
fi
AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS])
AC_CHECK_LIB(rt, sem_init)
AC_CHECK_LIB(uuid, uuid_generate, ,
[AC_MSG_ERROR([cannot link with -luuid, install uuid-dev.])])
;;
*solaris*)
# Define on Solaris to enable all library features
......@@ -100,8 +98,6 @@ case "${host_os}" in
AC_CHECK_LIB(socket, socket)
AC_CHECK_LIB(nsl, gethostbyname)
AC_CHECK_LIB(rt, sem_init)
AC_CHECK_LIB(uuid, uuid_generate, ,
[AC_MSG_ERROR([cannot link with -luuid, install uuid-dev.])])
AC_MSG_CHECKING([whether atomic operations can be used])
AC_COMPILE_IFELSE([AC_LANG_PROGRAM(
[[#include <atomic.h>]],
......@@ -163,18 +159,15 @@ case "${host_os}" in
libzmq_pedantic="no"
AC_DEFINE(ZMQ_HAVE_QNXNTO, 1, [Have QNX Neutrino OS])
AC_CHECK_LIB(socket, socket)
AC_CHECK_LIB(crypto, RAND_bytes)
;;
*aix*)
AC_DEFINE(ZMQ_HAVE_AIX, 1, [Have AIX OS])
AC_CHECK_LIB(crypto,RAND_bytes)
;;
*hpux*)
# Define on HP-UX to enable all library features
CPPFLAGS="-D_POSIX_C_SOURCE=200112L $CPPFLAGS"
AC_DEFINE(ZMQ_HAVE_HPUX, 1, [Have HPUX OS])
AC_CHECK_LIB(rt, sem_init)
AC_CHECK_LIB(crypto, RAND_bytes)
;;
*mingw32*)
AC_DEFINE(ZMQ_HAVE_WINDOWS, 1, [Have Windows OS])
......@@ -201,12 +194,6 @@ case "${host_os}" in
# Define on Cygwin to enable all library features
CPPFLAGS="-D_GNU_SOURCE $CPPFLAGS"
AC_DEFINE(ZMQ_HAVE_CYGWIN, 1, [Have Cygwin])
# Cygwin provides libuuid as part of the e2fsprogs package, and somewhat
# uselessly installs the library in /usr/lib/e2fsprogs
LDFLAGS="-L/usr/lib/e2fsprogs ${LDFLAGS}"
AC_CHECK_LIB(uuid, uuid_generate, ,
[AC_MSG_ERROR([cannot link with -luuid, install the e2fsprogs package.])])
if test "x$enable_static" = "xyes"; then
AC_MSG_ERROR([Building static libraries is not supported under Cygwin])
fi
......
......@@ -161,7 +161,6 @@ ZMQ_EXPORT int zmq_term (void *context);
/* Socket options. */
#define ZMQ_AFFINITY 4
#define ZMQ_IDENTITY 5
#define ZMQ_SUBSCRIBE 6
#define ZMQ_UNSUBSCRIBE 7
#define ZMQ_RATE 8
......
......@@ -35,7 +35,6 @@ libzmq_la_SOURCES = \
msg.hpp \
mtrie.hpp \
mutex.hpp \
named_session.hpp \
object.hpp \
options.hpp \
own.hpp \
......@@ -68,7 +67,6 @@ libzmq_la_SOURCES = \
thread.hpp \
transient_session.hpp \
trie.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
xpub.hpp \
......@@ -79,10 +77,8 @@ libzmq_la_SOURCES = \
yqueue.hpp \
zmq_connecter.hpp \
zmq_engine.hpp \
zmq_init.hpp \
zmq_listener.hpp \
clock.cpp \
command.cpp \
ctx.cpp \
connect_session.cpp \
decoder.cpp \
......@@ -100,7 +96,6 @@ libzmq_la_SOURCES = \
mailbox.cpp \
msg.cpp \
mtrie.cpp \
named_session.cpp \
object.cpp \
options.cpp \
own.cpp \
......@@ -129,7 +124,6 @@ libzmq_la_SOURCES = \
thread.cpp \
transient_session.cpp \
trie.cpp \
uuid.cpp \
xpub.cpp \
xrep.cpp \
xreq.cpp \
......@@ -137,7 +131,6 @@ libzmq_la_SOURCES = \
zmq.cpp \
zmq_connecter.cpp \
zmq_engine.cpp \
zmq_init.cpp \
zmq_listener.cpp \
zmq_utils.cpp
......
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include "command.hpp"
void zmq::deallocate_command (command_t *cmd_)
{
switch (cmd_->type) {
case command_t::attach:
if (cmd_->args.attach.peer_identity)
free (cmd_->args.attach.peer_identity);
break;
case command_t::bind:
if (cmd_->args.bind.peer_identity)
free (cmd_->args.bind.peer_identity);
break;
default:
/* noop */;
}
}
......@@ -73,16 +73,12 @@ namespace zmq
// session that the connection have failed.
struct {
struct i_engine *engine;
unsigned char peer_identity_size;
unsigned char *peer_identity;
} attach;
// Sent from session to socket to establish pipe(s) between them.
// Caller have used inc_seqnum beforehand sending the command.
struct {
class pipe_t *pipe;
unsigned char peer_identity_size;
unsigned char *peer_identity;
} bind;
// Sent by pipe writer to inform dormant pipe reader that there
......@@ -146,9 +142,6 @@ namespace zmq
} args;
};
// Function to deallocate dynamically allocated components of the command.
void deallocate_command (command_t *cmd_);
}
#endif
......@@ -29,15 +29,12 @@ zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
const char *protocol_, const char *address_) :
session_t (io_thread_, socket_, options_),
protocol (protocol_),
address (address_),
connected (false)
address (address_)
{
}
zmq::connect_session_t::~connect_session_t ()
{
if (connected && !peer_identity.empty ())
unregister_session (peer_identity);
}
void zmq::connect_session_t::process_plug ()
......@@ -87,7 +84,7 @@ void zmq::connect_session_t::start_connecting (bool wait_)
int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
zmq_assert (rc == 0);
send_attach (this, pgm_sender, blob_t ());
send_attach (this, pgm_sender);
}
else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
......@@ -99,7 +96,7 @@ void zmq::connect_session_t::start_connecting (bool wait_)
int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
zmq_assert (rc == 0);
send_attach (this, pgm_receiver, blob_t ());
send_attach (this, pgm_receiver);
}
else
zmq_assert (false);
......@@ -111,45 +108,8 @@ void zmq::connect_session_t::start_connecting (bool wait_)
zmq_assert (false);
}
bool zmq::connect_session_t::xattached (const blob_t &peer_identity_)
bool zmq::connect_session_t::xattached ()
{
// If there was no previous connection...
if (!connected) {
// Peer has transient identity.
if (peer_identity_.empty () || peer_identity_ [0] == 0) {
connected = true;
return true;
}
// Peer has strong identity. Let's register it and check whether noone
// else is using the same identity.
if (!register_session (peer_identity_, this)) {
log ("DPID: duplicate peer identity - disconnecting peer");
return false;
}
connected = true;
peer_identity = peer_identity_;
return true;
}
// New engine from listener can conflict with existing engine.
// Alternatively, new engine created by reconnection process can
// conflict with engine supplied by listener in the meantime.
if (has_engine ()) {
log ("DPID: duplicate peer identity - disconnecting peer");
return false;
}
// If there have been a connection before, we have to check whether
// peer's identity haven't changed in the meantime.
if ((peer_identity_.empty () || peer_identity_ [0] == 0) &&
peer_identity.empty ())
return true;
if (peer_identity != peer_identity_) {
log ("CHID: peer have changed identity - disconnecting peer");
return false;
}
return true;
}
......
......@@ -44,7 +44,7 @@ namespace zmq
private:
// Handlers for events from session base class.
bool xattached (const blob_t &peer_identity_);
bool xattached ();
bool xdetached ();
// Start the connection process.
......@@ -57,13 +57,6 @@ namespace zmq
std::string protocol;
std::string address;
// If true, the session was already connected to the peer.
bool connected;
// Identity of the peer. If 'connected' is false, it has no meaning.
// Otherwise, if it's empty, the peer has transient identity.
blob_t peer_identity;
connect_session_t (const connect_session_t&);
const connect_session_t &operator = (const connect_session_t&);
};
......
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "named_session.hpp"
#include "socket_base.hpp"
zmq::named_session_t::named_session_t (class io_thread_t *io_thread_,
socket_base_t *socket_, const options_t &options_,
const blob_t &peer_identity_) :
session_t (io_thread_, socket_, options_),
peer_identity (peer_identity_)
{
// Make double sure that the peer's identity is not transient.
zmq_assert (!peer_identity.empty ());
zmq_assert (peer_identity [0] != 0);
bool ok = socket_->register_session (peer_identity, this);
// If new session is being created, the caller should have already
// checked that the session for specified identity doesn't exist yet.
// Thus, register_session should not fail.
zmq_assert (ok);
}
zmq::named_session_t::~named_session_t ()
{
// Unregister the session from the global list of named sessions.
unregister_session (peer_identity);
}
bool zmq::named_session_t::xattached (const blob_t &peer_identity_)
{
// Double check that identities match.
zmq_assert (peer_identity == peer_identity_);
// If the session already has an engine attached, destroy new one.
if (has_engine ()) {
log ("DPID: duplicate peer identity - disconnecting peer");
return false;
}
return true;
}
bool zmq::named_session_t::xdetached ()
{
// Do nothing. Named sessions are never destroyed because of disconnection.
// Neither they have to actively reconnect.
return true;
}
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_NAMED_SESSION_HPP_INCLUDED__
#define __ZMQ_NAMED_SESSION_HPP_INCLUDED__
#include "session.hpp"
#include "blob.hpp"
namespace zmq
{
// Named session is created by listener object when the peer identifies
// itself by a strong name. Named session survives reconnections.
class named_session_t : public session_t
{
public:
named_session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_,
const blob_t &peer_identity_);
~named_session_t ();
// Handlers for events from session base class.
bool xattached (const blob_t &peer_identity_);
bool xdetached ();
private:
blob_t peer_identity;
named_session_t (const named_session_t&);
const named_session_t &operator = (const named_session_t&);
};
}
#endif
......@@ -82,17 +82,12 @@ void zmq::object_t::process_command (command_t &cmd_)
break;
case command_t::attach:
process_attach (cmd_.args.attach.engine,
cmd_.args.attach.peer_identity ?
blob_t (cmd_.args.attach.peer_identity,
cmd_.args.attach.peer_identity_size) : blob_t ());
process_attach (cmd_.args.attach.engine);
process_seqnum ();
break;
case command_t::bind:
process_bind (cmd_.args.bind.pipe, cmd_.args.bind.peer_identity ?
blob_t (cmd_.args.bind.peer_identity,
cmd_.args.bind.peer_identity_size) : blob_t ());
process_bind (cmd_.args.bind.pipe);
process_seqnum ();
break;
......@@ -131,10 +126,6 @@ void zmq::object_t::process_command (command_t &cmd_)
default:
zmq_assert (false);
}
// The assumption here is that each command is processed once only,
// so deallocating it after processing is all right.
deallocate_command (&cmd_);
}
int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
......@@ -211,7 +202,7 @@ void zmq::object_t::send_own (own_t *destination_, own_t *object_)
}
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
const blob_t &peer_identity_, bool inc_seqnum_)
bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
......@@ -223,25 +214,11 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
cmd.destination = destination_;
cmd.type = command_t::attach;
cmd.args.attach.engine = engine_;
if (peer_identity_.empty ()) {
cmd.args.attach.peer_identity_size = 0;
cmd.args.attach.peer_identity = NULL;
}
else {
zmq_assert (peer_identity_.size () <= 0xff);
cmd.args.attach.peer_identity_size =
(unsigned char) peer_identity_.size ();
cmd.args.attach.peer_identity =
(unsigned char*) malloc (peer_identity_.size ());
alloc_assert (cmd.args.attach.peer_identity_size);
memcpy (cmd.args.attach.peer_identity, peer_identity_.data (),
peer_identity_.size ());
}
send_command (cmd);
}
void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
const blob_t &peer_identity_, bool inc_seqnum_)
bool inc_seqnum_)
{
if (inc_seqnum_)
destination_->inc_seqnum ();
......@@ -253,20 +230,6 @@ void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
cmd.destination = destination_;
cmd.type = command_t::bind;
cmd.args.bind.pipe = pipe_;
if (peer_identity_.empty ()) {
cmd.args.bind.peer_identity_size = 0;
cmd.args.bind.peer_identity = NULL;
}
else {
zmq_assert (peer_identity_.size () <= 0xff);
cmd.args.bind.peer_identity_size =
(unsigned char) peer_identity_.size ();
cmd.args.bind.peer_identity =
(unsigned char*) malloc (peer_identity_.size ());
alloc_assert (cmd.args.bind.peer_identity_size);
memcpy (cmd.args.bind.peer_identity, peer_identity_.data (),
peer_identity_.size ());
}
send_command (cmd);
}
......@@ -413,13 +376,12 @@ void zmq::object_t::process_own (own_t *object_)
zmq_assert (false);
}
void zmq::object_t::process_attach (i_engine *engine_,
const blob_t &peer_identity_)
void zmq::object_t::process_attach (i_engine *engine_)
{
zmq_assert (false);
}
void zmq::object_t::process_bind (pipe_t *pipe_, const blob_t &peer_identity_)
void zmq::object_t::process_bind (pipe_t *pipe_)
{
zmq_assert (false);
}
......
......@@ -22,7 +22,6 @@
#define __ZMQ_OBJECT_HPP_INCLUDED__
#include "stdint.hpp"
#include "blob.hpp"
namespace zmq
{
......@@ -64,10 +63,9 @@ namespace zmq
void send_own (class own_t *destination_,
class own_t *object_);
void send_attach (class session_t *destination_,
struct i_engine *engine_, const blob_t &peer_identity_,
bool inc_seqnum_ = true);
struct i_engine *engine_, bool inc_seqnum_ = true);
void send_bind (class own_t *destination_, class pipe_t *pipe_,
const blob_t &peer_identity_, bool inc_seqnum_ = true);
bool inc_seqnum_ = true);
void send_activate_read (class pipe_t *destination_);
void send_activate_write (class pipe_t *destination_,
uint64_t msgs_read_);
......@@ -87,10 +85,8 @@ namespace zmq
virtual void process_stop ();
virtual void process_plug ();
virtual void process_own (class own_t *object_);
virtual void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_);
virtual void process_bind (class pipe_t *pipe_,
const blob_t &peer_identity_);
virtual void process_attach (struct i_engine *engine_);
virtual void process_bind (class pipe_t *pipe_);
virtual void process_activate_read ();
virtual void process_activate_write (uint64_t msgs_read_);
virtual void process_hiccup (void *pipe_);
......
......@@ -75,19 +75,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
affinity = *((uint64_t*) optval_);
return 0;
case ZMQ_IDENTITY:
// Empty identity is invalid as well as identity longer than
// 255 bytes. Identity starting with binary zero is invalid
// as these are used for auto-generated identities.
if (optvallen_ < 1 || optvallen_ > 255 ||
*((const unsigned char*) optval_) == 0) {
errno = EINVAL;
return -1;
}
identity.assign ((const unsigned char*) optval_, optvallen_);
return 0;
case ZMQ_RATE:
if (optvallen_ != sizeof (int) || *((int*) optval_) <= 0) {
errno = EINVAL;
......@@ -229,15 +216,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (uint64_t);
return 0;
case ZMQ_IDENTITY:
if (*optvallen_ < identity.size ()) {
errno = EINVAL;
return -1;
}
memcpy (optval_, identity.data (), identity.size ());
*optvallen_ = identity.size ();
return 0;
case ZMQ_RATE:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
......
......@@ -23,7 +23,6 @@
#include "stddef.h"
#include "stdint.hpp"
#include "blob.hpp"
namespace zmq
{
......@@ -39,8 +38,8 @@ namespace zmq
int sndhwm;
int rcvhwm;
// I/O thread affinity.
uint64_t affinity;
blob_t identity;
// Maximum tranfer rate [kb/s]. Default 100kb/s.
int rate;
......
......@@ -35,7 +35,7 @@ zmq::pair_t::~pair_t ()
zmq_assert (!pipe);
}
void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
void zmq::pair_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (!pipe);
pipe = pipe_;
......
......@@ -35,7 +35,7 @@ namespace zmq
~pair_t ();
// Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
void xattach_pipe (class pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
......
......@@ -38,7 +38,7 @@
#include "pgm_socket.hpp"
#include "config.hpp"
#include "err.hpp"
#include "uuid.hpp"
#include "random.hpp"
#include "stdint.hpp"
#ifndef MSG_ERRQUEUE
......@@ -253,20 +253,13 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
addr.sa_port = port_number;
addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
if (options.identity.size () > 0) {
// Create gsi from identity.
if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi,
options.identity.data (), options.identity.size ()))
goto err_abort;
} else {
// Create random GSI.
uint32_t buf [2];
buf [0] = generate_random ();
buf [1] = generate_random ();
if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8))
goto err_abort;
// Generate GSI from UUID.
unsigned char buf [16];
generate_uuid (buf);
if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, buf, 16))
goto err_abort;
}
// Bind a transport to the specified network devices.
struct pgm_interface_req_t if_req;
......
......@@ -33,7 +33,7 @@ zmq::pull_t::~pull_t ()
{
}
void zmq::pull_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
void zmq::pull_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
fq.attach (pipe_);
......
......@@ -38,7 +38,7 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
void xattach_pipe (class pipe_t *pipe_);
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
void xread_activated (class pipe_t *pipe_);
......
......@@ -33,7 +33,7 @@ zmq::push_t::~push_t ()
{
}
void zmq::push_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
void zmq::push_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
lb.attach (pipe_);
......
......@@ -38,7 +38,7 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
void xattach_pipe (class pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_);
bool xhas_out ();
void xwrite_activated (class pipe_t *pipe_);
......
......@@ -18,23 +18,35 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#endif
#include "random.hpp"
#include "stdint.hpp"
#include "uuid.hpp"
#include "err.hpp"
#include "clock.hpp"
void zmq::seed_random ()
{
#if defined ZMQ_HAVE_WINDOWS
int pid = (int) GetCurrentProcessId ();
#else
int pid = (int) getpid ();
#endif
srand ((unsigned int) (clock_t::now_us () + pid));
}
// Here we can use different ways of generating random data, as avialable
// on different platforms. At the moment, we'll assume the UUID is random
// enough to use for that purpose.
void zmq::generate_random (void *buf_, size_t size_)
uint32_t zmq::generate_random ()
{
// Collapsing an UUID into 4 bytes.
zmq_assert (size_ == 4);
uint32_t buff [4];
generate_uuid ((void*) buff);
uint32_t result = buff [0];
result ^= buff [1];
result ^= buff [2];
result ^= buff [3];
*((uint32_t*) buf_) = result;
// Compensate for the fact that rand() returns signed integer.
uint32_t low = (uint32_t) rand ();
uint32_t high = (uint32_t) rand ();
high <<= (sizeof (int) * 8 - 1);
return high | low;
}
......@@ -21,13 +21,16 @@
#ifndef __ZMQ_RANDOM_HPP_INCLUDED__
#define __ZMQ_RANDOM_HPP_INCLUDED__
#include <stddef.h>
#include "stdint.hpp"
namespace zmq
{
// Generates truly random bytes (not pseudo-random).
void generate_random (void *buf_, size_t size_);
// Seeds the random number generator.
void seed_random ();
// Generates random value.
uint32_t generate_random ();
}
......
......@@ -21,7 +21,6 @@
#include "req.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "uuid.hpp"
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
......@@ -30,12 +29,9 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
xreq_t (parent_, tid_),
receiving_reply (false),
message_begins (true),
request_id (0)
request_id (generate_random ())
{
options.type = ZMQ_REQ;
// Start the request ID sequence at an random point.
generate_random (&request_id, sizeof (request_id));
}
zmq::req_t::~req_t ()
......
......@@ -159,8 +159,7 @@ void zmq::session_t::process_plug ()
{
}
void zmq::session_t::process_attach (i_engine *engine_,
const blob_t &peer_identity_)
void zmq::session_t::process_attach (i_engine *engine_)
{
// If some other object (e.g. init) notifies us that the connection failed
// without creating an engine we need to start the reconnection process.
......@@ -171,7 +170,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
}
// Trigger the notfication event about the attachment.
if (!attached (peer_identity_)) {
if (!attached ()) {
delete engine_;
return;
}
......@@ -193,7 +192,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
pipe = pipes [0];
// Ask socket to plug into the remote end of the pipe.
send_bind (socket, pipes [1], peer_identity_);
send_bind (socket, pipes [1]);
}
// Plug in the engine.
......@@ -272,24 +271,9 @@ void zmq::session_t::timer_event (int id_)
pipe->terminate (false);
}
bool zmq::session_t::has_engine ()
bool zmq::session_t::attached ()
{
return engine != NULL;
}
bool zmq::session_t::register_session (const blob_t &name_, session_t *session_)
{
return socket->register_session (name_, session_);
}
void zmq::session_t::unregister_session (const blob_t &name_)
{
socket->unregister_session (name_);
}
bool zmq::session_t::attached (const blob_t &peer_identity_)
{
return xattached (peer_identity_);
return xattached ();
}
void zmq::session_t::detached ()
......
......@@ -24,7 +24,6 @@
#include "own.hpp"
#include "i_engine.hpp"
#include "io_object.hpp"
#include "blob.hpp"
#include "pipe.hpp"
namespace zmq
......@@ -64,27 +63,19 @@ namespace zmq
// the termination process when session is about to be detached from
// the peer. If it returns false, session will be terminated.
// To be overloaded by the derived session type.
virtual bool xattached (const blob_t &peer_identity_) = 0;
virtual bool xattached () = 0;
virtual bool xdetached () = 0;
// Returns true if there is an engine attached to the session.
bool has_engine ();
// Allows derives session types to (un)register session names.
bool register_session (const blob_t &name_, class session_t *session_);
void unregister_session (const blob_t &name_);
~session_t ();
private:
bool attached (const blob_t &peer_identity_);
bool attached ();
void detached ();
// Handlers for incoming commands.
void process_plug ();
void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_);
void process_attach (struct i_engine *engine_);
void process_term (int linger_);
// i_poll_events handlers.
......
......@@ -45,7 +45,6 @@
#include "ctx.hpp"
#include "platform.hpp"
#include "likely.hpp"
#include "uuid.hpp"
#include "msg.hpp"
#include "pair.hpp"
......@@ -128,11 +127,6 @@ zmq::socket_base_t::~socket_base_t ()
{
zmq_assert (destroyed);
// Check whether there are no session leaks.
sessions_sync.lock ();
zmq_assert (sessions.empty ());
sessions_sync.unlock ();
// Mark the socket as dead.
tag = 0xdeadbeef;
}
......@@ -212,23 +206,14 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return 0;
}
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
const blob_t &peer_identity_)
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_)
{
// First, register the pipe so that we can terminate it later on.
pipe_->set_event_sink (this);
pipes.push_back (pipe_);
// Then, pass the pipe to the specific socket type.
// If the peer haven't specified it's identity, let's generate one.
if (peer_identity_.size ()) {
xattach_pipe (pipe_, peer_identity_);
}
else {
blob_t identity (17, 0);
generate_uuid ((unsigned char*) identity.data () + 1);
xattach_pipe (pipe_, identity);
}
// Let the derived socket type know about new pipe.
xattach_pipe (pipe_);
// If the socket is already being closed, ask any new pipes to terminate
// straight away.
......@@ -423,12 +408,12 @@ int zmq::socket_base_t::connect (const char *addr_)
errno_assert (rc == 0);
// Attach local end of the pipe to this socket object.
attach_pipe (pipes [0], peer.options.identity);
attach_pipe (pipes [0]);
// Attach remote end of the pipe to the peer socket. Note that peer's
// seqnum was incremented in find_endpoint function. We don't need it
// increased here.
send_bind (peer.socket, pipes [1], options.identity, false);
send_bind (peer.socket, pipes [1], false);
return 0;
}
......@@ -454,7 +439,7 @@ int zmq::socket_base_t::connect (const char *addr_)
errno_assert (rc == 0);
// Attach local end of the pipe to the socket object.
attach_pipe (pipes [0], blob_t ());
attach_pipe (pipes [0]);
// Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1]);
......@@ -654,44 +639,6 @@ bool zmq::socket_base_t::has_out ()
return xhas_out ();
}
bool zmq::socket_base_t::register_session (const blob_t &name_,
session_t *session_)
{
sessions_sync.lock ();
bool registered = sessions.insert (
sessions_t::value_type (name_, session_)).second;
sessions_sync.unlock ();
return registered;
}
void zmq::socket_base_t::unregister_session (const blob_t &name_)
{
sessions_sync.lock ();
sessions_t::iterator it = sessions.find (name_);
zmq_assert (it != sessions.end ());
sessions.erase (it);
sessions_sync.unlock ();
}
zmq::session_t *zmq::socket_base_t::find_session (const blob_t &name_)
{
sessions_sync.lock ();
sessions_t::iterator it = sessions.find (name_);
if (it == sessions.end ()) {
sessions_sync.unlock ();
return NULL;
}
session_t *session = it->second;
// Prepare the session for subsequent attach command.
// Note the connect sessions have NULL pointers registered here.
if (session)
session->inc_seqnum ();
sessions_sync.unlock ();
return session;
}
void zmq::socket_base_t::start_reaping (poller_t *poller_)
{
// Plug the socket to the reaper thread.
......@@ -770,10 +717,9 @@ void zmq::socket_base_t::process_stop ()
ctx_terminated = true;
}
void zmq::socket_base_t::process_bind (pipe_t *pipe_,
const blob_t &peer_identity_)
void zmq::socket_base_t::process_bind (pipe_t *pipe_)
{
attach_pipe (pipe_, peer_identity_);
attach_pipe (pipe_);
}
void zmq::socket_base_t::process_unplug ()
......
......@@ -21,21 +21,17 @@
#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#include <map>
#include <vector>
#include <string>
#include "own.hpp"
#include "array.hpp"
#include "mutex.hpp"
#include "stdint.hpp"
#include "poller.hpp"
#include "atomic_counter.hpp"
#include "i_poll_events.hpp"
#include "mailbox.hpp"
#include "stdint.hpp"
#include "blob.hpp"
#include "pipe.hpp"
#include "own.hpp"
namespace zmq
{
......@@ -78,11 +74,6 @@ namespace zmq
bool has_in ();
bool has_out ();
// Registry of named sessions.
bool register_session (const blob_t &name_, class session_t *session_);
void unregister_session (const blob_t &name_);
class session_t *find_session (const blob_t &name_);
// Using this function reaper thread ask the socket to regiter with
// its poller.
void start_reaping (poller_t *poller_);
......@@ -106,8 +97,7 @@ namespace zmq
// Concrete algorithms for the x- methods are to be defined by
// individual socket types.
virtual void xattach_pipe (class pipe_t *pipe_,
const blob_t &peer_identity_) = 0;
virtual void xattach_pipe (class pipe_t *pipe_) = 0;
// The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, overload this
......@@ -158,7 +148,7 @@ namespace zmq
int check_protocol (const std::string &protocol_);
// Register the pipe with this socket.
void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
void attach_pipe (class pipe_t *pipe_);
// Processes commands sent to this socket (if any). If timeout is -1,
// returns only after at least one command was processed.
......@@ -168,7 +158,7 @@ namespace zmq
// Handlers for incoming commands.
void process_stop ();
void process_bind (class pipe_t *pipe_, const blob_t &peer_identity_);
void process_bind (class pipe_t *pipe_);
void process_unplug ();
void process_term (int linger_);
......@@ -195,14 +185,6 @@ namespace zmq
// True if the last message received had MORE flag set.
bool rcvmore;
// Lists of existing sessions. This list is never referenced from
// within the socket, instead it is used by objects owned by
// the socket. As those objects can live in different threads,
// the access is synchronised by mutex.
typedef std::map <blob_t, session_t*> sessions_t;
sessions_t sessions;
mutex_t sessions_sync;
socket_base_t (const socket_base_t&);
const socket_base_t &operator = (const socket_base_t&);
};
......
......@@ -30,7 +30,7 @@ zmq::transient_session_t::~transient_session_t ()
{
}
bool zmq::transient_session_t::xattached (const blob_t &peer_identity_)
bool zmq::transient_session_t::xattached ()
{
// Transient session is always valid.
return true;
......
......@@ -40,7 +40,7 @@ namespace zmq
private:
// Handlers for events from session base class.
bool xattached (const blob_t &peer_identity_);
bool xattached ();
bool xdetached ();
transient_session_t (const transient_session_t&);
......
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include "uuid.hpp"
#include "err.hpp"
#include "stdint.hpp"
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include <rpc.h>
void zmq::generate_uuid (void *buf_)
{
RPC_STATUS ret = UuidCreate ((::UUID*) buf_);
zmq_assert (ret == RPC_S_OK);
}
#elif defined ZMQ_HAVE_FREEBSD || defined ZMQ_HAVE_NETBSD
#include <uuid.h>
void zmq::generate_uuid (void *buf_)
{
uint32_t status;
uuid_create ((::uuid_t*) buf_, &status);
zmq_assert (status == uuid_s_ok);
}
#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_SOLARIS ||\
defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_CYGWIN
#include <uuid/uuid.h>
void zmq::generate_uuid (void *buf_)
{
uuid_generate ((unsigned char*) buf_);
}
#elif defined ZMQ_HAVE_OPENVMS
#include <starlet.h>
void zmq::generate_uuid (void *buf_)
{
sys$create_uid(buf_);
}
#else
#include <openssl/rand.h>
void zmq::generate_uuid (void *buf_)
{
unsigned char *buf = (unsigned char*) buf_;
// Generate random value.
int ret = RAND_bytes (buf, 16);
zmq_assert (ret == 1);
// Set UUID variant to 2 (UUID as specified in RFC4122).
const unsigned char variant = 2;
buf [8] = (buf [8] & 0x3f) | (variant << 6);
// Set UUID version to 4 (randomly or pseudo-randomly generated UUID).
const unsigned char version = 4;
buf [6] = (buf [6] & 0x0f) | (version << 4);
}
#endif
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_UUID_HPP_INCLUDED__
#define __ZMQ_UUID_HPP_INCLUDED__
namespace zmq
{
// This function provides RFC 4122 (a Universally Unique IDentifier)
// implementation. The resulting UUID will be 16 bytes long.
void generate_uuid (void *buf_);
}
#endif
......@@ -36,7 +36,7 @@ zmq::xpub_t::~xpub_t ()
{
}
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
dist.attach (pipe_);
......
......@@ -41,7 +41,7 @@ namespace zmq
~xpub_t ();
// Implementations of virtual functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
void xattach_pipe (class pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_);
bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_);
......
......@@ -29,7 +29,8 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
prefetched (false),
more_in (false),
current_out (NULL),
more_out (false)
more_out (false),
next_peer_id (generate_random ())
{
options.type = ZMQ_XREP;
......@@ -38,9 +39,6 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
options.delay_on_disconnect = false;
prefetched_msg.init ();
// Start the peer ID sequence from a random point.
generate_random (&next_peer_id, sizeof (next_peer_id));
}
zmq::xrep_t::~xrep_t ()
......@@ -49,7 +47,7 @@ zmq::xrep_t::~xrep_t ()
prefetched_msg.close ();
}
void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
void zmq::xrep_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
......
......@@ -41,8 +41,7 @@ namespace zmq
~xrep_t ();
// Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_,
const blob_t &peer_identity_);
void xattach_pipe (class pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
......
......@@ -36,7 +36,7 @@ zmq::xreq_t::~xreq_t ()
{
}
void zmq::xreq_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
void zmq::xreq_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
fq.attach (pipe_);
......
......@@ -40,7 +40,7 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
void xattach_pipe (class pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_);
bool xhas_in ();
......
......@@ -39,7 +39,7 @@ zmq::xsub_t::~xsub_t ()
errno_assert (rc == 0);
}
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_)
{
zmq_assert (pipe_);
fq.attach (pipe_);
......
......@@ -41,7 +41,7 @@ namespace zmq
protected:
// Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
void xattach_pipe (class pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_);
bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_);
......
......@@ -30,7 +30,6 @@
#include "zmq_connecter.hpp"
#include "zmq_engine.hpp"
#include "zmq_init.hpp"
#include "io_thread.hpp"
#include "err.hpp"
......@@ -86,16 +85,12 @@ void zmq::zmq_connecter_t::out_event ()
return;
}
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create the engine object for this connection.
zmq_engine_t *engine = new (std::nothrow) zmq_engine_t (fd, options);
alloc_assert (engine);
// Create an init object.
zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL,
session, fd, options);
alloc_assert (init);
launch_sibling (init);
// Attach the engine to the corresponding session object.
send_attach (session, engine);
// Shut the connecter down.
terminate ();
......
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include "zmq_init.hpp"
#include "transient_session.hpp"
#include "named_session.hpp"
#include "socket_base.hpp"
#include "zmq_engine.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "uuid.hpp"
#include "blob.hpp"
#include "wire.hpp"
#include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
socket_base_t *socket_, session_t *session_, fd_t fd_,
const options_t &options_) :
own_t (io_thread_, options_),
ephemeral_engine (NULL),
received (false),
socket (socket_),
session (session_),
io_thread (io_thread_)
{
// Create the engine object for this connection.
engine = new (std::nothrow) zmq_engine_t (fd_, options);
alloc_assert (engine);
// Generate an unique identity.
peer_identity.resize (17);
peer_identity [0] = 0;
generate_uuid (&peer_identity [1]);
// Create a list of messages to send on connection initialisation.
if (!options.identity.empty ()) {
msg_t msg;
int rc = msg.init_size (options.identity.size ());
errno_assert (rc == 0);
memcpy (msg.data () , options.identity.data (), msg.size ());
to_send.push_back (msg);
}
else {
msg_t msg;
int rc = msg.init ();
errno_assert (rc == 0);
to_send.push_back (msg);
}
}
zmq::zmq_init_t::~zmq_init_t ()
{
if (engine)
engine->terminate ();
// If there are unsent props still queued deallocate them.
for (to_send_t::iterator it = to_send.begin (); it != to_send.end ();
++it) {
int rc = it->close ();
errno_assert (rc == 0);
}
to_send.clear ();
}
bool zmq::zmq_init_t::read (msg_t *msg_)
{
// If the identity was already sent, do nothing.
if (to_send.empty ())
return false;
// Pass next property to the engine.
*msg_ = to_send.front ();
to_send.erase (to_send.begin ());
// Try finalize initialization.
finalise_initialisation ();
return true;
}
bool zmq::zmq_init_t::write (msg_t *msg_)
{
// If identity was already received, we are not interested
// in subsequent messages.
if (received)
return false;
// Retrieve the peer's identity, if any.
zmq_assert (!(msg_->flags () & msg_t::more));
size_t size = msg_->size ();
if (size) {
unsigned char *data = (unsigned char*) msg_->data ();
peer_identity.assign (data, size);
}
received = true;
finalise_initialisation ();
return true;
}
void zmq::zmq_init_t::flush ()
{
// Check if there's anything to flush.
if (!received)
return;
// Initialization is done, dispatch engine.
if (ephemeral_engine)
dispatch_engine ();
}
void zmq::zmq_init_t::detach ()
{
// This function is called by engine when disconnection occurs.
// If there is an associated session, send it a null engine to let it know
// that connection process was unsuccesful.
if (session)
send_attach (session, NULL, blob_t (), true);
// The engine will destroy itself, so let's just drop the pointer here and
// start termination of the init object.
engine = NULL;
terminate ();
}
void zmq::zmq_init_t::process_plug ()
{
zmq_assert (engine);
engine->plug (io_thread, this);
}
void zmq::zmq_init_t::process_unplug ()
{
if (engine)
engine->unplug ();
}
void zmq::zmq_init_t::finalise_initialisation ()
{
// Unplug and prepare to dispatch engine.
if (to_send.empty () && received) {
ephemeral_engine = engine;
engine = NULL;
ephemeral_engine->unplug ();
return;
}
}
void zmq::zmq_init_t::dispatch_engine ()
{
if (to_send.empty () && received) {
// Engine must be detached.
zmq_assert (!engine);
zmq_assert (ephemeral_engine);
// If we know what session we belong to, it's easy, just send the
// engine to that session and destroy the init object. Note that we
// know about the session only if this object is owned by it. Thus,
// lifetime of this object in contained in the lifetime of the session
// so the pointer cannot become invalid without notice.
if (session) {
send_attach (session, ephemeral_engine, peer_identity, true);
terminate ();
return;
}
// All the cases below are listener-based. Therefore we need the socket
// reference so that new sessions can bind to that socket.
zmq_assert (socket);
// We have no associated session. If the peer has no identity we'll
// create a transient session for the connection. Note that
// seqnum is incremented to account for attach command before the
// session is launched. That way we are sure it won't terminate before
// being attached.
if (peer_identity [0] == 0) {
session = new (std::nothrow) transient_session_t (io_thread,
socket, options);
alloc_assert (session);
session->inc_seqnum ();
launch_sibling (session);
send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
// Try to find the session corresponding to the peer's identity.
// If found, send the engine to that session and destroy this object.
// Note that session's seqnum is incremented by find_session rather
// than by send_attach.
session = socket->find_session (peer_identity);
if (session) {
send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
// There's no such named session. We have to create one. Note that
// seqnum is incremented to account for attach command before the
// session is launched. That way we are sure it won't terminate before
// being attached.
session = new (std::nothrow) named_session_t (io_thread, socket,
options, peer_identity);
alloc_assert (session);
session->inc_seqnum ();
launch_sibling (session);
send_attach (session, ephemeral_engine, peer_identity, false);
terminate ();
return;
}
}
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#include <vector>
#include "i_engine.hpp"
#include "stdint.hpp"
#include "blob.hpp"
#include "msg.hpp"
#include "own.hpp"
#include "fd.hpp"
namespace zmq
{
// The class handles initialisation phase of 0MQ wire-level protocol.
class zmq_init_t :
public own_t,
public i_engine_sink
{
public:
zmq_init_t (class io_thread_t *io_thread_, class socket_base_t *socket_,
class session_t *session_, fd_t fd_, const options_t &options_);
~zmq_init_t ();
private:
void finalise_initialisation ();
void dispatch_engine ();
// i_engine_sink interface implementation.
bool read (class msg_t *msg_);
bool write (class msg_t *msg_);
void flush ();
void detach ();
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
// Associated wire-protocol engine.
i_engine *engine;
// Detached transient engine.
i_engine *ephemeral_engine;
// List of messages to send to the peer during the connection
// initiation phase.
typedef std::vector <msg_t> to_send_t;
to_send_t to_send;
// True if peer's identity was already received.
bool received;
// Socket the object belongs to.
class socket_base_t *socket;
// Reference to the session the init object belongs to.
// If the associated session is unknown and should be found
// depending on peer identity this value is NULL.
class session_t *session;
// Identity of the peer socket.
blob_t peer_identity;
// I/O thread the object is living in. It will be used to plug
// the engine into the same I/O thread.
class io_thread_t *io_thread;
zmq_init_t (const zmq_init_t&);
const zmq_init_t &operator = (const zmq_init_t&);
};
}
#endif
......@@ -21,7 +21,8 @@
#include <new>
#include "zmq_listener.hpp"
#include "zmq_init.hpp"
#include "transient_session.hpp"
#include "zmq_engine.hpp"
#include "io_thread.hpp"
#include "err.hpp"
......@@ -63,16 +64,21 @@ void zmq::zmq_listener_t::in_event ()
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd)
return;
// Create the engine object for this connection.
zmq_engine_t *engine = new (std::nothrow) zmq_engine_t (fd, options);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create and launch an init object.
zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, socket,
NULL, fd, options);
alloc_assert (init);
launch_child (init);
// Create and launch a session object.
transient_session_t *session = new (std::nothrow)
transient_session_t (io_thread, socket, options);
alloc_assert (session);
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment