Commit 7c1dca54 authored by Martin Sustrik's avatar Martin Sustrik

Session classes merged into a single class

Removal of ZMQ_IDENTITY resulted in various session classes doing
almost the same thing. This patch merges the classes into a single
class.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent f716b571
...@@ -12,7 +12,6 @@ libzmq_la_SOURCES = \ ...@@ -12,7 +12,6 @@ libzmq_la_SOURCES = \
clock.hpp \ clock.hpp \
command.hpp \ command.hpp \
config.hpp \ config.hpp \
connect_session.hpp \
ctx.hpp \ ctx.hpp \
decoder.hpp \ decoder.hpp \
devpoll.hpp \ devpoll.hpp \
...@@ -64,7 +63,6 @@ libzmq_la_SOURCES = \ ...@@ -64,7 +63,6 @@ libzmq_la_SOURCES = \
tcp_listener.hpp \ tcp_listener.hpp \
tcp_socket.hpp \ tcp_socket.hpp \
thread.hpp \ thread.hpp \
transient_session.hpp \
trie.hpp \ trie.hpp \
windows.hpp \ windows.hpp \
wire.hpp \ wire.hpp \
...@@ -79,7 +77,6 @@ libzmq_la_SOURCES = \ ...@@ -79,7 +77,6 @@ libzmq_la_SOURCES = \
zmq_listener.hpp \ zmq_listener.hpp \
clock.cpp \ clock.cpp \
ctx.cpp \ ctx.cpp \
connect_session.cpp \
decoder.cpp \ decoder.cpp \
devpoll.cpp \ devpoll.cpp \
dist.cpp \ dist.cpp \
...@@ -122,7 +119,6 @@ libzmq_la_SOURCES = \ ...@@ -122,7 +119,6 @@ libzmq_la_SOURCES = \
tcp_listener.cpp \ tcp_listener.cpp \
tcp_socket.cpp \ tcp_socket.cpp \
thread.cpp \ thread.cpp \
transient_session.cpp \
trie.cpp \ trie.cpp \
xpub.cpp \ xpub.cpp \
xrep.cpp \ xrep.cpp \
......
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "connect_session.hpp"
#include "zmq_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "err.hpp"
zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_t (io_thread_, socket_, options_),
protocol (protocol_),
address (address_)
{
}
zmq::connect_session_t::~connect_session_t ()
{
}
void zmq::connect_session_t::process_plug ()
{
// Start connection process immediately.
start_connecting (false);
}
void zmq::connect_session_t::start_connecting (bool wait_)
{
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create the connecter object.
// Both TCP and IPC transports are using the same infrastructure.
if (protocol == "tcp" || protocol == "ipc") {
zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
io_thread, this, options, protocol.c_str (), address.c_str (),
wait_);
alloc_assert (connecter);
launch_child (connecter);
return;
}
#if defined ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure.
if (protocol == "pgm" || protocol == "epgm") {
// For EPGM transport with UDP encapsulation of PGM is used.
bool udp_encapsulation = (protocol == "epgm");
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with PGM anyway.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
// PGM sender.
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
io_thread, options);
alloc_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
zmq_assert (rc == 0);
send_attach (this, pgm_sender);
}
else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
// PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
io_thread, options);
alloc_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
zmq_assert (rc == 0);
send_attach (this, pgm_receiver);
}
else
zmq_assert (false);
return;
}
#endif
zmq_assert (false);
}
bool zmq::connect_session_t::xattached ()
{
return true;
}
bool zmq::connect_session_t::xdetached ()
{
// Reconnect.
start_connecting (true);
// Don't tear the session down.
return true;
}
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_CONNECT_SESSION_HPP_INCLUDED__
#define __ZMQ_CONNECT_SESSION_HPP_INCLUDED__
#include <string>
#include "session.hpp"
namespace zmq
{
// Connect session contains an address to connect to. On disconnect it
// attempts to reconnect.
class connect_session_t : public session_t
{
public:
connect_session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~connect_session_t ();
private:
// Handlers for events from session base class.
bool xattached ();
bool xdetached ();
// Start the connection process.
void start_connecting (bool wait_);
// Command handlers.
void process_plug ();
// Address to connect to.
std::string protocol;
std::string address;
connect_session_t (const connect_session_t&);
const connect_session_t &operator = (const connect_session_t&);
};
}
#endif
...@@ -22,13 +22,13 @@ ...@@ -22,13 +22,13 @@
#include <string.h> #include <string.h>
#include "decoder.hpp" #include "decoder.hpp"
#include "i_engine.hpp" #include "session.hpp"
#include "wire.hpp" #include "wire.hpp"
#include "err.hpp" #include "err.hpp"
zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
decoder_base_t <decoder_t> (bufsize_), decoder_base_t <decoder_t> (bufsize_),
sink (NULL), session (NULL),
maxmsgsize (maxmsgsize_) maxmsgsize (maxmsgsize_)
{ {
int rc = in_progress.init (); int rc = in_progress.init ();
...@@ -44,9 +44,9 @@ zmq::decoder_t::~decoder_t () ...@@ -44,9 +44,9 @@ zmq::decoder_t::~decoder_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
} }
void zmq::decoder_t::set_sink (i_engine_sink *sink_) void zmq::decoder_t::set_session (session_t *session_)
{ {
sink = sink_; session = session_;
} }
bool zmq::decoder_t::one_byte_size_ready () bool zmq::decoder_t::one_byte_size_ready ()
...@@ -136,7 +136,7 @@ bool zmq::decoder_t::message_ready () ...@@ -136,7 +136,7 @@ bool zmq::decoder_t::message_ready ()
{ {
// Message is completely read. Push it further and start reading // Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.) // new message. (in_progress is a 0-byte message after this point.)
if (!sink || !sink->write (&in_progress)) if (!session || !session->write (&in_progress))
return false; return false;
next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
......
...@@ -184,7 +184,7 @@ namespace zmq ...@@ -184,7 +184,7 @@ namespace zmq
decoder_t (size_t bufsize_, int64_t maxmsgsize_); decoder_t (size_t bufsize_, int64_t maxmsgsize_);
~decoder_t (); ~decoder_t ();
void set_sink (struct i_engine_sink *sink_); void set_session (class session_t *session_);
private: private:
...@@ -193,7 +193,7 @@ namespace zmq ...@@ -193,7 +193,7 @@ namespace zmq
bool flags_ready (); bool flags_ready ();
bool message_ready (); bool message_ready ();
struct i_engine_sink *sink; class session_t *session;
unsigned char tmpbuf [8]; unsigned char tmpbuf [8];
msg_t in_progress; msg_t in_progress;
......
...@@ -19,12 +19,12 @@ ...@@ -19,12 +19,12 @@
*/ */
#include "encoder.hpp" #include "encoder.hpp"
#include "i_engine.hpp" #include "session.hpp"
#include "wire.hpp" #include "wire.hpp"
zmq::encoder_t::encoder_t (size_t bufsize_) : zmq::encoder_t::encoder_t (size_t bufsize_) :
encoder_base_t <encoder_t> (bufsize_), encoder_base_t <encoder_t> (bufsize_),
sink (NULL) session (NULL)
{ {
int rc = in_progress.init (); int rc = in_progress.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -39,9 +39,9 @@ zmq::encoder_t::~encoder_t () ...@@ -39,9 +39,9 @@ zmq::encoder_t::~encoder_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
} }
void zmq::encoder_t::set_sink (i_engine_sink *sink_) void zmq::encoder_t::set_session (session_t *session_)
{ {
sink = sink_; session = session_;
} }
bool zmq::encoder_t::size_ready () bool zmq::encoder_t::size_ready ()
...@@ -62,7 +62,7 @@ bool zmq::encoder_t::message_ready () ...@@ -62,7 +62,7 @@ bool zmq::encoder_t::message_ready ()
// Note that new state is set only if write is successful. That way // Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine // unsuccessful write will cause retry on the next state machine
// invocation. // invocation.
if (!sink || !sink->read (&in_progress)) { if (!session || !session->read (&in_progress)) {
rc = in_progress.init (); rc = in_progress.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
return false; return false;
......
...@@ -163,14 +163,14 @@ namespace zmq ...@@ -163,14 +163,14 @@ namespace zmq
encoder_t (size_t bufsize_); encoder_t (size_t bufsize_);
~encoder_t (); ~encoder_t ();
void set_sink (struct i_engine_sink *sink_); void set_session (class session_t *session_);
private: private:
bool size_ready (); bool size_ready ();
bool message_ready (); bool message_ready ();
struct i_engine_sink *sink; class session_t *session;
msg_t in_progress; msg_t in_progress;
unsigned char tmpbuf [10]; unsigned char tmpbuf [10];
......
...@@ -32,7 +32,7 @@ namespace zmq ...@@ -32,7 +32,7 @@ namespace zmq
// Plug the engine to the session. // Plug the engine to the session.
virtual void plug (class io_thread_t *io_thread_, virtual void plug (class io_thread_t *io_thread_,
struct i_engine_sink *sink_) = 0; class session_t *session_) = 0;
// Unplug the engine from the session. // Unplug the engine from the session.
virtual void unplug () = 0; virtual void unplug () = 0;
...@@ -50,25 +50,6 @@ namespace zmq ...@@ -50,25 +50,6 @@ namespace zmq
virtual void activate_out () = 0; virtual void activate_out () = 0;
}; };
// Abstract interface to be implemented by engine sinks such as sessions.
struct i_engine_sink
{
virtual ~i_engine_sink () {}
// Engine asks for a message to send to the network.
virtual bool read (class msg_t *msg_) = 0;
// Engine received message from the network and sends it further on.
virtual bool write (class msg_t *msg_) = 0;
// Flush all the previously written messages.
virtual void flush () = 0;
// Engine is dead. Drop all the references to it.
virtual void detach () = 0;
};
} }
#endif #endif
...@@ -29,9 +29,10 @@ ...@@ -29,9 +29,10 @@
#endif #endif
#include "pgm_receiver.hpp" #include "pgm_receiver.hpp"
#include "err.hpp" #include "session.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "wire.hpp" #include "wire.hpp"
#include "err.hpp"
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_) : const options_t &options_) :
...@@ -39,7 +40,7 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, ...@@ -39,7 +40,7 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
has_rx_timer (false), has_rx_timer (false),
pgm_socket (true, options_), pgm_socket (true, options_),
options (options_), options (options_),
sink (NULL), session (NULL),
mru_decoder (NULL), mru_decoder (NULL),
pending_bytes (0) pending_bytes (0)
{ {
...@@ -56,7 +57,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) ...@@ -56,7 +57,7 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
return pgm_socket.init (udp_encapsulation_, network_); return pgm_socket.init (udp_encapsulation_, network_);
} }
void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, session_t *session_)
{ {
// Retrieve PGM fds and start polling. // Retrieve PGM fds and start polling.
int socket_fd; int socket_fd;
...@@ -67,7 +68,7 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) ...@@ -67,7 +68,7 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
set_pollin (pipe_handle); set_pollin (pipe_handle);
set_pollin (socket_handle); set_pollin (socket_handle);
sink = sink_; session = session_;
// If there are any subscriptions already queued in the session, drop them. // If there are any subscriptions already queued in the session, drop them.
drop_subscriptions (); drop_subscriptions ();
...@@ -93,7 +94,7 @@ void zmq::pgm_receiver_t::unplug () ...@@ -93,7 +94,7 @@ void zmq::pgm_receiver_t::unplug ()
rm_fd (socket_handle); rm_fd (socket_handle);
rm_fd (pipe_handle); rm_fd (pipe_handle);
sink = NULL; session = NULL;
} }
void zmq::pgm_receiver_t::terminate () void zmq::pgm_receiver_t::terminate ()
...@@ -220,7 +221,7 @@ void zmq::pgm_receiver_t::in_event () ...@@ -220,7 +221,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.decoder = new (std::nothrow) decoder_t (0, it->second.decoder = new (std::nothrow) decoder_t (0,
options.maxmsgsize); options.maxmsgsize);
alloc_assert (it->second.decoder); alloc_assert (it->second.decoder);
it->second.decoder->set_sink (sink); it->second.decoder->set_session (session);
} }
mru_decoder = it->second.decoder; mru_decoder = it->second.decoder;
...@@ -246,7 +247,7 @@ void zmq::pgm_receiver_t::in_event () ...@@ -246,7 +247,7 @@ void zmq::pgm_receiver_t::in_event ()
} }
// Flush any messages decoder may have produced. // Flush any messages decoder may have produced.
sink->flush (); session->flush ();
} }
void zmq::pgm_receiver_t::timer_event (int token) void zmq::pgm_receiver_t::timer_event (int token)
...@@ -261,7 +262,7 @@ void zmq::pgm_receiver_t::timer_event (int token) ...@@ -261,7 +262,7 @@ void zmq::pgm_receiver_t::timer_event (int token)
void zmq::pgm_receiver_t::drop_subscriptions () void zmq::pgm_receiver_t::drop_subscriptions ()
{ {
msg_t msg; msg_t msg;
while (sink->read (&msg)) while (session->read (&msg))
msg.close (); msg.close ();
} }
......
...@@ -52,7 +52,7 @@ namespace zmq ...@@ -52,7 +52,7 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_); void plug (class io_thread_t *io_thread_, class session_t *session_);
void unplug (); void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
...@@ -105,7 +105,7 @@ namespace zmq ...@@ -105,7 +105,7 @@ namespace zmq
options_t options; options_t options;
// Associated session. // Associated session.
i_engine_sink *sink; class session_t *session;
// Most recently used decoder. // Most recently used decoder.
decoder_t *mru_decoder; decoder_t *mru_decoder;
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "io_thread.hpp" #include "io_thread.hpp"
#include "pgm_sender.hpp" #include "pgm_sender.hpp"
#include "session.hpp"
#include "err.hpp" #include "err.hpp"
#include "wire.hpp" #include "wire.hpp"
#include "stdint.hpp" #include "stdint.hpp"
...@@ -61,7 +62,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) ...@@ -61,7 +62,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
return rc; return rc;
} }
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_t *session_)
{ {
// Alocate 2 fds for PGM socket. // Alocate 2 fds for PGM socket.
int downlink_socket_fd = 0; int downlink_socket_fd = 0;
...@@ -69,7 +70,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) ...@@ -69,7 +70,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
int rdata_notify_fd = 0; int rdata_notify_fd = 0;
int pending_notify_fd = 0; int pending_notify_fd = 0;
encoder.set_sink (sink_); encoder.set_session (session_);
// Fill fds from PGM transport and add them to the poller. // Fill fds from PGM transport and add them to the poller.
pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
...@@ -94,9 +95,9 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) ...@@ -94,9 +95,9 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
// subscribe for all the messages. // subscribe for all the messages.
msg_t msg; msg_t msg;
msg.init (); msg.init ();
bool ok = sink_->write (&msg); bool ok = session_->write (&msg);
zmq_assert (ok); zmq_assert (ok);
sink_->flush (); session_->flush ();
} }
void zmq::pgm_sender_t::unplug () void zmq::pgm_sender_t::unplug ()
...@@ -115,7 +116,7 @@ void zmq::pgm_sender_t::unplug () ...@@ -115,7 +116,7 @@ void zmq::pgm_sender_t::unplug ()
rm_fd (uplink_handle); rm_fd (uplink_handle);
rm_fd (rdata_notify_handle); rm_fd (rdata_notify_handle);
rm_fd (pending_notify_handle); rm_fd (pending_notify_handle);
encoder.set_sink (NULL); encoder.set_session (NULL);
} }
void zmq::pgm_sender_t::terminate () void zmq::pgm_sender_t::terminate ()
......
...@@ -50,7 +50,7 @@ namespace zmq ...@@ -50,7 +50,7 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_); void plug (class io_thread_t *io_thread_, class session_t *session_);
void unplug (); void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
......
...@@ -24,11 +24,16 @@ ...@@ -24,11 +24,16 @@
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "likely.hpp" #include "likely.hpp"
#include "zmq_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
zmq::session_t::session_t (class io_thread_t *io_thread_, zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_) : class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
io_object_t (io_thread_), io_object_t (io_thread_),
connect (connect_),
pipe (NULL), pipe (NULL),
incomplete_in (false), incomplete_in (false),
pending (false), pending (false),
...@@ -37,6 +42,10 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, ...@@ -37,6 +42,10 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
io_thread (io_thread_), io_thread (io_thread_),
has_linger_timer (false) has_linger_timer (false)
{ {
if (protocol_)
protocol = protocol_;
if (address_)
address = address_;
} }
zmq::session_t::~session_t () zmq::session_t::~session_t ()
...@@ -157,6 +166,8 @@ void zmq::session_t::hiccuped (pipe_t *pipe_) ...@@ -157,6 +166,8 @@ void zmq::session_t::hiccuped (pipe_t *pipe_)
void zmq::session_t::process_plug () void zmq::session_t::process_plug ()
{ {
if (connect)
start_connecting (false);
} }
void zmq::session_t::process_attach (i_engine *engine_) void zmq::session_t::process_attach (i_engine *engine_)
...@@ -169,12 +180,6 @@ void zmq::session_t::process_attach (i_engine *engine_) ...@@ -169,12 +180,6 @@ void zmq::session_t::process_attach (i_engine *engine_)
return; return;
} }
// Trigger the notfication event about the attachment.
if (!attached ()) {
delete engine_;
return;
}
// Create the pipe if it does not exist yet. // Create the pipe if it does not exist yet.
if (!pipe && !is_terminating ()) { if (!pipe && !is_terminating ()) {
object_t *parents [2] = {this, socket}; object_t *parents [2] = {this, socket};
...@@ -271,25 +276,87 @@ void zmq::session_t::timer_event (int id_) ...@@ -271,25 +276,87 @@ void zmq::session_t::timer_event (int id_)
pipe->terminate (false); pipe->terminate (false);
} }
bool zmq::session_t::attached ()
{
return xattached ();
}
void zmq::session_t::detached () void zmq::session_t::detached ()
{ {
if (!xdetached ()) { // Transient session self-destructs after peer disconnects.
if (!connect) {
// Derived session type have asked for session termination.
terminate (); terminate ();
return; return;
} }
// Reconnect.
start_connecting (true);
// For subscriber sockets we hiccup the inbound pipe, which will cause // For subscriber sockets we hiccup the inbound pipe, which will cause
// the socket object to resend all the subscriptions. // the socket object to resend all the subscriptions.
if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
pipe->hiccup (); pipe->hiccup ();
} }
void zmq::session_t::start_connecting (bool wait_)
{
zmq_assert (connect);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_assert (io_thread);
// Create the connecter object.
// Both TCP and IPC transports are using the same infrastructure.
if (protocol == "tcp" || protocol == "ipc") {
zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
io_thread, this, options, protocol.c_str (), address.c_str (),
wait_);
alloc_assert (connecter);
launch_child (connecter);
return;
}
#if defined ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure.
if (protocol == "pgm" || protocol == "epgm") {
// For EPGM transport with UDP encapsulation of PGM is used.
bool udp_encapsulation = (protocol == "epgm");
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with PGM anyway.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
// PGM sender.
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
io_thread, options);
alloc_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
zmq_assert (rc == 0);
send_attach (this, pgm_sender);
}
else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
// PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
io_thread, options);
alloc_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
zmq_assert (rc == 0);
send_attach (this, pgm_receiver);
}
else
zmq_assert (false);
return;
}
#endif
zmq_assert (false);
}
...@@ -21,6 +21,8 @@ ...@@ -21,6 +21,8 @@
#ifndef __ZMQ_SESSION_HPP_INCLUDED__ #ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_HPP_INCLUDED__
#include <string>
#include "own.hpp" #include "own.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
#include "io_object.hpp" #include "io_object.hpp"
...@@ -32,18 +34,18 @@ namespace zmq ...@@ -32,18 +34,18 @@ namespace zmq
class session_t : class session_t :
public own_t, public own_t,
public io_object_t, public io_object_t,
public i_engine_sink,
public i_pipe_events public i_pipe_events
{ {
public: public:
session_t (class io_thread_t *io_thread_, session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_); class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
// To be used once only, when creating the session. // To be used once only, when creating the session.
void attach_pipe (class pipe_t *pipe_); void attach_pipe (class pipe_t *pipe_);
// i_engine_sink interface implementation. // Following functions are the interface exposed towards the engine.
bool read (msg_t *msg_); bool read (msg_t *msg_);
bool write (msg_t *msg_); bool write (msg_t *msg_);
void flush (); void flush ();
...@@ -55,22 +57,12 @@ namespace zmq ...@@ -55,22 +57,12 @@ namespace zmq
void hiccuped (class pipe_t *pipe_); void hiccuped (class pipe_t *pipe_);
void terminated (class pipe_t *pipe_); void terminated (class pipe_t *pipe_);
protected: private:
// Events from the engine. Attached is triggered when session is
// attached to a peer. The function can reject the new peer by
// returning false. Detached is triggered at the beginning of
// the termination process when session is about to be detached from
// the peer. If it returns false, session will be terminated.
// To be overloaded by the derived session type.
virtual bool xattached () = 0;
virtual bool xdetached () = 0;
~session_t (); ~session_t ();
private: void start_connecting (bool wait_);
bool attached ();
void detached (); void detached ();
// Handlers for incoming commands. // Handlers for incoming commands.
...@@ -88,6 +80,10 @@ namespace zmq ...@@ -88,6 +80,10 @@ namespace zmq
// Call this function to move on with the delayed process_term. // Call this function to move on with the delayed process_term.
void proceed_with_term (); void proceed_with_term ();
// If true, this session (re)connects to the peer. Otherwise, it's
// a transient session created by the listener.
bool connect;
// Pipe connecting the session to its socket. // Pipe connecting the session to its socket.
class pipe_t *pipe; class pipe_t *pipe;
...@@ -115,6 +111,10 @@ namespace zmq ...@@ -115,6 +111,10 @@ namespace zmq
// True is linger timer is running. // True is linger timer is running.
bool has_linger_timer; bool has_linger_timer;
// Protocol and address to use when connecting.
std::string protocol;
std::string address;
session_t (const session_t&); session_t (const session_t&);
const session_t &operator = (const session_t&); const session_t &operator = (const session_t&);
}; };
......
...@@ -37,7 +37,7 @@ ...@@ -37,7 +37,7 @@
#include "zmq_listener.hpp" #include "zmq_listener.hpp"
#include "zmq_connecter.hpp" #include "zmq_connecter.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "connect_session.hpp" #include "session.hpp"
#include "config.hpp" #include "config.hpp"
#include "clock.hpp" #include "clock.hpp"
#include "pipe.hpp" #include "pipe.hpp"
...@@ -441,8 +441,8 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -441,8 +441,8 @@ int zmq::socket_base_t::connect (const char *addr_)
} }
// Create session. // Create session.
connect_session_t *session = new (std::nothrow) connect_session_t ( session_t *session = new (std::nothrow) session_t (
io_thread, this, options, protocol.c_str (), address.c_str ()); io_thread, true, this, options, protocol.c_str (), address.c_str ());
alloc_assert (session); alloc_assert (session);
// Create a bi-directional pipe. // Create a bi-directional pipe.
......
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "transient_session.hpp"
zmq::transient_session_t::transient_session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_) :
session_t (io_thread_, socket_, options_)
{
}
zmq::transient_session_t::~transient_session_t ()
{
}
bool zmq::transient_session_t::xattached ()
{
// Transient session is always valid.
return true;
}
bool zmq::transient_session_t::xdetached ()
{
// There's no way to reestablish a transient session. Tear it down.
return false;
}
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_TRANSIENT_SESSION_HPP_INCLUDED__
#define __ZMQ_TRANSIENT_SESSION_HPP_INCLUDED__
#include "session.hpp"
namespace zmq
{
// Transient session is created by the listener when the connected peer
// stays anonymous. Transient session is destroyed on disconnect.
class transient_session_t : public session_t
{
public:
transient_session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_);
~transient_session_t ();
private:
// Handlers for events from session base class.
bool xattached ();
bool xdetached ();
transient_session_t (const transient_session_t&);
const transient_session_t &operator = (const transient_session_t&);
};
}
#endif
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#include "zmq_engine.hpp" #include "zmq_engine.hpp"
#include "zmq_connecter.hpp" #include "zmq_connecter.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session.hpp"
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
...@@ -39,8 +40,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : ...@@ -39,8 +40,8 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) :
outpos (NULL), outpos (NULL),
outsize (0), outsize (0),
encoder (out_batch_size), encoder (out_batch_size),
sink (NULL), session (NULL),
ephemeral_sink (NULL), leftover_session (NULL),
options (options_), options (options_),
plugged (false) plugged (false)
{ {
...@@ -54,18 +55,18 @@ zmq::zmq_engine_t::~zmq_engine_t () ...@@ -54,18 +55,18 @@ zmq::zmq_engine_t::~zmq_engine_t ()
zmq_assert (!plugged); zmq_assert (!plugged);
} }
void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, session_t *session_)
{ {
zmq_assert (!plugged); zmq_assert (!plugged);
plugged = true; plugged = true;
ephemeral_sink = NULL; leftover_session = NULL;
// Connect to session/init object. // Connect to session object.
zmq_assert (!sink); zmq_assert (!session);
zmq_assert (sink_); zmq_assert (session_);
encoder.set_sink (sink_); encoder.set_session (session_);
decoder.set_sink (sink_); decoder.set_session (session_);
sink = sink_; session = session_;
// Connect to I/O threads poller object. // Connect to I/O threads poller object.
io_object_t::plug (io_thread_); io_object_t::plug (io_thread_);
...@@ -88,11 +89,11 @@ void zmq::zmq_engine_t::unplug () ...@@ -88,11 +89,11 @@ void zmq::zmq_engine_t::unplug ()
// Disconnect from I/O threads poller object. // Disconnect from I/O threads poller object.
io_object_t::unplug (); io_object_t::unplug ();
// Disconnect from init/session object. // Disconnect from session object.
encoder.set_sink (NULL); encoder.set_session (NULL);
decoder.set_sink (NULL); decoder.set_session (NULL);
ephemeral_sink = sink; leftover_session = session;
sink = NULL; session = NULL;
} }
void zmq::zmq_engine_t::terminate () void zmq::zmq_engine_t::terminate ()
...@@ -133,9 +134,7 @@ void zmq::zmq_engine_t::in_event () ...@@ -133,9 +134,7 @@ void zmq::zmq_engine_t::in_event ()
// Stop polling for input if we got stuck. // Stop polling for input if we got stuck.
if (processed < insize) { if (processed < insize) {
// This may happen if queue limits are in effect or when // This may happen if queue limits are in effect.
// init object reads all required information from the socket
// and rejects to read more data.
if (plugged) if (plugged)
reset_pollin (handle); reset_pollin (handle);
} }
...@@ -148,13 +147,13 @@ void zmq::zmq_engine_t::in_event () ...@@ -148,13 +147,13 @@ void zmq::zmq_engine_t::in_event ()
// Flush all messages the decoder may have produced. // Flush all messages the decoder may have produced.
// If IO handler has unplugged engine, flush transient IO handler. // If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) { if (unlikely (!plugged)) {
zmq_assert (ephemeral_sink); zmq_assert (leftover_session);
ephemeral_sink->flush (); leftover_session->flush ();
} else { } else {
sink->flush (); session->flush ();
} }
if (sink && disconnection) if (session && disconnection)
error (); error ();
} }
...@@ -168,8 +167,8 @@ void zmq::zmq_engine_t::out_event () ...@@ -168,8 +167,8 @@ void zmq::zmq_engine_t::out_event ()
// If IO handler has unplugged engine, flush transient IO handler. // If IO handler has unplugged engine, flush transient IO handler.
if (unlikely (!plugged)) { if (unlikely (!plugged)) {
zmq_assert (ephemeral_sink); zmq_assert (leftover_session);
ephemeral_sink->flush (); leftover_session->flush ();
return; return;
} }
...@@ -218,8 +217,8 @@ void zmq::zmq_engine_t::activate_in () ...@@ -218,8 +217,8 @@ void zmq::zmq_engine_t::activate_in ()
void zmq::zmq_engine_t::error () void zmq::zmq_engine_t::error ()
{ {
zmq_assert (sink); zmq_assert (session);
sink->detach (); session->detach ();
unplug (); unplug ();
delete this; delete this;
} }
...@@ -43,7 +43,7 @@ namespace zmq ...@@ -43,7 +43,7 @@ namespace zmq
~zmq_engine_t (); ~zmq_engine_t ();
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, struct i_engine_sink *sink_); void plug (class io_thread_t *io_thread_, class session_t *session_);
void unplug (); void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
...@@ -69,10 +69,11 @@ namespace zmq ...@@ -69,10 +69,11 @@ namespace zmq
size_t outsize; size_t outsize;
encoder_t encoder; encoder_t encoder;
i_engine_sink *sink; // The session this engine is attached to.
class session_t *session;
// Detached transient sink. // Detached transient session.
i_engine_sink *ephemeral_sink; class session_t *leftover_session;
options_t options; options_t options;
......
...@@ -21,9 +21,9 @@ ...@@ -21,9 +21,9 @@
#include <new> #include <new>
#include "zmq_listener.hpp" #include "zmq_listener.hpp"
#include "transient_session.hpp"
#include "zmq_engine.hpp" #include "zmq_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session.hpp"
#include "err.hpp" #include "err.hpp"
zmq::zmq_listener_t::zmq_listener_t (io_thread_t *io_thread_, zmq::zmq_listener_t::zmq_listener_t (io_thread_t *io_thread_,
...@@ -74,8 +74,8 @@ void zmq::zmq_listener_t::in_event () ...@@ -74,8 +74,8 @@ void zmq::zmq_listener_t::in_event ()
zmq_assert (io_thread); zmq_assert (io_thread);
// Create and launch a session object. // Create and launch a session object.
transient_session_t *session = new (std::nothrow) session_t *session = new (std::nothrow)
transient_session_t (io_thread, socket, options); session_t (io_thread, false, socket, options, NULL, NULL);
alloc_assert (session); alloc_assert (session);
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment