Commit c12fedc7 authored by Pieter Hintjens's avatar Pieter Hintjens

Completed internal renaming of XREP/XREQ to ROUTER/DEALER

parent 75809b27
......@@ -19,17 +19,17 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "xreq.hpp"
#include "dealer.hpp"
#include "err.hpp"
#include "msg.hpp"
zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
prefetched (false)
{
options.type = ZMQ_XREQ;
options.type = ZMQ_DEALER;
// TODO: Uncomment the following line when XREQ will become true XREQ
// TODO: Uncomment the following line when DEALER will become true DEALER
// rather than generic dealer socket.
// If the socket is closing we can drop all the outbound requests. There'll
// be noone to receive the replies anyway.
......@@ -41,24 +41,24 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
prefetched_msg.init ();
}
zmq::xreq_t::~xreq_t ()
zmq::dealer_t::~dealer_t ()
{
prefetched_msg.close ();
}
void zmq::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
zmq_assert (pipe_);
fq.attach (pipe_);
lb.attach (pipe_);
}
int zmq::xreq_t::xsend (msg_t *msg_, int flags_)
int zmq::dealer_t::xsend (msg_t *msg_, int flags_)
{
return lb.send (msg_, flags_);
}
int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)
int zmq::dealer_t::xrecv (msg_t *msg_, int flags_)
{
// If there is a prefetched message, return it.
if (prefetched) {
......@@ -68,7 +68,7 @@ int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)
return 0;
}
// XREQ socket doesn't use identities. We can safely drop it and
// DEALER socket doesn't use identities. We can safely drop it and
while (true) {
int rc = fq.recv (msg_, flags_);
if (rc != 0)
......@@ -79,14 +79,14 @@ int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)
return 0;
}
bool zmq::xreq_t::xhas_in ()
bool zmq::dealer_t::xhas_in ()
{
// We may already have a message pre-fetched.
if (prefetched)
return true;
// Try to read the next message to the pre-fetch buffer.
int rc = xreq_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT);
int rc = dealer_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT);
if (rc != 0 && errno == EAGAIN)
return false;
zmq_assert (rc == 0);
......@@ -94,35 +94,35 @@ bool zmq::xreq_t::xhas_in ()
return true;
}
bool zmq::xreq_t::xhas_out ()
bool zmq::dealer_t::xhas_out ()
{
return lb.has_out ();
}
void zmq::xreq_t::xread_activated (pipe_t *pipe_)
void zmq::dealer_t::xread_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
}
void zmq::xreq_t::xwrite_activated (pipe_t *pipe_)
void zmq::dealer_t::xwrite_activated (pipe_t *pipe_)
{
lb.activated (pipe_);
}
void zmq::xreq_t::xterminated (pipe_t *pipe_)
void zmq::dealer_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
lb.terminated (pipe_);
}
zmq::xreq_session_t::xreq_session_t (io_thread_t *io_thread_, bool connect_,
zmq::dealer_session_t::dealer_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const address_t *addr_) :
session_base_t (io_thread_, connect_, socket_, options_, addr_)
{
}
zmq::xreq_session_t::~xreq_session_t ()
zmq::dealer_session_t::~dealer_session_t ()
{
}
......@@ -18,8 +18,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_XREQ_HPP_INCLUDED__
#define __ZMQ_XREQ_HPP_INCLUDED__
#ifndef __ZMQ_DEALER_HPP_INCLUDED__
#define __ZMQ_DEALER_HPP_INCLUDED__
#include "socket_base.hpp"
#include "session_base.hpp"
......@@ -35,13 +35,13 @@ namespace zmq
class io_thread_t;
class socket_base_t;
class xreq_t :
class dealer_t :
public socket_base_t
{
public:
xreq_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
~xreq_t ();
dealer_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
~dealer_t ();
protected:
......@@ -68,23 +68,23 @@ namespace zmq
// Holds the prefetched message.
msg_t prefetched_msg;
xreq_t (const xreq_t&);
const xreq_t &operator = (const xreq_t&);
dealer_t (const dealer_t&);
const dealer_t &operator = (const dealer_t&);
};
class xreq_session_t : public session_base_t
class dealer_session_t : public session_base_t
{
public:
xreq_session_t (zmq::io_thread_t *io_thread_, bool connect_,
dealer_session_t (zmq::io_thread_t *io_thread_, bool connect_,
zmq::socket_base_t *socket_, const options_t &options_,
const address_t *addr_);
~xreq_session_t ();
~dealer_session_t ();
private:
xreq_session_t (const xreq_session_t&);
const xreq_session_t &operator = (const xreq_session_t&);
dealer_session_t (const dealer_session_t&);
const dealer_session_t &operator = (const dealer_session_t&);
};
}
......
......@@ -24,7 +24,7 @@
#include "msg.hpp"
zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
xrep_t (parent_, tid_, sid_),
router_t (parent_, tid_, sid_),
sending_reply (false),
request_begins (true)
{
......@@ -46,7 +46,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_)
bool more = msg_->flags () & msg_t::more ? true : false;
// Push message to the reply pipe.
int rc = xrep_t::xsend (msg_, flags_);
int rc = router_t::xsend (msg_, flags_);
if (rc != 0)
return rc;
......@@ -69,12 +69,12 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
// to the reply pipe.
if (request_begins) {
while (true) {
int rc = xrep_t::xrecv (msg_, flags_);
int rc = router_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
zmq_assert (msg_->flags () & msg_t::more);
bool bottom = (msg_->size () == 0);
rc = xrep_t::xsend (msg_, flags_);
rc = router_t::xsend (msg_, flags_);
errno_assert (rc == 0);
if (bottom)
break;
......@@ -83,7 +83,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
}
// Get next message part to return to the user.
int rc = xrep_t::xrecv (msg_, flags_);
int rc = router_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
......@@ -101,7 +101,7 @@ bool zmq::rep_t::xhas_in ()
if (sending_reply)
return false;
return xrep_t::xhas_in ();
return router_t::xhas_in ();
}
bool zmq::rep_t::xhas_out ()
......@@ -109,13 +109,13 @@ bool zmq::rep_t::xhas_out ()
if (!sending_reply)
return false;
return xrep_t::xhas_out ();
return router_t::xhas_out ();
}
zmq::rep_session_t::rep_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const address_t *addr_) :
xrep_session_t (io_thread_, connect_, socket_, options_, addr_)
router_session_t (io_thread_, connect_, socket_, options_, addr_)
{
}
......
......@@ -22,7 +22,7 @@
#ifndef __ZMQ_REP_HPP_INCLUDED__
#define __ZMQ_REP_HPP_INCLUDED__
#include "xrep.hpp"
#include "router.hpp"
namespace zmq
{
......@@ -32,7 +32,7 @@ namespace zmq
class io_thread_t;
class socket_base_t;
class rep_t : public xrep_t
class rep_t : public router_t
{
public:
......@@ -60,7 +60,7 @@ namespace zmq
};
class rep_session_t : public xrep_session_t
class rep_session_t : public router_session_t
{
public:
......
......@@ -28,7 +28,7 @@
#include "likely.hpp"
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
xreq_t (parent_, tid_, sid_),
dealer_t (parent_, tid_, sid_),
receiving_reply (false),
message_begins (true)
{
......@@ -54,7 +54,7 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
int rc = bottom.init ();
errno_assert (rc == 0);
bottom.set_flags (msg_t::more);
rc = xreq_t::xsend (&bottom, 0);
rc = dealer_t::xsend (&bottom, 0);
if (rc != 0)
return -1;
message_begins = false;
......@@ -62,7 +62,7 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
bool more = msg_->flags () & msg_t::more ? true : false;
int rc = xreq_t::xsend (msg_, flags_);
int rc = dealer_t::xsend (msg_, flags_);
if (rc != 0)
return rc;
......@@ -85,14 +85,14 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
// First part of the reply should be the original request ID.
if (message_begins) {
int rc = xreq_t::xrecv (msg_, flags_);
int rc = dealer_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
// TODO: This should also close the connection with the peer!
if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
while (true) {
int rc = xreq_t::xrecv (msg_, flags_);
int rc = dealer_t::xrecv (msg_, flags_);
errno_assert (rc == 0);
if (!(msg_->flags () & msg_t::more))
break;
......@@ -106,7 +106,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
message_begins = false;
}
int rc = xreq_t::xrecv (msg_, flags_);
int rc = dealer_t::xrecv (msg_, flags_);
if (rc != 0)
return rc;
......@@ -126,7 +126,7 @@ bool zmq::req_t::xhas_in ()
if (!receiving_reply)
return false;
return xreq_t::xhas_in ();
return dealer_t::xhas_in ();
}
bool zmq::req_t::xhas_out ()
......@@ -134,13 +134,13 @@ bool zmq::req_t::xhas_out ()
if (receiving_reply)
return false;
return xreq_t::xhas_out ();
return dealer_t::xhas_out ();
}
zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const address_t *addr_) :
xreq_session_t (io_thread_, connect_, socket_, options_, addr_),
dealer_session_t (io_thread_, connect_, socket_, options_, addr_),
state (identity)
{
}
......@@ -156,21 +156,21 @@ int zmq::req_session_t::write (msg_t *msg_)
case bottom:
if (msg_->flags () == msg_t::more && msg_->size () == 0) {
state = body;
return xreq_session_t::write (msg_);
return dealer_session_t::write (msg_);
}
break;
case body:
if (msg_->flags () == msg_t::more)
return xreq_session_t::write (msg_);
return dealer_session_t::write (msg_);
if (msg_->flags () == 0) {
state = bottom;
return xreq_session_t::write (msg_);
return dealer_session_t::write (msg_);
}
break;
case identity:
if (msg_->flags () == 0) {
state = bottom;
return xreq_session_t::write (msg_);
return dealer_session_t::write (msg_);
}
break;
}
......
......@@ -23,7 +23,7 @@
#ifndef __ZMQ_REQ_HPP_INCLUDED__
#define __ZMQ_REQ_HPP_INCLUDED__
#include "xreq.hpp"
#include "dealer.hpp"
#include "stdint.hpp"
namespace zmq
......@@ -34,7 +34,7 @@ namespace zmq
class io_thread_t;
class socket_base_t;
class req_t : public xreq_t
class req_t : public dealer_t
{
public:
......@@ -61,7 +61,7 @@ namespace zmq
const req_t &operator = (const req_t&);
};
class req_session_t : public xreq_session_t
class req_session_t : public dealer_session_t
{
public:
......
......@@ -20,14 +20,14 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "xrep.hpp"
#include "router.hpp"
#include "pipe.hpp"
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
#include "err.hpp"
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
prefetched (0),
more_in (false),
......@@ -36,9 +36,9 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
next_peer_id (generate_random ()),
fail_unroutable(false)
{
options.type = ZMQ_XREP;
options.type = ZMQ_ROUTER;
// TODO: Uncomment the following line when XREP will become true XREP
// TODO: Uncomment the following line when ROUTER will become true ROUTER
// rather than generic router socket.
// If peer disconnect there's noone to send reply to anyway. We can drop
// all the outstanding requests from that peer.
......@@ -50,13 +50,13 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
prefetched_msg.init ();
}
zmq::xrep_t::~xrep_t ()
zmq::router_t::~router_t ()
{
zmq_assert (outpipes.empty ());
prefetched_msg.close ();
}
void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{
zmq_assert (pipe_);
......@@ -78,7 +78,7 @@ void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
fq.attach (pipe_);
}
int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
int zmq::router_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (option_ != ZMQ_FAIL_UNROUTABLE) {
......@@ -93,7 +93,7 @@ int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
return 0;
}
void zmq::xrep_t::xterminated (pipe_t *pipe_)
void zmq::router_t::xterminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
......@@ -109,12 +109,12 @@ void zmq::xrep_t::xterminated (pipe_t *pipe_)
zmq_assert (false);
}
void zmq::xrep_t::xread_activated (pipe_t *pipe_)
void zmq::router_t::xread_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
}
void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
void zmq::router_t::xwrite_activated (pipe_t *pipe_)
{
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
......@@ -127,7 +127,7 @@ void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
zmq_assert (false);
}
int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
int zmq::router_t::xsend (msg_t *msg_, int flags_)
{
// If this is the first part of the message it's the ID of the
// peer to send the message to.
......@@ -200,7 +200,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
return 0;
}
int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
int zmq::router_t::xrecv (msg_t *msg_, int flags_)
{
// if there is a prefetched identity, return it.
if (prefetched == 2)
......@@ -280,7 +280,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
return 0;
}
int zmq::xrep_t::rollback (void)
int zmq::router_t::rollback (void)
{
if (current_out) {
current_out->rollback ();
......@@ -290,7 +290,7 @@ int zmq::xrep_t::rollback (void)
return 0;
}
bool zmq::xrep_t::xhas_in ()
bool zmq::router_t::xhas_in ()
{
// If we are in the middle of reading the messages, there are
// definitely more parts available.
......@@ -305,7 +305,7 @@ bool zmq::xrep_t::xhas_in ()
// it will be identity of the peer sending the message.
msg_t id;
id.init ();
int rc = xrep_t::xrecv (&id, ZMQ_DONTWAIT);
int rc = router_t::xrecv (&id, ZMQ_DONTWAIT);
if (rc != 0 && errno == EAGAIN) {
id.close ();
return false;
......@@ -321,22 +321,22 @@ bool zmq::xrep_t::xhas_in ()
return true;
}
bool zmq::xrep_t::xhas_out ()
bool zmq::router_t::xhas_out ()
{
// In theory, XREP socket is always ready for writing. Whether actual
// In theory, ROUTER socket is always ready for writing. Whether actual
// attempt to write succeeds depends on whitch pipe the message is going
// to be routed to.
return true;
}
zmq::xrep_session_t::xrep_session_t (io_thread_t *io_thread_, bool connect_,
zmq::router_session_t::router_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const address_t *addr_) :
session_base_t (io_thread_, connect_, socket_, options_, addr_)
{
}
zmq::xrep_session_t::~xrep_session_t ()
zmq::router_session_t::~router_session_t ()
{
}
......@@ -20,8 +20,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_XREP_HPP_INCLUDED__
#define __ZMQ_XREP_HPP_INCLUDED__
#ifndef __ZMQ_ROUTER_HPP_INCLUDED__
#define __ZMQ_ROUTER_HPP_INCLUDED__
#include <map>
......@@ -39,13 +39,13 @@ namespace zmq
class pipe_t;
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class xrep_t :
class router_t :
public socket_base_t
{
public:
xrep_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
~xrep_t ();
router_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
~router_t ();
// Overloads of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
......@@ -104,23 +104,23 @@ namespace zmq
// If true, fail on unroutable messages instead of silently dropping them.
bool fail_unroutable;
xrep_t (const xrep_t&);
const xrep_t &operator = (const xrep_t&);
router_t (const router_t&);
const router_t &operator = (const router_t&);
};
class xrep_session_t : public session_base_t
class router_session_t : public session_base_t
{
public:
xrep_session_t (zmq::io_thread_t *io_thread_, bool connect_,
router_session_t (zmq::io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_,
const address_t *addr_);
~xrep_session_t ();
~router_session_t ();
private:
xrep_session_t (const xrep_session_t&);
const xrep_session_t &operator = (const xrep_session_t&);
router_session_t (const router_session_t&);
const router_session_t &operator = (const router_session_t&);
};
}
......
......@@ -33,9 +33,9 @@
#include "address.hpp"
#include "req.hpp"
#include "xreq.hpp"
#include "dealer.hpp"
#include "rep.hpp"
#include "xrep.hpp"
#include "router.hpp"
#include "pub.hpp"
#include "xpub.hpp"
#include "sub.hpp"
......@@ -54,15 +54,15 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
s = new (std::nothrow) req_session_t (io_thread_, connect_,
socket_, options_, addr_);
break;
case ZMQ_XREQ:
s = new (std::nothrow) xreq_session_t (io_thread_, connect_,
case ZMQ_DEALER:
s = new (std::nothrow) dealer_session_t (io_thread_, connect_,
socket_, options_, addr_);
case ZMQ_REP:
s = new (std::nothrow) rep_session_t (io_thread_, connect_,
socket_, options_, addr_);
break;
case ZMQ_XREP:
s = new (std::nothrow) xrep_session_t (io_thread_, connect_,
case ZMQ_ROUTER:
s = new (std::nothrow) router_session_t (io_thread_, connect_,
socket_, options_, addr_);
break;
case ZMQ_PUB:
......
......@@ -64,8 +64,8 @@
#include "rep.hpp"
#include "pull.hpp"
#include "push.hpp"
#include "xreq.hpp"
#include "xrep.hpp"
#include "dealer.hpp"
#include "router.hpp"
#include "xpub.hpp"
#include "xsub.hpp"
......@@ -95,11 +95,11 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_REP:
s = new (std::nothrow) rep_t (parent_, tid_, sid_);
break;
case ZMQ_XREQ:
s = new (std::nothrow) xreq_t (parent_, tid_, sid_);
case ZMQ_DEALER:
s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
break;
case ZMQ_XREP:
s = new (std::nothrow) xrep_t (parent_, tid_, sid_);
case ZMQ_ROUTER:
s = new (std::nothrow) router_t (parent_, tid_, sid_);
break;
case ZMQ_PULL:
s = new (std::nothrow) pull_t (parent_, tid_, sid_);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment