Commit f78d9b6b authored by Martin Sustrik's avatar Martin Sustrik

Session class separated into socket-type-specific sessions

This is a preliminary patch allowing for socket-type-specific
functionality in the I/O thread. For example, message format
can be checked asynchronously and misbehaved connections dropped
straight away.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 78b02d14
...@@ -57,7 +57,7 @@ libzmq_la_SOURCES = \ ...@@ -57,7 +57,7 @@ libzmq_la_SOURCES = \
req.hpp \ req.hpp \
router.hpp \ router.hpp \
select.hpp \ select.hpp \
session.hpp \ session_base.hpp \
signaler.hpp \ signaler.hpp \
socket_base.hpp \ socket_base.hpp \
stdint.hpp \ stdint.hpp \
...@@ -117,7 +117,7 @@ libzmq_la_SOURCES = \ ...@@ -117,7 +117,7 @@ libzmq_la_SOURCES = \
rep.cpp \ rep.cpp \
req.cpp \ req.cpp \
select.cpp \ select.cpp \
session.cpp \ session_base.cpp \
signaler.cpp \ signaler.cpp \
socket_base.cpp \ socket_base.cpp \
stream_engine.cpp \ stream_engine.cpp \
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include <string.h> #include <string.h>
#include "decoder.hpp" #include "decoder.hpp"
#include "session.hpp" #include "session_base.hpp"
#include "wire.hpp" #include "wire.hpp"
#include "err.hpp" #include "err.hpp"
...@@ -44,7 +44,7 @@ zmq::decoder_t::~decoder_t () ...@@ -44,7 +44,7 @@ zmq::decoder_t::~decoder_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
} }
void zmq::decoder_t::set_session (session_t *session_) void zmq::decoder_t::set_session (session_base_t *session_)
{ {
session = session_; session = session_;
} }
......
...@@ -184,7 +184,7 @@ namespace zmq ...@@ -184,7 +184,7 @@ namespace zmq
decoder_t (size_t bufsize_, int64_t maxmsgsize_); decoder_t (size_t bufsize_, int64_t maxmsgsize_);
~decoder_t (); ~decoder_t ();
void set_session (class session_t *session_); void set_session (class session_base_t *session_);
private: private:
...@@ -193,7 +193,7 @@ namespace zmq ...@@ -193,7 +193,7 @@ namespace zmq
bool flags_ready (); bool flags_ready ();
bool message_ready (); bool message_ready ();
class session_t *session; class session_base_t *session;
unsigned char tmpbuf [8]; unsigned char tmpbuf [8];
msg_t in_progress; msg_t in_progress;
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
*/ */
#include "encoder.hpp" #include "encoder.hpp"
#include "session.hpp" #include "session_base.hpp"
#include "wire.hpp" #include "wire.hpp"
zmq::encoder_t::encoder_t (size_t bufsize_) : zmq::encoder_t::encoder_t (size_t bufsize_) :
...@@ -39,7 +39,7 @@ zmq::encoder_t::~encoder_t () ...@@ -39,7 +39,7 @@ zmq::encoder_t::~encoder_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
} }
void zmq::encoder_t::set_session (session_t *session_) void zmq::encoder_t::set_session (session_base_t *session_)
{ {
session = session_; session = session_;
} }
......
...@@ -163,14 +163,14 @@ namespace zmq ...@@ -163,14 +163,14 @@ namespace zmq
encoder_t (size_t bufsize_); encoder_t (size_t bufsize_);
~encoder_t (); ~encoder_t ();
void set_session (class session_t *session_); void set_session (class session_base_t *session_);
private: private:
bool size_ready (); bool size_ready ();
bool message_ready (); bool message_ready ();
class session_t *session; class session_base_t *session;
msg_t in_progress; msg_t in_progress;
unsigned char tmpbuf [10]; unsigned char tmpbuf [10];
......
...@@ -32,7 +32,7 @@ namespace zmq ...@@ -32,7 +32,7 @@ namespace zmq
// Plug the engine to the session. // Plug the engine to the session.
virtual void plug (class io_thread_t *io_thread_, virtual void plug (class io_thread_t *io_thread_,
class session_t *session_) = 0; class session_base_t *session_) = 0;
// Unplug the engine from the session. // Unplug the engine from the session.
virtual void unplug () = 0; virtual void unplug () = 0;
......
...@@ -38,7 +38,7 @@ ...@@ -38,7 +38,7 @@
#include <sys/un.h> #include <sys/un.h>
zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
class session_t *session_, const options_t &options_, class session_base_t *session_, const options_t &options_,
const char *address_, bool wait_) : const char *address_, bool wait_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
io_object_t (io_thread_), io_object_t (io_thread_),
......
...@@ -41,7 +41,7 @@ namespace zmq ...@@ -41,7 +41,7 @@ namespace zmq
// If 'delay' is true connecter first waits for a while, then starts // If 'delay' is true connecter first waits for a while, then starts
// connection process. // connection process.
ipc_connecter_t (class io_thread_t *io_thread_, ipc_connecter_t (class io_thread_t *io_thread_,
class session_t *session_, const options_t &options_, class session_base_t *session_, const options_t &options_,
const char *address_, bool delay_); const char *address_, bool delay_);
~ipc_connecter_t (); ~ipc_connecter_t ();
...@@ -101,7 +101,7 @@ namespace zmq ...@@ -101,7 +101,7 @@ namespace zmq
bool wait; bool wait;
// Reference to the session we belong to. // Reference to the session we belong to.
class session_t *session; class session_base_t *session;
// Current reconnect ivl, updated for backoff strategy // Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl; int current_reconnect_ivl;
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#include "stream_engine.hpp" #include "stream_engine.hpp"
#include "ipc_address.hpp" #include "ipc_address.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session.hpp" #include "session_base.hpp"
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
...@@ -87,9 +87,9 @@ void zmq::ipc_listener_t::in_event () ...@@ -87,9 +87,9 @@ void zmq::ipc_listener_t::in_event ()
zmq_assert (io_thread); zmq_assert (io_thread);
// Create and launch a session object. // Create and launch a session object.
session_t *session = new (std::nothrow) session_base_t *session = session_base_t::create (io_thread, false, socket,
session_t (io_thread, false, socket, options, NULL, NULL); options, NULL, NULL);
alloc_assert (session); errno_assert (session);
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
send_attach (session, engine, false); send_attach (session, engine, false);
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session.hpp" #include "session_base.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) : zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
...@@ -201,8 +201,8 @@ void zmq::object_t::send_own (own_t *destination_, own_t *object_) ...@@ -201,8 +201,8 @@ void zmq::object_t::send_own (own_t *destination_, own_t *object_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, void zmq::object_t::send_attach (session_base_t *destination_,
bool inc_seqnum_) i_engine *engine_, bool inc_seqnum_)
{ {
if (inc_seqnum_) if (inc_seqnum_)
destination_->inc_seqnum (); destination_->inc_seqnum ();
......
...@@ -62,7 +62,7 @@ namespace zmq ...@@ -62,7 +62,7 @@ namespace zmq
bool inc_seqnum_ = true); bool inc_seqnum_ = true);
void send_own (class own_t *destination_, void send_own (class own_t *destination_,
class own_t *object_); class own_t *object_);
void send_attach (class session_t *destination_, void send_attach (class session_base_t *destination_,
struct i_engine *engine_, bool inc_seqnum_ = true); struct i_engine *engine_, bool inc_seqnum_ = true);
void send_bind (class own_t *destination_, class pipe_t *pipe_, void send_bind (class own_t *destination_, class pipe_t *pipe_,
bool inc_seqnum_ = true); bool inc_seqnum_ = true);
......
...@@ -116,3 +116,15 @@ bool zmq::pair_t::xhas_out () ...@@ -116,3 +116,15 @@ bool zmq::pair_t::xhas_out ()
return result; return result;
} }
zmq::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
address_)
{
}
zmq::pair_session_t::~pair_session_t ()
{
}
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#define __ZMQ_PAIR_HPP_INCLUDED__ #define __ZMQ_PAIR_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "session_base.hpp"
namespace zmq namespace zmq
{ {
...@@ -52,6 +53,21 @@ namespace zmq ...@@ -52,6 +53,21 @@ namespace zmq
const pair_t &operator = (const pair_t&); const pair_t &operator = (const pair_t&);
}; };
class pair_session_t : public session_base_t
{
public:
pair_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~pair_session_t ();
private:
pair_session_t (const pair_session_t&);
const pair_session_t &operator = (const pair_session_t&);
};
} }
#endif #endif
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#endif #endif
#include "pgm_receiver.hpp" #include "pgm_receiver.hpp"
#include "session.hpp" #include "session_base.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "wire.hpp" #include "wire.hpp"
#include "err.hpp" #include "err.hpp"
...@@ -57,7 +57,8 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) ...@@ -57,7 +57,8 @@ int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
return pgm_socket.init (udp_encapsulation_, network_); return pgm_socket.init (udp_encapsulation_, network_);
} }
void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, session_t *session_) void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_,
session_base_t *session_)
{ {
// Retrieve PGM fds and start polling. // Retrieve PGM fds and start polling.
fd_t socket_fd = retired_fd; fd_t socket_fd = retired_fd;
......
...@@ -52,7 +52,8 @@ namespace zmq ...@@ -52,7 +52,8 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, class session_t *session_); void plug (class io_thread_t *io_thread_,
class session_base_t *session_);
void unplug (); void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
...@@ -105,7 +106,7 @@ namespace zmq ...@@ -105,7 +106,7 @@ namespace zmq
options_t options; options_t options;
// Associated session. // Associated session.
class session_t *session; class session_base_t *session;
// Most recently used decoder. // Most recently used decoder.
decoder_t *mru_decoder; decoder_t *mru_decoder;
......
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
#include "io_thread.hpp" #include "io_thread.hpp"
#include "pgm_sender.hpp" #include "pgm_sender.hpp"
#include "session.hpp" #include "session_base.hpp"
#include "err.hpp" #include "err.hpp"
#include "wire.hpp" #include "wire.hpp"
#include "stdint.hpp" #include "stdint.hpp"
...@@ -62,7 +62,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) ...@@ -62,7 +62,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
return rc; return rc;
} }
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_t *session_) void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
{ {
// Alocate 2 fds for PGM socket. // Alocate 2 fds for PGM socket.
fd_t downlink_socket_fd = retired_fd; fd_t downlink_socket_fd = retired_fd;
......
...@@ -50,7 +50,8 @@ namespace zmq ...@@ -50,7 +50,8 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, class session_t *session_); void plug (class io_thread_t *io_thread_,
class session_base_t *session_);
void unplug (); void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
......
...@@ -43,3 +43,15 @@ bool zmq::pub_t::xhas_in () ...@@ -43,3 +43,15 @@ bool zmq::pub_t::xhas_in ()
return false; return false;
} }
zmq::pub_session_t::pub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
xpub_session_t (io_thread_, connect_, socket_, options_, protocol_,
address_)
{
}
zmq::pub_session_t::~pub_session_t ()
{
}
...@@ -43,6 +43,21 @@ namespace zmq ...@@ -43,6 +43,21 @@ namespace zmq
const pub_t &operator = (const pub_t&); const pub_t &operator = (const pub_t&);
}; };
class pub_session_t : public xpub_session_t
{
public:
pub_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~pub_session_t ();
private:
pub_session_t (const pub_session_t&);
const pub_session_t &operator = (const pub_session_t&);
};
} }
#endif #endif
...@@ -59,3 +59,15 @@ bool zmq::pull_t::xhas_in () ...@@ -59,3 +59,15 @@ bool zmq::pull_t::xhas_in ()
return fq.has_in (); return fq.has_in ();
} }
zmq::pull_session_t::pull_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
address_)
{
}
zmq::pull_session_t::~pull_session_t ()
{
}
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#define __ZMQ_PULL_HPP_INCLUDED__ #define __ZMQ_PULL_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "session_base.hpp"
#include "fq.hpp" #include "fq.hpp"
namespace zmq namespace zmq
...@@ -54,6 +55,21 @@ namespace zmq ...@@ -54,6 +55,21 @@ namespace zmq
}; };
class pull_session_t : public session_base_t
{
public:
pull_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~pull_session_t ();
private:
pull_session_t (const pull_session_t&);
const pull_session_t &operator = (const pull_session_t&);
};
} }
#endif #endif
...@@ -59,3 +59,15 @@ bool zmq::push_t::xhas_out () ...@@ -59,3 +59,15 @@ bool zmq::push_t::xhas_out ()
return lb.has_out (); return lb.has_out ();
} }
zmq::push_session_t::push_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
address_)
{
}
zmq::push_session_t::~push_session_t ()
{
}
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#define __ZMQ_PUSH_HPP_INCLUDED__ #define __ZMQ_PUSH_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "session_base.hpp"
#include "lb.hpp" #include "lb.hpp"
namespace zmq namespace zmq
...@@ -53,6 +54,21 @@ namespace zmq ...@@ -53,6 +54,21 @@ namespace zmq
const push_t &operator = (const push_t&); const push_t &operator = (const push_t&);
}; };
class push_session_t : public session_base_t
{
public:
push_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~push_session_t ();
private:
push_session_t (const push_session_t&);
const push_session_t &operator = (const push_session_t&);
};
} }
#endif #endif
...@@ -110,3 +110,15 @@ bool zmq::rep_t::xhas_out () ...@@ -110,3 +110,15 @@ bool zmq::rep_t::xhas_out ()
return xrep_t::xhas_out (); return xrep_t::xhas_out ();
} }
zmq::rep_session_t::rep_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
xrep_session_t (io_thread_, connect_, socket_, options_, protocol_,
address_)
{
}
zmq::rep_session_t::~rep_session_t ()
{
}
...@@ -54,6 +54,21 @@ namespace zmq ...@@ -54,6 +54,21 @@ namespace zmq
}; };
class rep_session_t : public xrep_session_t
{
public:
rep_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~rep_session_t ();
private:
rep_session_t (const rep_session_t&);
const rep_session_t &operator = (const rep_session_t&);
};
} }
#endif #endif
...@@ -146,4 +146,15 @@ bool zmq::req_t::xhas_out () ...@@ -146,4 +146,15 @@ bool zmq::req_t::xhas_out ()
return xreq_t::xhas_out (); return xreq_t::xhas_out ();
} }
zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
xreq_session_t (io_thread_, connect_, socket_, options_, protocol_,
address_)
{
}
zmq::req_session_t::~req_session_t ()
{
}
...@@ -58,6 +58,21 @@ namespace zmq ...@@ -58,6 +58,21 @@ namespace zmq
const req_t &operator = (const req_t&); const req_t &operator = (const req_t&);
}; };
class req_session_t : public xreq_session_t
{
public:
req_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~req_session_t ();
private:
req_session_t (const req_session_t&);
const req_session_t &operator = (const req_session_t&);
};
} }
#endif #endif
...@@ -270,5 +270,15 @@ bool zmq::router_t::xhas_out () ...@@ -270,5 +270,15 @@ bool zmq::router_t::xhas_out ()
return true; return true;
} }
zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
address_)
{
}
zmq::router_session_t::~router_session_t ()
{
}
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include <deque> #include <deque>
#include "socket_base.hpp" #include "socket_base.hpp"
#include "session_base.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "fq.hpp" #include "fq.hpp"
...@@ -102,6 +103,21 @@ namespace zmq ...@@ -102,6 +103,21 @@ namespace zmq
const router_t &operator = (const router_t&); const router_t &operator = (const router_t&);
}; };
class router_session_t : public session_base_t
{
public:
router_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~router_session_t ();
private:
router_session_t (const router_session_t&);
const router_session_t &operator = (const router_session_t&);
};
} }
#endif #endif
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "session.hpp" #include "session_base.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
#include "err.hpp" #include "err.hpp"
...@@ -30,8 +30,82 @@ ...@@ -30,8 +30,82 @@
#include "pgm_sender.hpp" #include "pgm_sender.hpp"
#include "pgm_receiver.hpp" #include "pgm_receiver.hpp"
zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_, #include "req.hpp"
class socket_base_t *socket_, const options_t &options_, #include "xreq.hpp"
#include "rep.hpp"
#include "xrep.hpp"
#include "pub.hpp"
#include "xpub.hpp"
#include "sub.hpp"
#include "xsub.hpp"
#include "push.hpp"
#include "pull.hpp"
#include "router.hpp"
#include "pair.hpp"
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_)
{
session_base_t *s = NULL;
switch (options_.type) {
case ZMQ_REQ:
s = new (std::nothrow) req_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
case ZMQ_XREQ:
s = new (std::nothrow) xreq_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
case ZMQ_REP:
s = new (std::nothrow) rep_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
case ZMQ_XREP:
s = new (std::nothrow) xrep_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
case ZMQ_PUB:
s = new (std::nothrow) pub_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
case ZMQ_XPUB:
s = new (std::nothrow) xpub_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
case ZMQ_SUB:
s = new (std::nothrow) sub_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
case ZMQ_XSUB:
s = new (std::nothrow) xsub_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
case ZMQ_PUSH:
s = new (std::nothrow) push_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
case ZMQ_PULL:
s = new (std::nothrow) pull_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
case ZMQ_ROUTER:
s = new (std::nothrow) router_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
case ZMQ_PAIR:
s = new (std::nothrow) pair_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_);
break;
default:
errno = EINVAL;
return NULL;
}
alloc_assert (s);
return s;
}
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const char *protocol_, const char *address_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
io_object_t (io_thread_), io_object_t (io_thread_),
...@@ -50,7 +124,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_, ...@@ -50,7 +124,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, bool connect_,
address = address_; address = address_;
} }
zmq::session_t::~session_t () zmq::session_base_t::~session_base_t ()
{ {
zmq_assert (!pipe); zmq_assert (!pipe);
...@@ -65,7 +139,7 @@ zmq::session_t::~session_t () ...@@ -65,7 +139,7 @@ zmq::session_t::~session_t ()
engine->terminate (); engine->terminate ();
} }
void zmq::session_t::attach_pipe (pipe_t *pipe_) void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{ {
zmq_assert (!is_terminating ()); zmq_assert (!is_terminating ());
zmq_assert (!pipe); zmq_assert (!pipe);
...@@ -74,7 +148,7 @@ void zmq::session_t::attach_pipe (pipe_t *pipe_) ...@@ -74,7 +148,7 @@ void zmq::session_t::attach_pipe (pipe_t *pipe_)
pipe->set_event_sink (this); pipe->set_event_sink (this);
} }
bool zmq::session_t::read (msg_t *msg_) bool zmq::session_base_t::read (msg_t *msg_)
{ {
if (!pipe) if (!pipe)
return false; return false;
...@@ -87,7 +161,7 @@ bool zmq::session_t::read (msg_t *msg_) ...@@ -87,7 +161,7 @@ bool zmq::session_t::read (msg_t *msg_)
return true; return true;
} }
bool zmq::session_t::write (msg_t *msg_) bool zmq::session_base_t::write (msg_t *msg_)
{ {
if (pipe && pipe->write (msg_)) { if (pipe && pipe->write (msg_)) {
int rc = msg_->init (); int rc = msg_->init ();
...@@ -98,13 +172,13 @@ bool zmq::session_t::write (msg_t *msg_) ...@@ -98,13 +172,13 @@ bool zmq::session_t::write (msg_t *msg_)
return false; return false;
} }
void zmq::session_t::flush () void zmq::session_base_t::flush ()
{ {
if (pipe) if (pipe)
pipe->flush (); pipe->flush ();
} }
void zmq::session_t::clean_pipes () void zmq::session_base_t::clean_pipes ()
{ {
if (pipe) { if (pipe) {
...@@ -128,7 +202,7 @@ void zmq::session_t::clean_pipes () ...@@ -128,7 +202,7 @@ void zmq::session_t::clean_pipes ()
} }
} }
void zmq::session_t::terminated (pipe_t *pipe_) void zmq::session_base_t::terminated (pipe_t *pipe_)
{ {
// Drop the reference to the deallocated pipe. // Drop the reference to the deallocated pipe.
zmq_assert (pipe == pipe_); zmq_assert (pipe == pipe_);
...@@ -141,7 +215,7 @@ void zmq::session_t::terminated (pipe_t *pipe_) ...@@ -141,7 +215,7 @@ void zmq::session_t::terminated (pipe_t *pipe_)
proceed_with_term (); proceed_with_term ();
} }
void zmq::session_t::read_activated (pipe_t *pipe_) void zmq::session_base_t::read_activated (pipe_t *pipe_)
{ {
zmq_assert (pipe == pipe_); zmq_assert (pipe == pipe_);
...@@ -151,7 +225,7 @@ void zmq::session_t::read_activated (pipe_t *pipe_) ...@@ -151,7 +225,7 @@ void zmq::session_t::read_activated (pipe_t *pipe_)
pipe->check_read (); pipe->check_read ();
} }
void zmq::session_t::write_activated (pipe_t *pipe_) void zmq::session_base_t::write_activated (pipe_t *pipe_)
{ {
zmq_assert (pipe == pipe_); zmq_assert (pipe == pipe_);
...@@ -159,20 +233,20 @@ void zmq::session_t::write_activated (pipe_t *pipe_) ...@@ -159,20 +233,20 @@ void zmq::session_t::write_activated (pipe_t *pipe_)
engine->activate_in (); engine->activate_in ();
} }
void zmq::session_t::hiccuped (pipe_t *pipe_) void zmq::session_base_t::hiccuped (pipe_t *pipe_)
{ {
// Hiccups are always sent from session to socket, not the other // Hiccups are always sent from session to socket, not the other
// way round. // way round.
zmq_assert (false); zmq_assert (false);
} }
void zmq::session_t::process_plug () void zmq::session_base_t::process_plug ()
{ {
if (connect) if (connect)
start_connecting (false); start_connecting (false);
} }
void zmq::session_t::process_attach (i_engine *engine_) void zmq::session_base_t::process_attach (i_engine *engine_)
{ {
// If some other object (e.g. init) notifies us that the connection failed // If some other object (e.g. init) notifies us that the connection failed
// without creating an engine we need to start the reconnection process. // without creating an engine we need to start the reconnection process.
...@@ -208,7 +282,7 @@ void zmq::session_t::process_attach (i_engine *engine_) ...@@ -208,7 +282,7 @@ void zmq::session_t::process_attach (i_engine *engine_)
engine->plug (io_thread, this); engine->plug (io_thread, this);
} }
void zmq::session_t::detach () void zmq::session_base_t::detach ()
{ {
// Engine is dead. Let's forget about it. // Engine is dead. Let's forget about it.
engine = NULL; engine = NULL;
...@@ -224,7 +298,7 @@ void zmq::session_t::detach () ...@@ -224,7 +298,7 @@ void zmq::session_t::detach ()
pipe->check_read (); pipe->check_read ();
} }
void zmq::session_t::process_term (int linger_) void zmq::session_base_t::process_term (int linger_)
{ {
zmq_assert (!pending); zmq_assert (!pending);
...@@ -257,7 +331,7 @@ void zmq::session_t::process_term (int linger_) ...@@ -257,7 +331,7 @@ void zmq::session_t::process_term (int linger_)
pipe->check_read (); pipe->check_read ();
} }
void zmq::session_t::proceed_with_term () void zmq::session_base_t::proceed_with_term ()
{ {
// The pending phase have just ended. // The pending phase have just ended.
pending = false; pending = false;
...@@ -266,7 +340,7 @@ void zmq::session_t::proceed_with_term () ...@@ -266,7 +340,7 @@ void zmq::session_t::proceed_with_term ()
own_t::process_term (0); own_t::process_term (0);
} }
void zmq::session_t::timer_event (int id_) void zmq::session_base_t::timer_event (int id_)
{ {
// Linger period expired. We can proceed with termination even though // Linger period expired. We can proceed with termination even though
// there are still pending messages to be sent. // there are still pending messages to be sent.
...@@ -278,7 +352,7 @@ void zmq::session_t::timer_event (int id_) ...@@ -278,7 +352,7 @@ void zmq::session_t::timer_event (int id_)
pipe->terminate (false); pipe->terminate (false);
} }
void zmq::session_t::detached () void zmq::session_base_t::detached ()
{ {
// Transient session self-destructs after peer disconnects. // Transient session self-destructs after peer disconnects.
if (!connect) { if (!connect) {
...@@ -295,7 +369,7 @@ void zmq::session_t::detached () ...@@ -295,7 +369,7 @@ void zmq::session_t::detached ()
pipe->hiccup (); pipe->hiccup ();
} }
void zmq::session_t::start_connecting (bool wait_) void zmq::session_base_t::start_connecting (bool wait_)
{ {
zmq_assert (connect); zmq_assert (connect);
......
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_SESSION_HPP_INCLUDED__ #ifndef __ZMQ_SESSION_BASE_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__ #define __ZMQ_SESSION_BASE_HPP_INCLUDED__
#include <string> #include <string>
...@@ -31,16 +31,18 @@ ...@@ -31,16 +31,18 @@
namespace zmq namespace zmq
{ {
class session_t : class session_base_t :
public own_t, public own_t,
public io_object_t, public io_object_t,
public i_pipe_events public i_pipe_events
{ {
public: public:
session_t (class io_thread_t *io_thread_, bool connect_, // Create a session of the particular type.
class socket_base_t *socket_, const options_t &options_, static session_base_t *create (class io_thread_t *io_thread_,
const char *protocol_, const char *address_); bool connect_, class socket_base_t *socket_,
const options_t &options_, const char *protocol_,
const char *address_);
// To be used once only, when creating the session. // To be used once only, when creating the session.
void attach_pipe (class pipe_t *pipe_); void attach_pipe (class pipe_t *pipe_);
...@@ -57,9 +59,14 @@ namespace zmq ...@@ -57,9 +59,14 @@ namespace zmq
void hiccuped (class pipe_t *pipe_); void hiccuped (class pipe_t *pipe_);
void terminated (class pipe_t *pipe_); void terminated (class pipe_t *pipe_);
private: protected:
~session_t (); session_base_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~session_base_t ();
private:
void start_connecting (bool wait_); void start_connecting (bool wait_);
...@@ -115,8 +122,8 @@ namespace zmq ...@@ -115,8 +122,8 @@ namespace zmq
std::string protocol; std::string protocol;
std::string address; std::string address;
session_t (const session_t&); session_base_t (const session_base_t&);
const session_t &operator = (const session_t&); const session_base_t &operator = (const session_base_t&);
}; };
} }
......
...@@ -39,7 +39,7 @@ ...@@ -39,7 +39,7 @@
#include "vtcp_listener.hpp" #include "vtcp_listener.hpp"
#include "tcp_connecter.hpp" #include "tcp_connecter.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session.hpp" #include "session_base.hpp"
#include "config.hpp" #include "config.hpp"
#include "clock.hpp" #include "clock.hpp"
#include "pipe.hpp" #include "pipe.hpp"
...@@ -480,9 +480,9 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -480,9 +480,9 @@ int zmq::socket_base_t::connect (const char *addr_)
} }
// Create session. // Create session.
session_t *session = new (std::nothrow) session_t ( session_base_t *session = session_base_t::create (io_thread, true, this,
io_thread, true, this, options, protocol.c_str (), address.c_str ()); options, protocol.c_str (), address.c_str ());
alloc_assert (session); errno_assert (session);
// Create a bi-directional pipe. // Create a bi-directional pipe.
object_t *parents [2] = {this, session}; object_t *parents [2] = {this, session};
......
...@@ -36,7 +36,7 @@ ...@@ -36,7 +36,7 @@
#include "stream_engine.hpp" #include "stream_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session.hpp" #include "session_base.hpp"
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
...@@ -102,7 +102,8 @@ zmq::stream_engine_t::~stream_engine_t () ...@@ -102,7 +102,8 @@ zmq::stream_engine_t::~stream_engine_t ()
} }
} }
void zmq::stream_engine_t::plug (io_thread_t *io_thread_, session_t *session_) void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
session_base_t *session_)
{ {
zmq_assert (!plugged); zmq_assert (!plugged);
plugged = true; plugged = true;
......
...@@ -44,7 +44,8 @@ namespace zmq ...@@ -44,7 +44,8 @@ namespace zmq
~stream_engine_t (); ~stream_engine_t ();
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, class session_t *session_); void plug (class io_thread_t *io_thread_,
class session_base_t *session_);
void unplug (); void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
...@@ -84,10 +85,10 @@ namespace zmq ...@@ -84,10 +85,10 @@ namespace zmq
encoder_t encoder; encoder_t encoder;
// The session this engine is attached to. // The session this engine is attached to.
class session_t *session; class session_base_t *session;
// Detached transient session. // Detached transient session.
class session_t *leftover_session; class session_base_t *leftover_session;
options_t options; options_t options;
......
...@@ -79,3 +79,15 @@ bool zmq::sub_t::xhas_out () ...@@ -79,3 +79,15 @@ bool zmq::sub_t::xhas_out ()
return false; return false;
} }
zmq::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
xsub_session_t (io_thread_, connect_, socket_, options_, protocol_,
address_)
{
}
zmq::sub_session_t::~sub_session_t ()
{
}
...@@ -45,6 +45,21 @@ namespace zmq ...@@ -45,6 +45,21 @@ namespace zmq
const sub_t &operator = (const sub_t&); const sub_t &operator = (const sub_t&);
}; };
class sub_session_t : public xsub_session_t
{
public:
sub_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~sub_session_t ();
private:
sub_session_t (const sub_session_t&);
const sub_session_t &operator = (const sub_session_t&);
};
} }
#endif #endif
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
#endif #endif
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
class session_t *session_, const options_t &options_, class session_base_t *session_, const options_t &options_,
const char *address_, bool wait_) : const char *address_, bool wait_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
io_object_t (io_thread_), io_object_t (io_thread_),
......
...@@ -37,7 +37,7 @@ namespace zmq ...@@ -37,7 +37,7 @@ namespace zmq
// If 'delay' is true connecter first waits for a while, then starts // If 'delay' is true connecter first waits for a while, then starts
// connection process. // connection process.
tcp_connecter_t (class io_thread_t *io_thread_, tcp_connecter_t (class io_thread_t *io_thread_,
class session_t *session_, const options_t &options_, class session_base_t *session_, const options_t &options_,
const char *address_, bool delay_); const char *address_, bool delay_);
~tcp_connecter_t (); ~tcp_connecter_t ();
...@@ -97,7 +97,7 @@ namespace zmq ...@@ -97,7 +97,7 @@ namespace zmq
bool wait; bool wait;
// Reference to the session we belong to. // Reference to the session we belong to.
class session_t *session; class session_base_t *session;
// Current reconnect ivl, updated for backoff strategy // Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl; int current_reconnect_ivl;
......
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
#include "tcp_listener.hpp" #include "tcp_listener.hpp"
#include "stream_engine.hpp" #include "stream_engine.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session.hpp" #include "session_base.hpp"
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
...@@ -97,9 +97,9 @@ void zmq::tcp_listener_t::in_event () ...@@ -97,9 +97,9 @@ void zmq::tcp_listener_t::in_event ()
zmq_assert (io_thread); zmq_assert (io_thread);
// Create and launch a session object. // Create and launch a session object.
session_t *session = new (std::nothrow) session_base_t *session = session_base_t::create (io_thread, false, socket,
session_t (io_thread, false, socket, options, NULL, NULL); options, NULL, NULL);
alloc_assert (session); errno_assert (session);
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
send_attach (session, engine, false); send_attach (session, engine, false);
......
...@@ -50,7 +50,7 @@ ...@@ -50,7 +50,7 @@
#endif #endif
zmq::vtcp_connecter_t::vtcp_connecter_t (class io_thread_t *io_thread_, zmq::vtcp_connecter_t::vtcp_connecter_t (class io_thread_t *io_thread_,
class session_t *session_, const options_t &options_, class session_base_t *session_, const options_t &options_,
const char *address_, bool wait_) : const char *address_, bool wait_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
io_object_t (io_thread_), io_object_t (io_thread_),
......
...@@ -43,7 +43,7 @@ namespace zmq ...@@ -43,7 +43,7 @@ namespace zmq
// If 'delay' is true connecter first waits for a while, then starts // If 'delay' is true connecter first waits for a while, then starts
// connection process. // connection process.
vtcp_connecter_t (class io_thread_t *io_thread_, vtcp_connecter_t (class io_thread_t *io_thread_,
class session_t *session_, const options_t &options_, class session_base_t *session_, const options_t &options_,
const char *address_, bool delay_); const char *address_, bool delay_);
~vtcp_connecter_t (); ~vtcp_connecter_t ();
...@@ -104,7 +104,7 @@ namespace zmq ...@@ -104,7 +104,7 @@ namespace zmq
bool wait; bool wait;
// Reference to the session we belong to. // Reference to the session we belong to.
class session_t *session; class session_base_t *session;
// Current reconnect ivl, updated for backoff strategy // Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl; int current_reconnect_ivl;
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
#include <vtcp.h> #include <vtcp.h>
#include "stream_engine.hpp" #include "stream_engine.hpp"
#include "session.hpp" #include "session_base.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
...@@ -113,8 +113,8 @@ void zmq::vtcp_listener_t::in_event () ...@@ -113,8 +113,8 @@ void zmq::vtcp_listener_t::in_event ()
zmq_assert (io_thread); zmq_assert (io_thread);
// Create and launch a session object. // Create and launch a session object.
session_t *session = new (std::nothrow) session_base_t *session = session_base_t::create (io_thread, false, socket,
session_t (io_thread, false, socket, options, NULL, NULL); options, NULL, NULL);
alloc_assert (session); alloc_assert (session);
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
......
...@@ -169,3 +169,15 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, ...@@ -169,3 +169,15 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
} }
} }
zmq::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
address_)
{
}
zmq::xpub_session_t::~xpub_session_t ()
{
}
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include <string> #include <string>
#include "socket_base.hpp" #include "socket_base.hpp"
#include "session_base.hpp"
#include "mtrie.hpp" #include "mtrie.hpp"
#include "array.hpp" #include "array.hpp"
#include "dist.hpp" #include "dist.hpp"
...@@ -79,6 +80,21 @@ namespace zmq ...@@ -79,6 +80,21 @@ namespace zmq
const xpub_t &operator = (const xpub_t&); const xpub_t &operator = (const xpub_t&);
}; };
class xpub_session_t : public session_base_t
{
public:
xpub_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~xpub_session_t ();
private:
xpub_session_t (const xpub_session_t&);
const xpub_session_t &operator = (const xpub_session_t&);
};
} }
#endif #endif
...@@ -243,5 +243,15 @@ bool zmq::xrep_t::xhas_out () ...@@ -243,5 +243,15 @@ bool zmq::xrep_t::xhas_out ()
return true; return true;
} }
zmq::xrep_session_t::xrep_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
address_)
{
}
zmq::xrep_session_t::~xrep_session_t ()
{
}
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include <map> #include <map>
#include "socket_base.hpp" #include "socket_base.hpp"
#include "session_base.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "fq.hpp" #include "fq.hpp"
...@@ -93,6 +94,21 @@ namespace zmq ...@@ -93,6 +94,21 @@ namespace zmq
const xrep_t &operator = (const xrep_t&); const xrep_t &operator = (const xrep_t&);
}; };
class xrep_session_t : public session_base_t
{
public:
xrep_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~xrep_session_t ();
private:
xrep_session_t (const xrep_session_t&);
const xrep_session_t &operator = (const xrep_session_t&);
};
} }
#endif #endif
...@@ -79,3 +79,15 @@ void zmq::xreq_t::xterminated (pipe_t *pipe_) ...@@ -79,3 +79,15 @@ void zmq::xreq_t::xterminated (pipe_t *pipe_)
lb.terminated (pipe_); lb.terminated (pipe_);
} }
zmq::xreq_session_t::xreq_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
address_)
{
}
zmq::xreq_session_t::~xreq_session_t ()
{
}
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#define __ZMQ_XREQ_HPP_INCLUDED__ #define __ZMQ_XREQ_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "session_base.hpp"
#include "fq.hpp" #include "fq.hpp"
#include "lb.hpp" #include "lb.hpp"
...@@ -60,6 +61,21 @@ namespace zmq ...@@ -60,6 +61,21 @@ namespace zmq
const xreq_t &operator = (const xreq_t&); const xreq_t &operator = (const xreq_t&);
}; };
class xreq_session_t : public session_base_t
{
public:
xreq_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~xreq_session_t ();
private:
xreq_session_t (const xreq_session_t&);
const xreq_session_t &operator = (const xreq_session_t&);
};
} }
#endif #endif
...@@ -213,4 +213,15 @@ void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_, ...@@ -213,4 +213,15 @@ void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_,
zmq_assert (sent); zmq_assert (sent);
} }
zmq::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_,
address_)
{
}
zmq::xsub_session_t::~xsub_session_t ()
{
}
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#define __ZMQ_XSUB_HPP_INCLUDED__ #define __ZMQ_XSUB_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "session_base.hpp"
#include "dist.hpp" #include "dist.hpp"
#include "fq.hpp" #include "fq.hpp"
#include "trie.hpp" #include "trie.hpp"
...@@ -83,6 +84,21 @@ namespace zmq ...@@ -83,6 +84,21 @@ namespace zmq
const xsub_t &operator = (const xsub_t&); const xsub_t &operator = (const xsub_t&);
}; };
class xsub_session_t : public session_base_t
{
public:
xsub_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_);
~xsub_session_t ();
private:
xsub_session_t (const xsub_session_t&);
const xsub_session_t &operator = (const xsub_session_t&);
};
} }
#endif #endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment