Commit fa6bf24d authored by Martin Sustrik's avatar Martin Sustrik

XREP & XREQ socket types added; zmq_queue device added

parent c43aded5
...@@ -149,8 +149,10 @@ ZMQ_EXPORT int zmq_term (void *context); ...@@ -149,8 +149,10 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_SUB 2 #define ZMQ_SUB 2
#define ZMQ_REQ 3 #define ZMQ_REQ 3
#define ZMQ_REP 4 #define ZMQ_REP 4
#define ZMQ_UPSTREAM 5 #define ZMQ_XREQ 5
#define ZMQ_DOWNSTREAM 6 #define ZMQ_XREP 6
#define ZMQ_UPSTREAM 7
#define ZMQ_DOWNSTREAM 8
#define ZMQ_HWM 1 #define ZMQ_HWM 1
#define ZMQ_LWM 2 #define ZMQ_LWM 2
......
...@@ -130,8 +130,10 @@ ...@@ -130,8 +130,10 @@
(defconstant sub 2) (defconstant sub 2)
(defconstant req 3) (defconstant req 3)
(defconstant rep 4) (defconstant rep 4)
(defconstant upstream 5) (defconstant xreq 5)
(defconstant downstream 6) (defconstant xrep 6)
(defconstant upstream 7)
(defconstant downstream 8)
(defcfun* ("zmq_socket" socket) :pointer (defcfun* ("zmq_socket" socket) :pointer
(context :pointer) (context :pointer)
......
...@@ -34,8 +34,10 @@ public class Socket ...@@ -34,8 +34,10 @@ public class Socket
public static final int SUB = 2; public static final int SUB = 2;
public static final int REQ = 3; public static final int REQ = 3;
public static final int REP = 4; public static final int REP = 4;
public static final int UPSTREAM = 4; public static final int XREQ = 5;
public static final int DOWNSTREAM = 4; public static final int XREP = 6;
public static final int UPSTREAM = 7;
public static final int DOWNSTREAM = 8;
public static final int HWM = 1; public static final int HWM = 1;
public static final int LWM = 2; public static final int LWM = 2;
......
...@@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq () ...@@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq ()
t = PyInt_FromLong (ZMQ_REP); t = PyInt_FromLong (ZMQ_REP);
PyDict_SetItemString (dict, "REP", t); PyDict_SetItemString (dict, "REP", t);
Py_DECREF (t); Py_DECREF (t);
t = PyInt_FromLong (ZMQ_XREQ);
PyDict_SetItemString (dict, "XREQ", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_XREP);
PyDict_SetItemString (dict, "XREP", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_UPSTREAM); t = PyInt_FromLong (ZMQ_UPSTREAM);
PyDict_SetItemString (dict, "UPSTREAM", t); PyDict_SetItemString (dict, "UPSTREAM", t);
Py_DECREF (t); Py_DECREF (t);
......
...@@ -277,6 +277,8 @@ extern "C" void Init_librbzmq () ...@@ -277,6 +277,8 @@ extern "C" void Init_librbzmq ()
rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB)); rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB));
rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ)); rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ));
rb_define_global_const ("REP", INT2NUM (ZMQ_REP)); rb_define_global_const ("REP", INT2NUM (ZMQ_REP));
rb_define_global_const ("XREQ", INT2NUM (ZMQ_XREQ));
rb_define_global_const ("XREP", INT2NUM (ZMQ_XREP));
rb_define_global_const ("UPSTREAM", INT2NUM (ZMQ_UPSTREAM)); rb_define_global_const ("UPSTREAM", INT2NUM (ZMQ_UPSTREAM));
rb_define_global_const ("DOWNSTREAM", INT2NUM (ZMQ_DOWNSTREAM)); rb_define_global_const ("DOWNSTREAM", INT2NUM (ZMQ_DOWNSTREAM));
......
...@@ -520,6 +520,15 @@ if test "x$with_streamer" != "xno"; then ...@@ -520,6 +520,15 @@ if test "x$with_streamer" != "xno"; then
streamer="yes" streamer="yes"
fi fi
# Queue device
queue="no"
AC_ARG_WITH([queue], [AS_HELP_STRING([--with-queue],
[build queue device [default=no]])], [with_queue=yes], [with_queue=no])
if test "x$with_queue" != "xno"; then
queue="yes"
fi
# Perf # Perf
perf="no" perf="no"
AC_ARG_WITH([perf], [AS_HELP_STRING([--with-perf], AC_ARG_WITH([perf], [AS_HELP_STRING([--with-perf],
...@@ -555,7 +564,8 @@ AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes") ...@@ -555,7 +564,8 @@ AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes")
AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes") AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes")
AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno") AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno")
AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes")
AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes") AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes")
AM_CONDITIONAL(BUILD_QUEUE, test "x$queue" = "xyes")
AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes") AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes")
AM_CONDITIONAL(BUILD_CHAT, test "x$chat" = "xyes") AM_CONDITIONAL(BUILD_CHAT, test "x$chat" = "xyes")
AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes") AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes")
...@@ -581,7 +591,7 @@ AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \ ...@@ -581,7 +591,7 @@ AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \
bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \ bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \
perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \ perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \
devices/Makefile devices/zmq_forwarder/Makefile \ devices/Makefile devices/zmq_forwarder/Makefile \
devices/zmq_streamer/Makefile bindings/Makefile \ devices/zmq_streamer/Makefile devices/zmq_queue/Makefile bindings/Makefile \
examples/Makefile examples/chat/Makefile) examples/Makefile examples/chat/Makefile)
AC_MSG_RESULT([]) AC_MSG_RESULT([])
...@@ -616,6 +626,7 @@ AC_MSG_RESULT([ inproc: yes]) ...@@ -616,6 +626,7 @@ AC_MSG_RESULT([ inproc: yes])
AC_MSG_RESULT([ Devices:]) AC_MSG_RESULT([ Devices:])
AC_MSG_RESULT([ Forwarder: $forwarder]) AC_MSG_RESULT([ Forwarder: $forwarder])
AC_MSG_RESULT([ Streamer: $streamer]) AC_MSG_RESULT([ Streamer: $streamer])
AC_MSG_RESULT([ Queue: $queue])
AC_MSG_RESULT([ Performance tests: $perf]) AC_MSG_RESULT([ Performance tests: $perf])
AC_MSG_RESULT([ Examples:]) AC_MSG_RESULT([ Examples:])
AC_MSG_RESULT([ Chat: $chat]) AC_MSG_RESULT([ Chat: $chat])
......
...@@ -6,5 +6,9 @@ if BUILD_STREAMER ...@@ -6,5 +6,9 @@ if BUILD_STREAMER
STREAMER_DIR = zmq_streamer STREAMER_DIR = zmq_streamer
endif endif
SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR) if BUILD_QUEUE
DIST_SUBDIRS = zmq_forwarder zmq_streamer QUEUE_DIR = zmq_queue
endif
SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR) $(QUEUE_DIR)
DIST_SUBDIRS = zmq_forwarder zmq_streamer zmq_queue
INCLUDES = -I$(top_builddir)/bindings/c INCLUDES = -I$(top_srcdir)/bindings/cpp -I$(top_srcdir)/bindings/c
bin_PROGRAMS = zmq_forwarder bin_PROGRAMS = zmq_forwarder
......
...@@ -29,7 +29,7 @@ int main (int argc, char *argv []) ...@@ -29,7 +29,7 @@ int main (int argc, char *argv [])
XMLNode root = XMLNode::parseFile (argv [1]); XMLNode root = XMLNode::parseFile (argv [1]);
if (root.isEmpty ()) { if (root.isEmpty ()) {
fprintf (stderr, "configuration file not found\n"); fprintf (stderr, "configuration file not found or not an XML file\n");
return 1; return 1;
} }
......
INCLUDES = -I$(top_srcdir)/bindings/cpp -I$(top_srcdir)/bindings/c
bin_PROGRAMS = zmq_queue
zmq_queue_LDADD = $(top_builddir)/src/libzmq.la
zmq_queue_SOURCES = zmq_queue.cpp
zmq_queue_CXXFLAGS = -Wall -pedantic -Werror
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../../bindings/cpp/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"
int main (int argc, char *argv [])
{
if (argc != 2) {
fprintf (stderr, "usage: zmq_queue <config-file>\n");
return 1;
}
XMLNode root = XMLNode::parseFile (argv [1]);
if (root.isEmpty ()) {
fprintf (stderr, "configuration file not found or not an XML file\n");
return 1;
}
if (strcmp (root.getName (), "queue") != 0) {
fprintf (stderr, "root element in the configuration file should be "
"named 'queue'\n");
return 1;
}
XMLNode in_node = root.getChildNode ("in");
if (in_node.isEmpty ()) {
fprintf (stderr, "'in' node is missing in the configuration file\n");
return 1;
}
XMLNode out_node = root.getChildNode ("out");
if (out_node.isEmpty ()) {
fprintf (stderr, "'out' node is missing in the configuration file\n");
return 1;
}
// TODO: make the number of I/O threads configurable.
zmq::context_t ctx (1, 1);
zmq::socket_t in_socket (ctx, ZMQ_XREP);
zmq::socket_t out_socket (ctx, ZMQ_XREQ);
int n = 0;
while (true) {
XMLNode bind = in_node.getChildNode ("bind", n);
if (bind.isEmpty ())
break;
const char *addr = bind.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
return 1;
}
in_socket.bind (addr);
n++;
}
n = 0;
while (true) {
XMLNode connect = in_node.getChildNode ("connect", n);
if (connect.isEmpty ())
break;
const char *addr = connect.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
return 1;
}
in_socket.connect (addr);
n++;
}
n = 0;
while (true) {
XMLNode bind = out_node.getChildNode ("bind", n);
if (bind.isEmpty ())
break;
const char *addr = bind.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
return 1;
}
out_socket.bind (addr);
n++;
}
n = 0;
while (true) {
XMLNode connect = out_node.getChildNode ("connect", n);
if (connect.isEmpty ())
break;
const char *addr = connect.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
return 1;
}
out_socket.connect (addr);
n++;
}
zmq::message_t msg;
while (true) {
in_socket.recv (&msg);
out_socket.send (msg);
}
return 0;
}
INCLUDES = -I$(top_builddir)/bindings/c INCLUDES = -I$(top_srcdir)/bindings/cpp -I$(top_srcdir)/bindings/c
bin_PROGRAMS = zmq_streamer bin_PROGRAMS = zmq_streamer
......
...@@ -29,7 +29,7 @@ int main (int argc, char *argv []) ...@@ -29,7 +29,7 @@ int main (int argc, char *argv [])
XMLNode root = XMLNode::parseFile (argv [1]); XMLNode root = XMLNode::parseFile (argv [1]);
if (root.isEmpty ()) { if (root.isEmpty ()) {
fprintf (stderr, "configuration file not found\n"); fprintf (stderr, "configuration file not found or not an XML file\n");
return 1; return 1;
} }
......
...@@ -105,6 +105,8 @@ libzmq_la_SOURCES = app_thread.hpp \ ...@@ -105,6 +105,8 @@ libzmq_la_SOURCES = app_thread.hpp \
uuid.hpp \ uuid.hpp \
windows.hpp \ windows.hpp \
wire.hpp \ wire.hpp \
xrep.hpp \
xreq.hpp \
yarray.hpp \ yarray.hpp \
yarray_item.hpp \ yarray_item.hpp \
ypipe.hpp \ ypipe.hpp \
...@@ -150,6 +152,8 @@ libzmq_la_SOURCES = app_thread.hpp \ ...@@ -150,6 +152,8 @@ libzmq_la_SOURCES = app_thread.hpp \
thread.cpp \ thread.cpp \
upstream.cpp \ upstream.cpp \
uuid.cpp \ uuid.cpp \
xrep.cpp \
xreq.cpp \
ypollset.cpp \ ypollset.cpp \
zmq.cpp \ zmq.cpp \
zmq_connecter.cpp \ zmq_connecter.cpp \
......
...@@ -45,6 +45,8 @@ ...@@ -45,6 +45,8 @@
#include "sub.hpp" #include "sub.hpp"
#include "req.hpp" #include "req.hpp"
#include "rep.hpp" #include "rep.hpp"
#include "xreq.hpp"
#include "xrep.hpp"
#include "upstream.hpp" #include "upstream.hpp"
#include "downstream.hpp" #include "downstream.hpp"
...@@ -175,6 +177,12 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) ...@@ -175,6 +177,12 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_REP: case ZMQ_REP:
s = new rep_t (this); s = new rep_t (this);
break; break;
case ZMQ_XREQ:
s = new xreq_t (this);
break;
case ZMQ_XREP:
s = new xrep_t (this);
break;
case ZMQ_UPSTREAM: case ZMQ_UPSTREAM:
s = new upstream_t (this); s = new upstream_t (this);
break; break;
......
...@@ -46,7 +46,6 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, ...@@ -46,7 +46,6 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
write_pos (0), write_pos (0),
first_message_offset (-1) first_message_offset (-1)
{ {
} }
int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
...@@ -56,7 +55,6 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) ...@@ -56,7 +55,6 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
void zmq::pgm_sender_t::plug (i_inout *inout_) void zmq::pgm_sender_t::plug (i_inout *inout_)
{ {
// Alocate 2 fds for PGM socket. // Alocate 2 fds for PGM socket.
int downlink_socket_fd = 0; int downlink_socket_fd = 0;
int uplink_socket_fd = 0; int uplink_socket_fd = 0;
...@@ -119,7 +117,6 @@ void zmq::pgm_sender_t::in_event () ...@@ -119,7 +117,6 @@ void zmq::pgm_sender_t::in_event ()
void zmq::pgm_sender_t::out_event () void zmq::pgm_sender_t::out_event ()
{ {
// POLLOUT event from send socket. If write buffer is empty, // POLLOUT event from send socket. If write buffer is empty,
// try to read new data from the encoder. // try to read new data from the encoder.
if (write_pos == write_size) { if (write_pos == write_size) {
...@@ -159,7 +156,6 @@ void zmq::pgm_sender_t::out_event () ...@@ -159,7 +156,6 @@ void zmq::pgm_sender_t::out_event ()
write_pos += nbytes; write_pos += nbytes;
} }
} }
size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_, size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_,
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../bindings/c/zmq.h"
#include "xrep.hpp"
#include "err.hpp"
#include "pipe.hpp"
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
options.requires_in = true;
options.requires_out = true;
}
zmq::xrep_t::~xrep_t ()
{
}
void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (false);
}
void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
{
zmq_assert (false);
}
void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (false);
}
void zmq::xrep_t::xkill (class reader_t *pipe_)
{
zmq_assert (false);
}
void zmq::xrep_t::xrevive (class reader_t *pipe_)
{
zmq_assert (false);
}
int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
errno = EINVAL;
return -1;
}
int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
}
int zmq::xrep_t::xflush ()
{
zmq_assert (false);
}
int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
}
bool zmq::xrep_t::xhas_in ()
{
zmq_assert (false);
}
bool zmq::xrep_t::xhas_out ()
{
zmq_assert (false);
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_XREP_HPP_INCLUDED__
#define __ZMQ_XREP_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
namespace zmq
{
class xrep_t : public socket_base_t
{
public:
xrep_t (class app_thread_t *parent_);
~xrep_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
xrep_t (const xrep_t&);
void operator = (const xrep_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../bindings/c/zmq.h"
#include "xreq.hpp"
#include "err.hpp"
#include "pipe.hpp"
zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
socket_base_t (parent_)
{
options.requires_in = true;
options.requires_out = true;
}
zmq::xreq_t::~xreq_t ()
{
}
void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (false);
}
void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_)
{
zmq_assert (false);
}
void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (false);
}
void zmq::xreq_t::xkill (class reader_t *pipe_)
{
zmq_assert (false);
}
void zmq::xreq_t::xrevive (class reader_t *pipe_)
{
zmq_assert (false);
}
int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
errno = EINVAL;
return -1;
}
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
}
int zmq::xreq_t::xflush ()
{
zmq_assert (false);
}
int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
}
bool zmq::xreq_t::xhas_in ()
{
zmq_assert (false);
}
bool zmq::xreq_t::xhas_out ()
{
zmq_assert (false);
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_XREQ_HPP_INCLUDED__
#define __ZMQ_XREQ_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
namespace zmq
{
class xreq_t : public socket_base_t
{
public:
xreq_t (class app_thread_t *parent_);
~xreq_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
xreq_t (const xreq_t&);
void operator = (const xreq_t&);
};
}
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment