Commit 72a793f7 authored by Martin Sustrik's avatar Martin Sustrik

ZMQ_GENERIC renamed to ZMQ_ROUTER

Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent cb2d7156
...@@ -158,7 +158,7 @@ ZMQ_EXPORT int zmq_term (void *context); ...@@ -158,7 +158,7 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_PUSH 8 #define ZMQ_PUSH 8
#define ZMQ_XPUB 9 #define ZMQ_XPUB 9
#define ZMQ_XSUB 10 #define ZMQ_XSUB 10
#define ZMQ_GENERIC 13 #define ZMQ_ROUTER 13
/* Socket options. */ /* Socket options. */
#define ZMQ_AFFINITY 4 #define ZMQ_AFFINITY 4
......
...@@ -23,7 +23,6 @@ libzmq_la_SOURCES = \ ...@@ -23,7 +23,6 @@ libzmq_la_SOURCES = \
err.hpp \ err.hpp \
fd.hpp \ fd.hpp \
fq.hpp \ fq.hpp \
generic.hpp \
io_object.hpp \ io_object.hpp \
io_thread.hpp \ io_thread.hpp \
ip.hpp \ ip.hpp \
...@@ -55,6 +54,7 @@ libzmq_la_SOURCES = \ ...@@ -55,6 +54,7 @@ libzmq_la_SOURCES = \
reaper.hpp \ reaper.hpp \
rep.hpp \ rep.hpp \
req.hpp \ req.hpp \
router.hpp \
select.hpp \ select.hpp \
semaphore.hpp \ semaphore.hpp \
session.hpp \ session.hpp \
...@@ -89,7 +89,6 @@ libzmq_la_SOURCES = \ ...@@ -89,7 +89,6 @@ libzmq_la_SOURCES = \
epoll.cpp \ epoll.cpp \
err.cpp \ err.cpp \
fq.cpp \ fq.cpp \
generic.cpp \
io_object.cpp \ io_object.cpp \
io_thread.cpp \ io_thread.cpp \
ip.cpp \ ip.cpp \
...@@ -113,6 +112,7 @@ libzmq_la_SOURCES = \ ...@@ -113,6 +112,7 @@ libzmq_la_SOURCES = \
reaper.cpp \ reaper.cpp \
pub.cpp \ pub.cpp \
random.cpp \ random.cpp \
router.cpp \
rep.cpp \ rep.cpp \
req.cpp \ req.cpp \
select.cpp \ select.cpp \
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "generic.hpp" #include "router.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "wire.hpp" #include "wire.hpp"
#include "random.hpp" #include "random.hpp"
...@@ -26,7 +26,7 @@ ...@@ -26,7 +26,7 @@
#include "wire.hpp" #include "wire.hpp"
#include "err.hpp" #include "err.hpp"
zmq::generic_t::generic_t (class ctx_t *parent_, uint32_t tid_) : zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_),
prefetched (false), prefetched (false),
more_in (false), more_in (false),
...@@ -34,18 +34,18 @@ zmq::generic_t::generic_t (class ctx_t *parent_, uint32_t tid_) : ...@@ -34,18 +34,18 @@ zmq::generic_t::generic_t (class ctx_t *parent_, uint32_t tid_) :
more_out (false), more_out (false),
next_peer_id (generate_random ()) next_peer_id (generate_random ())
{ {
options.type = ZMQ_GENERIC; options.type = ZMQ_ROUTER;
prefetched_msg.init (); prefetched_msg.init ();
} }
zmq::generic_t::~generic_t () zmq::router_t::~router_t ()
{ {
zmq_assert (outpipes.empty ()); zmq_assert (outpipes.empty ());
prefetched_msg.close (); prefetched_msg.close ();
} }
void zmq::generic_t::xattach_pipe (pipe_t *pipe_) void zmq::router_t::xattach_pipe (pipe_t *pipe_)
{ {
zmq_assert (pipe_); zmq_assert (pipe_);
...@@ -82,7 +82,7 @@ void zmq::generic_t::xattach_pipe (pipe_t *pipe_) ...@@ -82,7 +82,7 @@ void zmq::generic_t::xattach_pipe (pipe_t *pipe_)
++next_peer_id; ++next_peer_id;
} }
void zmq::generic_t::xterminated (pipe_t *pipe_) void zmq::router_t::xterminated (pipe_t *pipe_)
{ {
fq.terminated (pipe_); fq.terminated (pipe_);
...@@ -104,12 +104,12 @@ void zmq::generic_t::xterminated (pipe_t *pipe_) ...@@ -104,12 +104,12 @@ void zmq::generic_t::xterminated (pipe_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::generic_t::xread_activated (pipe_t *pipe_) void zmq::router_t::xread_activated (pipe_t *pipe_)
{ {
fq.activated (pipe_); fq.activated (pipe_);
} }
void zmq::generic_t::xwrite_activated (pipe_t *pipe_) void zmq::router_t::xwrite_activated (pipe_t *pipe_)
{ {
for (outpipes_t::iterator it = outpipes.begin (); for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) { it != outpipes.end (); ++it) {
...@@ -122,7 +122,7 @@ void zmq::generic_t::xwrite_activated (pipe_t *pipe_) ...@@ -122,7 +122,7 @@ void zmq::generic_t::xwrite_activated (pipe_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
int zmq::generic_t::xsend (msg_t *msg_, int flags_) int zmq::router_t::xsend (msg_t *msg_, int flags_)
{ {
// If this is the first part of the message it's the ID of the // If this is the first part of the message it's the ID of the
// peer to send the message to. // peer to send the message to.
...@@ -188,7 +188,7 @@ int zmq::generic_t::xsend (msg_t *msg_, int flags_) ...@@ -188,7 +188,7 @@ int zmq::generic_t::xsend (msg_t *msg_, int flags_)
return 0; return 0;
} }
int zmq::generic_t::xrecv (msg_t *msg_, int flags_) int zmq::router_t::xrecv (msg_t *msg_, int flags_)
{ {
// If there's a queued command, pass it to the caller. // If there's a queued command, pass it to the caller.
if (unlikely (!more_in && !pending_commands.empty ())) { if (unlikely (!more_in && !pending_commands.empty ())) {
...@@ -236,7 +236,7 @@ int zmq::generic_t::xrecv (msg_t *msg_, int flags_) ...@@ -236,7 +236,7 @@ int zmq::generic_t::xrecv (msg_t *msg_, int flags_)
return 0; return 0;
} }
int zmq::generic_t::rollback (void) int zmq::router_t::rollback (void)
{ {
if (current_out) { if (current_out) {
current_out->rollback (); current_out->rollback ();
...@@ -246,14 +246,14 @@ int zmq::generic_t::rollback (void) ...@@ -246,14 +246,14 @@ int zmq::generic_t::rollback (void)
return 0; return 0;
} }
bool zmq::generic_t::xhas_in () bool zmq::router_t::xhas_in ()
{ {
if (prefetched) if (prefetched)
return true; return true;
return fq.has_in (); return fq.has_in ();
} }
bool zmq::generic_t::xhas_out () bool zmq::router_t::xhas_out ()
{ {
// In theory, GENERIC socket is always ready for writing. Whether actual // In theory, GENERIC socket is always ready for writing. Whether actual
// attempt to write succeeds depends on whitch pipe the message is going // attempt to write succeeds depends on whitch pipe the message is going
......
...@@ -18,8 +18,8 @@ ...@@ -18,8 +18,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_GENERIC_HPP_INCLUDED__ #ifndef __ZMQ_ROUTER_HPP_INCLUDED__
#define __ZMQ_GENERIC_HPP_INCLUDED__ #define __ZMQ_ROUTER_HPP_INCLUDED__
#include <map> #include <map>
#include <deque> #include <deque>
...@@ -32,14 +32,13 @@ ...@@ -32,14 +32,13 @@
namespace zmq namespace zmq
{ {
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. class router_t :
class generic_t :
public socket_base_t public socket_base_t
{ {
public: public:
generic_t (class ctx_t *parent_, uint32_t tid_); router_t (class ctx_t *parent_, uint32_t tid_);
~generic_t (); ~router_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_); void xattach_pipe (class pipe_t *pipe_);
...@@ -99,8 +98,8 @@ namespace zmq ...@@ -99,8 +98,8 @@ namespace zmq
typedef std::deque <pending_command_t> pending_commands_t; typedef std::deque <pending_command_t> pending_commands_t;
pending_commands_t pending_commands; pending_commands_t pending_commands;
generic_t (const generic_t&); router_t (const router_t&);
const generic_t &operator = (const generic_t&); const router_t &operator = (const router_t&);
}; };
} }
......
...@@ -58,7 +58,7 @@ ...@@ -58,7 +58,7 @@
#include "xrep.hpp" #include "xrep.hpp"
#include "xpub.hpp" #include "xpub.hpp"
#include "xsub.hpp" #include "xsub.hpp"
#include "generic.hpp" #include "router.hpp"
bool zmq::socket_base_t::check_tag () bool zmq::socket_base_t::check_tag ()
{ {
...@@ -104,8 +104,8 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, ...@@ -104,8 +104,8 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_XSUB: case ZMQ_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_); s = new (std::nothrow) xsub_t (parent_, tid_);
break; break;
case ZMQ_GENERIC: case ZMQ_ROUTER:
s = new (std::nothrow) generic_t (parent_, tid_); s = new (std::nothrow) router_t (parent_, tid_);
break; break;
default: default:
errno = EINVAL; errno = EINVAL;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment