Commit c637bf29 authored by malosek's avatar malosek

Merge branch 'master' of git@github.com:sustrik/zeromq2

parents 9ccf2b42 fa1641af
...@@ -9,6 +9,7 @@ Dirk O. Kaar ...@@ -9,6 +9,7 @@ Dirk O. Kaar
Erich Heine Erich Heine
Frank Denis Frank Denis
George Neill George Neill
Jon Dyte
Martin Hurton Martin Hurton
Martin Lucina Martin Lucina
Martin Sustrik Martin Sustrik
...@@ -36,3 +37,4 @@ Matt Muggeridge ...@@ -36,3 +37,4 @@ Matt Muggeridge
Paulo Henrique Silva Paulo Henrique Silva
Peter Lemenkov Peter Lemenkov
Robert Zhang Robert Zhang
Vitaly Mayatskikh
...@@ -2,8 +2,12 @@ if BUILD_PERF ...@@ -2,8 +2,12 @@ if BUILD_PERF
DIR_PERF = perf DIR_PERF = perf
endif endif
SUBDIRS = src $(DIR_PERF) devices bindings if INSTALL_MAN
DIST_SUBDIRS = src perf devices bindings DIR_MAN = man
endif
SUBDIRS = src $(DIR_MAN) $(DIR_PERF) devices bindings
DIST_SUBDIRS = src man perf devices bindings
EXTRA_DIST = $(top_srcdir)/foreign/openpgm/@pgm1_basename@.tar.bz2 \ EXTRA_DIST = $(top_srcdir)/foreign/openpgm/@pgm1_basename@.tar.bz2 \
$(top_srcdir)/foreign/openpgm/@pgm2_basename@ \ $(top_srcdir)/foreign/openpgm/@pgm2_basename@ \
......
...@@ -63,6 +63,9 @@ extern "C" { ...@@ -63,6 +63,9 @@ extern "C" {
#ifndef EADDRNOTAVAIL #ifndef EADDRNOTAVAIL
#define EADDRNOTAVAIL (ZMQ_HAUSNUMERO + 6) #define EADDRNOTAVAIL (ZMQ_HAUSNUMERO + 6)
#endif #endif
#ifndef ECONNREFUSED
#define ECONNREFUSED (ZMQ_HAUSNUMERO + 7)
#endif
// Native 0MQ error codes. // Native 0MQ error codes.
#define EMTHREAD (ZMQ_HAUSNUMERO + 50) #define EMTHREAD (ZMQ_HAUSNUMERO + 50)
...@@ -180,7 +183,7 @@ ZMQ_EXPORT int zmq_term (void *context); ...@@ -180,7 +183,7 @@ ZMQ_EXPORT int zmq_term (void *context);
// Socket to send requests and receive replies. Requests are // Socket to send requests and receive replies. Requests are
// load-balanced among all the peers. This socket type allows // load-balanced among all the peers. This socket type allows
// only an alternated sequence of send's and recv's // only an alternated sequence of send's and recv's.
#define ZMQ_REQ 3 #define ZMQ_REQ 3
// Socket to receive requests and send replies. This socket type allows // Socket to receive requests and send replies. This socket type allows
...@@ -188,6 +191,12 @@ ZMQ_EXPORT int zmq_term (void *context); ...@@ -188,6 +191,12 @@ ZMQ_EXPORT int zmq_term (void *context);
// the peer that issued the last received request. // the peer that issued the last received request.
#define ZMQ_REP 4 #define ZMQ_REP 4
// Socket to receive messages from up the stream.
#define ZMQ_UPSTREAM 5
// Socket to send messages downstream.
#define ZMQ_DOWNSTREAM 6
// Open a socket. 'type' is one of the socket types defined above. // Open a socket. 'type' is one of the socket types defined above.
// //
// Errors: EINVAL - invalid socket type. // Errors: EINVAL - invalid socket type.
......
...@@ -34,6 +34,8 @@ public class Socket ...@@ -34,6 +34,8 @@ public class Socket
public static final int SUB = 2; public static final int SUB = 2;
public static final int REQ = 3; public static final int REQ = 3;
public static final int REP = 4; public static final int REP = 4;
public static final int UPSTREAM = 4;
public static final int DOWNSTREAM = 4;
public static final int HWM = 1; public static final int HWM = 1;
public static final int LWM = 2; public static final int LWM = 2;
......
...@@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq () ...@@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq ()
t = PyInt_FromLong (ZMQ_REP); t = PyInt_FromLong (ZMQ_REP);
PyDict_SetItemString (dict, "REP", t); PyDict_SetItemString (dict, "REP", t);
Py_DECREF (t); Py_DECREF (t);
t = PyInt_FromLong (ZMQ_UPSTREAM);
PyDict_SetItemString (dict, "UPSTREAM", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_DOWNSTREAM);
PyDict_SetItemString (dict, "DOWNSTREAM", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_HWM); t = PyInt_FromLong (ZMQ_HWM);
PyDict_SetItemString (dict, "HWM", t); PyDict_SetItemString (dict, "HWM", t);
Py_DECREF (t); Py_DECREF (t);
......
...@@ -275,6 +275,8 @@ extern "C" void Init_librbzmq () ...@@ -275,6 +275,8 @@ extern "C" void Init_librbzmq ()
rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB)); rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB));
rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ)); rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ));
rb_define_global_const ("REP", INT2NUM (ZMQ_REP)); rb_define_global_const ("REP", INT2NUM (ZMQ_REP));
rb_define_global_const ("UPSTREAM", INT2NUM (ZMQ_UPSTREAM));
rb_define_global_const ("DOWNSTREAM", INT2NUM (ZMQ_DOWNSTREAM));
rb_define_global_const ("POLL", INT2NUM (ZMQ_POLL)); rb_define_global_const ("POLL", INT2NUM (ZMQ_POLL));
} }
...@@ -181,6 +181,10 @@ ...@@ -181,6 +181,10 @@
RelativePath="..\..\..\src\dispatcher.cpp" RelativePath="..\..\..\src\dispatcher.cpp"
> >
</File> </File>
<File
RelativePath="..\..\..\src\downstream.cpp"
>
</File>
<File <File
RelativePath="..\..\..\src\epoll.cpp" RelativePath="..\..\..\src\epoll.cpp"
> >
...@@ -289,6 +293,10 @@ ...@@ -289,6 +293,10 @@
RelativePath="..\..\..\src\thread.cpp" RelativePath="..\..\..\src\thread.cpp"
> >
</File> </File>
<File
RelativePath="..\..\..\src\upstream.cpp"
>
</File>
<File <File
RelativePath="..\..\..\src\uuid.cpp" RelativePath="..\..\..\src\uuid.cpp"
> >
...@@ -375,6 +383,10 @@ ...@@ -375,6 +383,10 @@
RelativePath="..\..\..\src\encoder.hpp" RelativePath="..\..\..\src\encoder.hpp"
> >
</File> </File>
<File
RelativePath="..\..\..\src\downstream.hpp"
>
</File>
<File <File
RelativePath="..\..\..\src\epoll.hpp" RelativePath="..\..\..\src\epoll.hpp"
> >
...@@ -531,6 +543,10 @@ ...@@ -531,6 +543,10 @@
RelativePath="..\..\..\src\thread.hpp" RelativePath="..\..\..\src\thread.hpp"
> >
</File> </File>
<File
RelativePath="..\..\..\src\upstream.hpp"
>
</File>
<File <File
RelativePath="..\..\..\src\uuid.hpp" RelativePath="..\..\..\src\uuid.hpp"
> >
......
...@@ -49,6 +49,10 @@ on_mingw32="no" ...@@ -49,6 +49,10 @@ on_mingw32="no"
# Host speciffic checks # Host speciffic checks
AC_CANONICAL_HOST AC_CANONICAL_HOST
# Whether or not install manual pages.
# Note that on MinGW manpages are not installed.
install_man="yes"
case "${host_os}" in case "${host_os}" in
*linux*) *linux*)
AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS]) AC_DEFINE(ZMQ_HAVE_LINUX, 1, [Have Linux OS])
...@@ -134,6 +138,7 @@ case "${host_os}" in ...@@ -134,6 +138,7 @@ case "${host_os}" in
[AC_MSG_ERROR([Could not link with Iphlpapi.dll.])]) [AC_MSG_ERROR([Could not link with Iphlpapi.dll.])])
CFLAGS="${CFLAGS} -std=c99" CFLAGS="${CFLAGS} -std=c99"
on_mingw32="yes" on_mingw32="yes"
install_man="no"
;; ;;
*) *)
AC_MSG_ERROR([Not supported os: $host.]) AC_MSG_ERROR([Not supported os: $host.])
...@@ -585,6 +590,14 @@ if test "x$with_forwarder" != "xno"; then ...@@ -585,6 +590,14 @@ if test "x$with_forwarder" != "xno"; then
forwarder="yes" forwarder="yes"
fi fi
# streamer device
streamer="no"
AC_ARG_WITH([streamer], [AS_HELP_STRING([--with-streamer],
[build streamer device [default=no]])], [with_streamer=yes], [with_streamer=no])
if test "x$with_streamer" != "xno"; then
streamer="yes"
fi
# Perf # Perf
perf="no" perf="no"
...@@ -613,10 +626,12 @@ AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes") ...@@ -613,10 +626,12 @@ AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes")
AM_CONDITIONAL(BUILD_PGM1, test "x$pgm1_ext" = "xyes") AM_CONDITIONAL(BUILD_PGM1, test "x$pgm1_ext" = "xyes")
AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes") AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes")
AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno" -a "x$pgm1_ext" = "xno") AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno" -a "x$pgm1_ext" = "xno")
AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes")
AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes")
AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes") AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes")
AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes") AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes")
AM_CONDITIONAL(BUILD_PGM2_EXAMPLES, test "x$with_pgm2_ext" = "xyes") AM_CONDITIONAL(BUILD_PGM2_EXAMPLES, test "x$with_pgm2_ext" = "xyes")
AM_CONDITIONAL(INSTALL_MAN, test "x$install_man" = "xyes")
AC_SUBST(stdint) AC_SUBST(stdint)
AC_SUBST(inttypes) AC_SUBST(inttypes)
...@@ -631,11 +646,12 @@ AC_FUNC_MALLOC ...@@ -631,11 +646,12 @@ AC_FUNC_MALLOC
AC_TYPE_SIGNAL AC_TYPE_SIGNAL
AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs) AC_CHECK_FUNCS(perror gettimeofday memset socket getifaddrs freeifaddrs)
AC_OUTPUT(Makefile src/Makefile bindings/python/Makefile \ AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \
bindings/python/setup.py bindings/ruby/Makefile \ bindings/python/setup.py bindings/ruby/Makefile \
bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \ bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \
perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \ perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \
devices/Makefile devices/zmq_forwarder/Makefile bindings/Makefile) devices/Makefile devices/zmq_forwarder/Makefile \
devices/zmq_streamer/Makefile bindings/Makefile)
AC_MSG_RESULT([]) AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ]) AC_MSG_RESULT([ ******************************************************** ])
...@@ -670,6 +686,7 @@ AC_MSG_RESULT([ PGM: no]) ...@@ -670,6 +686,7 @@ AC_MSG_RESULT([ PGM: no])
fi fi
AC_MSG_RESULT([ Devices:]) AC_MSG_RESULT([ Devices:])
AC_MSG_RESULT([ forwarder: $forwarder]) AC_MSG_RESULT([ forwarder: $forwarder])
AC_MSG_RESULT([ streamer: $streamer])
AC_MSG_RESULT([ Performance tests: $perf]) AC_MSG_RESULT([ Performance tests: $perf])
AC_MSG_RESULT([]) AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ]) AC_MSG_RESULT([ ******************************************************** ])
......
...@@ -2,5 +2,9 @@ if BUILD_FORWARDER ...@@ -2,5 +2,9 @@ if BUILD_FORWARDER
FORWARDER_DIR = zmq_forwarder FORWARDER_DIR = zmq_forwarder
endif endif
SUBDIRS = $(FORWARDER_DIR) if BUILD_STREAMER
DIST_SUBDIRS = zmq_forwarder STREAMER_DIR = zmq_streamer
endif
SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR)
DIST_SUBDIRS = zmq_forwarder zmq_streamer
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
int main (int argc, char *argv []) int main (int argc, char *argv [])
{ {
if (argc != 2) { if (argc != 2) {
fprintf (stderr, "usage: forwarder <config-file>\n"); fprintf (stderr, "usage: zmq_forwarder <config-file>\n");
return 1; return 1;
} }
...@@ -53,8 +53,9 @@ int main (int argc, char *argv []) ...@@ -53,8 +53,9 @@ int main (int argc, char *argv [])
// TODO: make the number of I/O threads configurable. // TODO: make the number of I/O threads configurable.
zmq::context_t ctx (1, 1); zmq::context_t ctx (1, 1);
zmq::socket_t in_socket (ctx, ZMQ_P2P); zmq::socket_t in_socket (ctx, ZMQ_SUB);
zmq::socket_t out_socket (ctx, ZMQ_P2P); in_socket.setsockopt (ZMQ_SUBSCRIBE, "*", 1);
zmq::socket_t out_socket (ctx, ZMQ_PUB);
int n = 0; int n = 0;
while (true) { while (true) {
......
INCLUDES = -I$(top_builddir)/bindings/c
bin_PROGRAMS = zmq_streamer
zmq_streamer_LDADD = $(top_builddir)/src/libzmq.la
zmq_streamer_SOURCES = zmq_streamer.cpp
zmq_streamer_CXXFLAGS = -Wall -pedantic -Werror
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../../bindings/cpp/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"
int main (int argc, char *argv [])
{
if (argc != 2) {
fprintf (stderr, "usage: zmq_streamer <config-file>\n");
return 1;
}
XMLNode root = XMLNode::parseFile (argv [1]);
if (root.isEmpty ()) {
fprintf (stderr, "configuration file not found\n");
return 1;
}
if (strcmp (root.getName (), "streamer") != 0) {
fprintf (stderr, "root element in the configuration file should be "
"named 'streamer'\n");
return 1;
}
XMLNode in_node = root.getChildNode ("in");
if (in_node.isEmpty ()) {
fprintf (stderr, "'in' node is missing in the configuration file\n");
return 1;
}
XMLNode out_node = root.getChildNode ("out");
if (out_node.isEmpty ()) {
fprintf (stderr, "'out' node is missing in the configuration file\n");
return 1;
}
// TODO: make the number of I/O threads configurable.
zmq::context_t ctx (1, 1);
zmq::socket_t in_socket (ctx, ZMQ_UPSTREAM);
zmq::socket_t out_socket (ctx, ZMQ_DOWNSTREAM);
int n = 0;
while (true) {
XMLNode bind = in_node.getChildNode ("bind", n);
if (bind.isEmpty ())
break;
const char *addr = bind.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
return 1;
}
in_socket.bind (addr);
n++;
}
n = 0;
while (true) {
XMLNode connect = in_node.getChildNode ("connect", n);
if (connect.isEmpty ())
break;
const char *addr = connect.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
return 1;
}
in_socket.connect (addr);
n++;
}
n = 0;
while (true) {
XMLNode bind = out_node.getChildNode ("bind", n);
if (bind.isEmpty ())
break;
const char *addr = bind.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
return 1;
}
out_socket.bind (addr);
n++;
}
n = 0;
while (true) {
XMLNode connect = out_node.getChildNode ("connect", n);
if (connect.isEmpty ())
break;
const char *addr = connect.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
return 1;
}
out_socket.connect (addr);
n++;
}
zmq::message_t msg;
while (true) {
in_socket.recv (&msg);
out_socket.send (msg);
}
return 0;
}
dist_man_MANS = man1/zmq_forwarder.1 man3/zmq_init.3 man3/zmq_term.3 \
man3/zmq_socket.3 man3/zmq_close.3 man3/zmq_setsockopt.3 man3/zmq_bind.3 \
man3/zmq_connect.3 man3/zmq_send.3 man3/zmq_flush.3 man3/zmq_recv.3 \
man3/zmq_poll.3 man3/zmq_msg_init.3 man3/zmq_msg_init_size.3 \
man3/zmq_msg_close.3 man3/zmq_msg_move.3 man3/zmq_msg_copy.3 \
man3/zmq_msg_data.3 man3/zmq_msg_size.3 \
man3/zmq_strerror.3 man7/zmq.7
distclean-local:
-rm *.pdf
-rm man1/*.ps
-rm man3/*.ps
-rm man7/*.ps
dist-hook:
./convert2pdf.sh
$(mkdir_p) $(top_distdir)/doc
cp $(top_srcdir)/man/*.pdf $(top_distdir)/doc
#!/bin/sh
#
# Copyright (c) 2007-2009 FastMQ Inc.
#
# This file is part of 0MQ.
#
# 0MQ is free software; you can redistribute it and/or modify it under
# the terms of the Lesser GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# 0MQ is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Lesser GNU General Public License for more details.
#
# You should have received a copy of the Lesser GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
groff -man -Tps man1/zmq_forwarder.1 > man1/zmq_forwarder.1.ps
ps2pdf man1/zmq_forwarder.1.ps zmq_forwarder.pdf
groff -man -Tps man3/zmq_init.3 > man3/zmq_init.3.ps
ps2pdf man3/zmq_init.3.ps zmq_init.pdf
groff -man -Tps man3/zmq_term.3 > man3/zmq_term.3.ps
ps2pdf man3/zmq_term.3.ps zmq_term.pdf
groff -man -Tps man3/zmq_socket.3 > man3/zmq_socket.3.ps
ps2pdf man3/zmq_socket.3.ps zmq_socket.pdf
groff -man -Tps man3/zmq_close.3 > man3/zmq_close.3.ps
ps2pdf man3/zmq_close.3.ps zmq_close.pdf
groff -man -Tps man3/zmq_setsockopt.3 > man3/zmq_setsockopt.3.ps
ps2pdf man3/zmq_setsockopt.3.ps zmq_setsockopt.pdf
groff -man -Tps man3/zmq_bind.3 > man3/zmq_bind.3.ps
ps2pdf man3/zmq_bind.3.ps zmq_bind.pdf
groff -man -Tps man3/zmq_connect.3 > man3/zmq_connect.3.ps
ps2pdf man3/zmq_connect.3.ps zmq_connect.pdf
groff -man -Tps man3/zmq_send.3 > man3/zmq_send.3.ps
ps2pdf man3/zmq_send.3.ps zmq_send.pdf
groff -man -Tps man3/zmq_flush.3 > man3/zmq_flush.3.ps
ps2pdf man3/zmq_flush.3.ps zmq_flush.pdf
groff -man -Tps man3/zmq_recv.3 > man3/zmq_recv.3.ps
ps2pdf man3/zmq_recv.3.ps zmq_recv.pdf
groff -man -Tps man3/zmq_poll.3 > man3/zmq_poll.3.ps
ps2pdf man3/zmq_poll.3.ps zmq_poll.pdf
groff -man -Tps man3/zmq_msg_init.3 > man3/zmq_msg_init.3.ps
ps2pdf man3/zmq_msg_init.3.ps zmq_msg_init.pdf
groff -man -Tps man3/zmq_msg_init_size.3 > man3/zmq_msg_init_size.3.ps
ps2pdf man3/zmq_msg_init_size.3.ps zmq_msg_init_size.pdf
groff -man -Tps man3/zmq_msg_init_data.3 > man3/zmq_msg_init_data.3.ps
ps2pdf man3/zmq_msg_init_data.3.ps zmq_msg_init_data.pdf
groff -man -Tps man3/zmq_msg_close.3 > man3/zmq_msg_close.3.ps
ps2pdf man3/zmq_msg_close.3.ps zmq_msg_close.pdf
groff -man -Tps man3/zmq_msg_move.3 > man3/zmq_msg_move.3.ps
ps2pdf man3/zmq_msg_move.3.ps zmq_msg_move.pdf
groff -man -Tps man3/zmq_msg_copy.3 > man3/zmq_msg_copy.3.ps
ps2pdf man3/zmq_msg_copy.3.ps zmq_msg_copy.pdf
groff -man -Tps man3/zmq_msg_data.3 > man3/zmq_msg_data.3.ps
ps2pdf man3/zmq_msg_data.3.ps zmq_msg_data.pdf
groff -man -Tps man3/zmq_msg_size.3 > man3/zmq_msg_size.3.ps
ps2pdf man3/zmq_msg_size.3.ps zmq_msg_size.pdf
groff -man -Tps man3/zmq_strerror.3 > man3/zmq_strerror.3.ps
ps2pdf man3/zmq_strerror.3.ps zmq_strerror.pdf
groff -man -Tps man7/zmq.7 > man7/zmq.7.ps
ps2pdf man7/zmq.7.ps zmq.pdf
.TH zmq_forwarder 1 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_forwarder \- forwards the stream of PUB/SUB messages
.SH SYNOPSIS
.SH DESCRIPTION
.SH OPTIONS
.SH "SEE ALSO"
.SH AUTHOR
Martin Sustrik <sustrik at fastmq dot com>
.TH zmq_bind 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_bind \- binds the socket to the specified address
.SH SYNOPSIS
.B int zmq_bind (void *s, const char *addr);
.SH DESCRIPTION
The function binds socket
.IR s to a particular transport. Actual semantics of the
command depend on the underlying transport mechanism, however, in cases where
peers connect in an asymetric manner,
.IR zmq_bind
should be called first,
.IR zmq_connect
afterwards. For actual formats of
.IR addr
parameter for different types of transport have a look at
.IR zmq(7) .
Note that single socket can be bound (and connected) to
arbitrary number of peers using different transport mechanisms.
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
.IP "\fBEPROTONOSUPPORT\fP"
unsupported protocol.
.IP "\fBENOCOMPATPROTO\fP"
protocol is not compatible with the socket type.
.IP "\fBEADDRINUSE\fP"
the given address is already in use.
.IP "\fBEADDRNOTAVAIL\fP"
a nonexistent interface was requested or the requested address was not local.
.SH EXAMPLE
.nf
void *s = zmq_socket (context, ZMQ_PUB);
assert (s);
int rc = zmq_bind (s, "inproc://my_publisher");
assert (rc == 0);
rc = zmq_bind (s, "tcp://eth0:5555");
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_connect (3)
.BR zmq_socket (3)
.BR zmq (7)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_close 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_close \- destroys 0MQ socket
.SH SYNOPSIS
.B int zmq_close (void *s);
.SH DESCRIPTION
Destroys 0MQ socket (one created using
.IR zmq_socket
function). All sockets have to be properly closed before the application
terminates, otherwise memory leaks will occur.
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
No errors are defined.
.SH EXAMPLE
.nf
int rc = zmq_close (s);
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_socket (3)
.BR zmq_term (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_connect 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_connect \- connect the socket to the specified peer
.SH SYNOPSIS
.B int zmq_connect (void *s, const char *addr);
.SH DESCRIPTION
The function connect socket
.IR s to the peer identified by
.IR addr .
Actual semantics of the command depend on the underlying transport mechanism,
however, in cases where peers connect in an asymetric manner,
.IR zmq_bind
should be called first,
.IR zmq_connect
afterwards. For actual formats of
.IR addr
parameter for different types of transport have a look at
.IR zmq(7) .
Note that single socket can be connected (and bound) to
arbitrary number of peers using different transport mechanisms.
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
.IP "\fBEPROTONOSUPPORT\fP"
unsupported protocol.
.IP "\fBENOCOMPATPROTO\fP"
protocol is not compatible with the socket type.
.IP "\fBECONNREFUSED\fP"
no-one listening on the remote address.
.SH EXAMPLE
.nf
void *s = zmq_socket (context, ZMQ_SUB);
assert (s);
int rc = zmq_connect (s, "inproc://my_publisher");
assert (rc == 0);
rc = zmq_connect (s, "tcp://server001:5555");
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_bind (3)
.BR zmq_socket (3)
.BR zmq (7)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_flush 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_flush \- flushes pre-sent messages to the socket
.SH SYNOPSIS
.B int zmq_flush (void *s);
.SH DESCRIPTION
Flushes all the pre-sent messages - i.e. those that have been sent with
ZMQ_NOFLUSH flag - to the socket. This functionality improves performance in
cases where several messages are sent during a single business operation.
It should not be used as a transaction - ACID properties are not guaranteed.
Note that calling
.IR zmq_send
without ZMQ_NOFLUSH flag automatically flushes all previously pre-sent messages.
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
.IP "\fBENOTSUP\fP"
function isn't supported by particular socket type.
.IP "\fBEFSM\fP"
function cannot be called at the moment, because socket is not in the
approprite state.
.SH EXAMPLE
.nf
rc = zmq_send (s, &msg1, ZMQ_NOFLUSH);
assert (rc == 0);
rc = zmq_send (s, &msg2, ZMQ_NOFLUSH);
assert (rc == 0);
rc = zmq_flush (s);
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_send (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_init 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_init \- initialises 0MQ context
.SH SYNOPSIS
.B void *zmq_init (int app_threads, int io_threads, int flags);
.SH DESCRIPTION
Initialises 0MQ context.
.IR app_threads
specifies maximal number of application threads that can own open sockets
at the same time. At least one application thread should be defined.
.IR io_threads
specifies the size of thread pool to handle I/O operations. The value shouldn't
be negative. Zero can be used in case only in-process messaging is going to be
used, i.e. there will be no I/O traffic.
'flags' argument is a combination of the flags defined below:
.IP "\fBZMQ_POLL\fP"
flag specifying that the sockets within this context should be pollable (see
.IR zmq_poll
). Pollable sockets may add a little latency to the message transfer when
compared to non-pollable sockets.
.SH RETURN VALUE
Function returns context handle is successful. Otherwise it returns NULL and
sets errno to one of the values below.
.SH ERRORS
.IP "\fBEINVAL\fP"
there's less than one application thread allocated, or number of I/O threads
is negative.
.SH EXAMPLE
.nf
void *ctx = zmq_init (1, 1, ZMQ_POLL);
assert (ctx);
.fi
.SH SEE ALSO
.BR zmq_term (3)
.BR zmq_socket (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_msg_close 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_msg_close \- destroys 0MQ message
.SH SYNOPSIS
.B int zmq_msg_close (zmq_msg_t *msg);
.SH DESCRIPTION
Deallocates message
.IR msg
including any associated buffers (unless the buffer is
shared with another message). Not calling this function can result in
memory leaks.
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
No errors are defined.
.SH EXAMPLE
.nf
zmq_msg_t msg;
rc = zmq_msg_init_size (&msg, 1000000);
assert (rc = 0);
rc = zmq_msg_close (&msg);
assert (rc = 0);
.fi
.SH SEE ALSO
.BR zmq_msg_init (3)
.BR zmq_msg_init_size (3)
.BR zmq_msg_init_data (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_msg_copy 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_msg_copy \- copies content of a message to another message
.SH SYNOPSIS
.B int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
.SH DESCRIPTION
Copy the
.IR src
message to
.IR dest .
The original content of
.IR dest
is orderly deallocated.
Caution: The implementation may choose not to physically copy the data, rather
to share the buffer between two messages. Thus avoid modifying message data
after the message was copied. Doing so can modify multiple message instances.
If what you need is actual hard copy, allocate new message using
.IR zmq_msg_size
and copy the data using
.IR memcpy .
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
No errors are defined.
.SH EXAMPLE
.nf
zmq_msg_t dest;
rc = zmq_msg_init (&dest);
assert (rc == 0);
rc = zmq_msg_copy (&dest, &src);
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_msg_move (3)
.BR zmq_msg_init (3)
.BR zmq_msg_init_size (3)
.BR zmq_msg_init_data (3)
.BR zmq_msg_close (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_msg_data 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_msg_data \- retrieves pointer to the message content
.SH SYNOPSIS
.B void *zmq_msg_data (zmq_msg_t *msg);
.SH DESCRIPTION
Returns pointer to message data. Always use this function to access the data,
never use
.IR zmq_msg_t
members directly.
.SH RETURN VALUE
Pointer to the message data.
.SH ERRORS
No errors are defined.
.SH EXAMPLE
.nf
zmq_msg_t msg;
rc = zmq_msg_init_size (&msg, 100);
memset (zmq_msg_data (&msg), 0, 100);
.fi
.SH SEE ALSO
.BR zmq_msg_init (3)
.BR zmq_msg_init_size (3)
.BR zmq_msg_init_data (3)
.BR zmq_msg_close (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_msg_init 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_msg_init \- initialises empty 0MQ message
.SH SYNOPSIS
.B int zmq_msg_init (zmq_msg_t *msg);
.SH DESCRIPTION
Initialises 0MQ message zero bytes long. The function is most useful
to initialise a
.IR zmq_msg_t
structure before receiving a message.
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
No errors are defined.
.SH EXAMPLE
.nf
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_recv (s, &msg, 0);
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_msg_close (3)
.BR zmq_msg_init_size (3)
.BR zmq_msg_init_data (3)
.BR zmq_msg_data (3)
.BR zmq_msg_size (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_msg_init_data 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_msg_init \- initialises 0MQ message from the given data
.SH SYNOPSIS
.nf
.B typedef void (zmq_free_fn) (void *data);
.B int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn);
.fi
.SH DESCRIPTION
Initialise a message from a supplied buffer. Message isn't copied,
instead 0MQ infrastructure takes ownership of the buffer located at address
.IR data ,
.IR size
bytes long.
Deallocation function (
.IR ffn
) will be called once the data are not needed anymore. Note that deallocation
function prototype is designed so that it complies with standard C
.IR free
function. When using a static constant buffer,
.IR ffn
may be NULL to prevent subsequent deallocation.
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
No errors are defined.
.SH EXAMPLE
.nf
void *data = malloc (6);
assert (data);
memcpy (data, "ABCDEF", 6);
zmq_msg_t msg;
rc = zmq_msg_init_data (&msg, data, 6, free);
assert (rc == 0);
rc = zmq_send (s, &msg, 0);
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_msg_close (3)
.BR zmq_msg_init (3)
.BR zmq_msg_init_size (3)
.BR zmq_msg_data (3)
.BR zmq_msg_size (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_msg_init_size 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_msg_init \- initialises 0MQ message of a specified size
.SH SYNOPSIS
.B int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
.SH DESCRIPTION
Initialises 0MQ message
.IR size
bytes long. The implementation chooses whether it is more efficient to store
message content on the stack (small messages) or on the heap (large messages).
Therefore, never access message data directly via
.IR zmq_msg_t
members, rather use
.IR zmq_msg_data
and
.IR zmq_msg_size
functions to get message data and size. Note that the message data are not
nullified to avoid the associated performance impact. Thus you
should expect your message to contain bogus data after this call.
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
.IP "\fBENOMEM\fP"
memory to hold the message cannot be allocated.
.SH EXAMPLE
.nf
zmq_msg_t msg;
rc = zmq_msg_init_size (&msg, 6);
assert (rc == 0);
memcpy (zmq_msg_data (&msg), "ABCDEF", 6);
rc = zmq_send (s, &msg, 0);
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_msg_close (3)
.BR zmq_msg_init (3)
.BR zmq_msg_init_data (3)
.BR zmq_msg_data (3)
.BR zmq_msg_size (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_msg_move 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_msg_move \- moves content of a message to another message
.SH SYNOPSIS
.B int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
.SH DESCRIPTION
Move the content of the message from
.IR src
to
.IR dest .
The content isn't copied, just moved.
.IR src
becomes an empty message after the call. Original content of
.IR dest
message is deallocated.
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
No errors are defined.
.SH EXAMPLE
.nf
zmq_msg_t dest;
rc = zmq_msg_init (&dest);
assert (rc == 0);
rc = zmq_msg_move (&dest, &src);
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_msg_copy (3)
.BR zmq_msg_init (3)
.BR zmq_msg_init_size (3)
.BR zmq_msg_init_data (3)
.BR zmq_msg_close (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_msg_size 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_msg_size \- retrieves size of the message content
.SH SYNOPSIS
.B size_t zmq_msg_size (zmq_msg_t *msg);
.SH DESCRIPTION
Returns size of the message data. Always use this function to get the size,
never use
.IR zmq_msg_t
members directly.
.SH RETURN VALUE
Size of the message data (bytes).
.SH ERRORS
No errors are defined.
.SH EXAMPLE
.nf
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_recv (s, &msg, 0);
assert (rc == 0);
size_t msg_size = zmq_msg_size (&msg);
.fi
.SH SEE ALSO
.BR zmq_msg_init (3)
.BR zmq_msg_init_size (3)
.BR zmq_msg_init_data (3)
.BR zmq_msg_close (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_poll 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_poll \- polls for events on a set of 0MQ and POSIX sockets
.SH SYNOPSIS
.B int zmq_poll (zmq_pollitem_t *items, int nitems);
.SH DESCRIPTION
Waits for the events specified by
.IR items
parameter. Number of items in the array is determined by
.IR nitems
argument. Each item in the array looks like this:
.nf
typedef struct
{
void *socket;
int fd;
short events;
short revents;
} zmq_pollitem_t;
.fi
0MQ socket to poll on is specified by
.IR socket .
In case you want to poll on standard POSIX socket, set
.IR socket
to NULL and fill the POSIX file descriptor to
.IR fd .
.IR events
specifies which events to wait for. It's a combination of the values below.
Once the call exits,
.IR revent
will be filled with events that have actually occured on the socket. The field
will contain a combination of the following values.
.IP "\fBZMQ_POLLIN\fP"
poll for incoming messages.
.IP "\fBZMQ_POLLOUT\fP"
wait while message can be set socket. Poll will return if a message of at least
one byte can be written to the socket. However, there is no guarantee that
arbitrarily large message can be sent.
.SH RETURN VALUE
Function returns number of items signaled or -1 in the case of error.
.SH ERRORS
.IP "\fBEFAULT\fP"
there's a 0MQ socket in the pollset belonging to a different application thread.
.IP "\fBENOTSUP\fP"
0MQ context was initialised without ZMQ_POLL flag. I/O multiplexing is disabled.
.SH EXAMPLE
.nf
zmq_pollitem_t items [2];
items [0].socket = s;
items [0].events = POLLIN;
items [1].socket = NULL;
items [1].fd = my_fd;
items [1].events = POLLIN;
int rc = zmq_poll (items, 2);
assert (rc != -1);
.fi
.SH SEE ALSO
.BR zmq_socket (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_recv 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_recv \- retrieves a message from the socket
.SH SYNOPSIS
.B int zmq_recv (void *s, zmq_msg_t *msg, int flags);
.SH DESCRIPTION
Receive a message from the socket
.IR s ,
store it in
.IR msg .
Any content previously in
.IR msg
will be properly deallocated.
.IR flags
argument can be combination of the flags described below.
.IP "\fBZMQ_NOBLOCK\fP"
The flag specifies that the operation should be performed in
non-blocking mode. I.e. if it cannot be processed immediately,
error should be returned with
.IR errno
set to EAGAIN.
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
.IP "\fBEAGAIN\fP"
it's a non-blocking receive and there's no message available at the moment.
.IP "\fBENOTSUP\fP"
function isn't supported by particular socket type.
.IP "\fBEFSM\fP"
function cannot be called at the moment, because socket is not in the
approprite state. This error may occur with sockets that switch between
several states (e.g. ZMQ_REQ).
.SH EXAMPLE
.nf
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_recv (s, &msg, 0);
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_send (3)
.BR zmq_msg_init (3)
.BR zmq_msg_data (3)
.BR zmq_msg_size (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_send 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_send \- sends a message
.SH SYNOPSIS
.B int zmq_send (void *s, zmq_msg_t *msg, int flags);
.SH DESCRIPTION
Send the message
.IR msg
to the socket
.IR s .
.IR flags
argument can be combination the flags described below.
.IP "\fBZMQ_NOBLOCK\fP"
The flag specifies that the operation should be performed in
non-blocking mode. I.e. if it cannot be processed immediately,
error should be returned with
.IR errno
set to EAGAIN.
.IP "\fBZMQ_NOFLUSH\fP"
The flag specifies that
.IR zmq_send
should not flush the message downstream immediately. Instead, it should batch
ZMQ_NOFLUSH messages and send them downstream only once
.IR zmq_flush
is invoked. This is an optimisation for cases where several messages are sent
in a single business transaction. However, the effect is measurable only in
extremely high-perf scenarios (million messages a second or so).
If that's not your case, use standard flushing send instead.
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
.IP "\fBEAGAIN\fP"
it's a non-blocking send and message cannot be sent at the moment.
.IP "\fBENOTSUP\fP"
function isn't supported by particular socket type.
.IP "\fBEFSM\fP"
function cannot be called at the moment, because socket is not in the
approprite state. This error may occur with sockets that switch between
several states (e.g. ZMQ_REQ).
.SH EXAMPLE
.nf
zmq_msg_t msg;
int rc = zmq_msg_init_size (&msg, 6);
assert (rc == 0);
memset (zmq_msg_data (&msg), 'A', 6);
rc = zmq_send (s, &msg, 0);
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_flush (3)
.BR zmq_recv (3)
.BR zmq_msg_init (3)
.BR zmq_msg_init_size (3)
.BR zmq_msg_init_data (3)
.BR zmq_msg_data (3)
.BR zmq_msg_size (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_setsockopt 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_setsockopt \- sets a specified option on a 0MQ socket
.SH SYNOPSIS
.B int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);
.SH DESCRIPTION
Sets an option on the socket.
.IR option
argument specifies the option from the list below.
.IR optval
is a pointer to the value to set,
.IR optvallen
is the size of the value in bytes.
.IP "\fBZMQ_HWM\fP"
High watermark for the message pipes associated with the socket. The water
mark cannot be exceeded. If the messages don't fit into the pipe emergency
mechanisms of the particular socket type are used (block, drop etc.) If HWM
is set to zero, there are no limits for the content of the pipe.
Type: int64_t Unit: bytes Default: 0
.IP "\fBZMQ_LWM\fP"
Low watermark makes sense only if high watermark is defined (i.e. is non-zero).
When the emergency state is reached when messages overflow the pipe, the
emergency lasts till the size of the pipe decreases to low watermark.
At that point normal state is resumed.
Type: int64_t Unit: bytes Default: 0
.IP "\fBZMQ_SWAP\fP"
Swap allows the pipe to exceed high watermark. However, the data are written
to the disk rather than held in the memory. Until high watermark is
exceeded there is no disk activity involved though. The value of the option
defines maximal size of the swap file.
Type: int64_t Unit: bytes Default: 0
.IP "\fBZMQ_AFFINITY\fP"
Affinity defines which threads in the thread pool will be used to handle
newly created sockets. This way you can dedicate some of the threads (CPUs)
to a specific work. Value of 0 means no affinity. Work is distributed
fairly among the threads in the thread pool. For non-zero values, the lowest
bit corresponds to the thread 1, second lowest bit to the thread 2 etc.
Thus, value of 3 means that from now on newly created sockets will handle
I/O activity exclusively using threads no. 1 and 2.
Type: int64_t Unit: N/A (bitmap) Default: 0
.IP "\fBZMQ_IDENTITY\fP"
Identity of the socket. Identity is important when restarting applications.
If the socket has no identity, each run of the application is completely
separated from other runs. However, with identity application reconnects to
existing infrastructure left by the previous run. Thus it may receive
messages that were sent in the meantime, it shares pipe limits with the
previous run etc.
Type: string Unit: N/A Default: NULL
.IP "\fBZMQ_SUBSCRIBE\fP"
Applicable only to ZMQ_SUB socket type. It establishes new message filter.
When ZMQ_SUB socket is created all the incoming messages are filtered out.
This option allows you to subscribe for all messages ("*"), messages with
specific topic ("x.y.z") and/or messages with specific topic prefix
("x.y.*"). Topic is one-byte-size-prefixed string located at
the very beginning of the message. Multiple filters can be attached to
a single 'sub' socket. In that case message passes if it matches at least
one of the filters.
Type: string Unit: N/A Default: N/A
.IP "\fBZMQ_UNSUBSCRIBE\fP"
Applicable only to ZMQ_SUB socket type. Removes existing message filter.
The filter specified must match the string passed to ZMQ_SUBSCRIBE options
exactly. If there were several instances of the same filter created,
this options removes only one of them, leaving the rest in place
and functional.
Type: string Unit: N/A Default: N/A
.IP "\fBZMQ_RATE\fP"
This option applies only to sending side of multicast transports (pgm & udp).
It specifies maximal outgoing data rate that an individual sender socket
can send.
Type: uint64_t Unit: kilobits/second Default: 100
.IP "\fBZMQ_RECOVERY_IVL\fP"
This option applies only to multicast transports (pgm & udp). It specifies
how long can the receiver socket survive when the sender is inaccessible.
Keep in mind that large recovery intervals at high data rates result in
very large recovery buffers, meaning that you can easily overload your box
by setting say 1 minute recovery interval at 1Gb/s rate (requires
7GB in-memory buffer).
Type: uint64_t Unit: seconds Default: 10
.IP "\fBZMQ_MCAST_LOOP\fP"
This option applies only to multicast transports (pgm & udp). Value of 1
means that the mutlicast packets can be received on the box they were sent
from. Setting the value to 0 disables the loopback functionality which
can have negative impact on the performance. If possible, disable
the loopback in production environments.
Type: uint64_t Unit: N/A (boolean value) Default: 1
.SH RETURN VALUE
In case of success the function returns zero. Otherwise it returns -1 and
sets
.IR errno
to the appropriate value.
.SH ERRORS
.IP "\fBEINVAL\fP"
unknown option, a value with incorrect length or invalid value.
.SH EXAMPLE
.nf
int rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE, "*", 1);
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_socket (3)
.BR zmq (7)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_socket 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_socket \- creates 0MQ socket
.SH SYNOPSIS
.B void *zmq_socket (void *context, int type);
.SH DESCRIPTION
Open a socket within the specified
.IR context .
To create a context, use
.IR zmq_init
function.
.IR type
argument can be one of the values defined below. Note that each socket is owned
by exactly one thread (the one that it was created from) and should not be used
from any other thread.
.IP "\fBZMQ_P2P\fP"
Socket to communicate with a single peer. Allows for only a single connect or a
single bind. There's no message routing or message filtering involved.
.IP "\fBZMQ_PUB\fP"
Socket to distribute data. Recv fuction is not implemented for this socket
type. Messages are distributed in fanout fashion to all the peers.
.IP "\fBZMQ_SUB\fP"
Socket to subscribe for data. Send function is not implemented for this
socket type. Initially, socket is subscribed for no messages. Use ZMQ_SUBSCRIBE
option to specify which messages to subscribe for.
.IP "\fBZMQ_REQ\fP"
Socket to send requests and receive replies. Requests are load-balanced among
all the peers. This socket type allows only an alternated sequence of
send's and recv's.
.IP "\fBZMQ_REP\fP"
Socket to receive requests and send replies. This socket type allows
only an alternated sequence of recv's and send's. Each send is routed to
the peer that issued the last received request.
.IP "\fBZMQ_UPSTREAM\fP"
Socket to receive messages from up the stream. Messages are fair-queued
from among all the connected peers. Send function is not implemented for
this socket type.
.IP "\fBZMQ_DOWNSTREAM\fP"
Socket to send messages down stream. Messages are load-balanced among all the
connected peers. Send function is not implemented for this socket type.
.SH RETURN VALUE
Function returns socket handle is successful. Otherwise it returns NULL and
sets errno to one of the values below.
.SH ERRORS
.IP "\fBEINVAL\fP"
invalid socket type.
.IP "\fBEMTHREAD\fP"
the number of application threads allowed to own 0MQ sockets was exceeded. See
.IR app_threads
parameter to
.IR zmq_init
function.
.SH EXAMPLE
.nf
void *s = zmq_socket (context, ZMQ_PUB);
assert (s);
int rc = zmq_bind (s, "tcp://192.168.0.1:5555");
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_init (3)
.BR zmq_setsockopt (3)
.BR zmq_bind (3)
.BR zmq_connect (3)
.BR zmq_send (3)
.BR zmq_flush (3)
.BR zmq_recv (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_strerror 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_strerror \- returns string describing the error number
.SH SYNOPSIS
.B const char *zmq_strerror (int errnum);
.SH DESCRIPTION
As 0MQ defines few additional (non-POSIX) error codes, standard
.IR strerror
isn't capable of translating those errors into human readable strings. Instead,
.IR zmq_strerror
should be used.
.SH RETURN VALUE
Returns string describing the error number.
.SH ERRORS
No errors are defined.
.SH EXAMPLE
.nf
void *ctx = zmq_init (1, 1, 0);
if (!ctx) {
printf ("error occured during zmq_init: %s\\n", zmq_strerror (errno));
abort ();
}
.fi
.SH SEE ALSO
.BR zmq (7)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq_term 3 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
zmq_init \- terminates 0MQ context
.SH SYNOPSIS
.B int zmq_term (void *context);
.SH DESCRIPTION
Destroys 0MQ context. However, if there are still any sockets open within
the context,
.IR zmq_term
succeeds but shutdown of the context is delayed till the last socket is closed.
.SH RETURN VALUE
Function returns zero is successful. Otherwise it returns -1 and
sets errno to one of the values below.
.SH ERRORS
No errors are defined.
.SH EXAMPLE
.nf
int rc = zmq_term (context);
assert (rc == 0);
.fi
.SH SEE ALSO
.BR zmq_init (3)
.BR zmq_close (3)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
.TH zmq 7 "" "(c)2007-2009 FastMQ Inc." "0MQ User Manuals"
.SH NAME
0MQ \- a lightweight messaging kernel
.SH SYNOPSIS
.SH DESCRIPTION
.SH "SEE ALSO"
.SH AUTHOR
Martin Sustrik <sustrik at fastmq dot com>
...@@ -79,6 +79,9 @@ int main (int argc, char *argv []) ...@@ -79,6 +79,9 @@ int main (int argc, char *argv [])
if (elapsed == 0) if (elapsed == 0)
elapsed = 1; elapsed = 1;
rc = zmq_msg_close (&msg);
assert (rc == 0);
throughput = (unsigned long) throughput = (unsigned long)
((double) message_count / (double) elapsed * 1000000); ((double) message_count / (double) elapsed * 1000000);
megabits = (double) (throughput * message_size * 8) / 1000000; megabits = (double) (throughput * message_size * 8) / 1000000;
......
...@@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ...@@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \
decoder.hpp \ decoder.hpp \
devpoll.hpp \ devpoll.hpp \
dispatcher.hpp \ dispatcher.hpp \
downstream.hpp \
encoder.hpp \ encoder.hpp \
epoll.hpp \ epoll.hpp \
err.hpp \ err.hpp \
...@@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ...@@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \ tcp_listener.hpp \
tcp_socket.hpp \ tcp_socket.hpp \
thread.hpp \ thread.hpp \
upstream.hpp \
uuid.hpp \ uuid.hpp \
windows.hpp \ windows.hpp \
wire.hpp \ wire.hpp \
...@@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ...@@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \
app_thread.cpp \ app_thread.cpp \
devpoll.cpp \ devpoll.cpp \
dispatcher.cpp \ dispatcher.cpp \
downstream.cpp \
epoll.cpp \ epoll.cpp \
err.cpp \ err.cpp \
fd_signaler.cpp \ fd_signaler.cpp \
...@@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \ ...@@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \ tcp_listener.cpp \
tcp_socket.cpp \ tcp_socket.cpp \
thread.cpp \ thread.cpp \
upstream.cpp \
uuid.cpp \ uuid.cpp \
ypollset.cpp \ ypollset.cpp \
zmq.cpp \ zmq.cpp \
......
...@@ -40,11 +40,13 @@ ...@@ -40,11 +40,13 @@
#include "pipe.hpp" #include "pipe.hpp"
#include "config.hpp" #include "config.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "p2p.hpp"
#include "pub.hpp" #include "pub.hpp"
#include "sub.hpp" #include "sub.hpp"
#include "req.hpp" #include "req.hpp"
#include "rep.hpp" #include "rep.hpp"
#include "p2p.hpp" #include "upstream.hpp"
#include "downstream.hpp"
// If the RDTSC is available we use it to prevent excessive // If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any // polling for commands. The nice thing here is that it will work on any
...@@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) ...@@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
{ {
socket_base_t *s = NULL; socket_base_t *s = NULL;
switch (type_) { switch (type_) {
case ZMQ_P2P:
s = new p2p_t (this);
break;
case ZMQ_PUB: case ZMQ_PUB:
s = new pub_t (this); s = new pub_t (this);
break; break;
...@@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) ...@@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_REP: case ZMQ_REP:
s = new rep_t (this); s = new rep_t (this);
break; break;
case ZMQ_P2P: case ZMQ_UPSTREAM:
s = new p2p_t (this); s = new upstream_t (this);
break;
case ZMQ_DOWNSTREAM:
s = new downstream_t (this);
break; break;
default: default:
// TODO: This should be EINVAL. // TODO: This should be EINVAL.
......
...@@ -69,10 +69,12 @@ namespace zmq ...@@ -69,10 +69,12 @@ namespace zmq
} attach; } attach;
// Sent from session to socket to establish pipe(s) between them. // Sent from session to socket to establish pipe(s) between them.
// If adjust_seqnum is true, caller have used inc_seqnum beforehand
// and thus the callee should take care of catching up.
struct { struct {
class owned_t *session;
class reader_t *in_pipe; class reader_t *in_pipe;
class writer_t *out_pipe; class writer_t *out_pipe;
bool adjust_seqnum;
} bind; } bind;
// Sent by pipe writer to inform dormant pipe reader that there // Sent by pipe writer to inform dormant pipe reader that there
......
...@@ -37,7 +37,8 @@ ...@@ -37,7 +37,8 @@
#include "config.hpp" #include "config.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::devpoll_t::devpoll_t () zmq::devpoll_t::devpoll_t () :
stopping (false)
{ {
// Get limit on open files // Get limit on open files
struct rlimit rl; struct rlimit rl;
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "../bindings/c/zmq.h" #include "../bindings/c/zmq.h"
#include "dispatcher.hpp" #include "dispatcher.hpp"
#include "socket_base.hpp"
#include "app_thread.hpp" #include "app_thread.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "platform.hpp" #include "platform.hpp"
...@@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_) ...@@ -202,3 +203,58 @@ void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
zmq_assert (erased == 1); zmq_assert (erased == 1);
pipes_sync.unlock (); pipes_sync.unlock ();
} }
int zmq::dispatcher_t::register_endpoint (const char *addr_,
socket_base_t *socket_)
{
endpoints_sync.lock ();
bool inserted = endpoints.insert (std::make_pair (addr_, socket_)).second;
if (!inserted) {
errno = EADDRINUSE;
endpoints_sync.unlock ();
return -1;
}
endpoints_sync.unlock ();
return 0;
}
void zmq::dispatcher_t::unregister_endpoints (socket_base_t *socket_)
{
endpoints_sync.lock ();
endpoints_t::iterator it = endpoints.begin ();
while (it != endpoints.end ()) {
if (it->second == socket_) {
endpoints_t::iterator to_erase = it;
it++;
endpoints.erase (to_erase);
continue;
}
it++;
}
endpoints_sync.unlock ();
}
zmq::socket_base_t *zmq::dispatcher_t::find_endpoint (const char *addr_)
{
endpoints_sync.lock ();
endpoints_t::iterator it = endpoints.find (addr_);
if (it == endpoints.end ()) {
endpoints_sync.unlock ();
errno = ECONNREFUSED;
return NULL;
}
socket_base_t *endpoint = it->second;
// Increment the command sequence number of the peer so that it won't
// get deallocated until "bind" command is issued by the caller.
endpoint->inc_seqnum ();
endpoints_sync.unlock ();
return endpoint;
}
...@@ -97,6 +97,11 @@ namespace zmq ...@@ -97,6 +97,11 @@ namespace zmq
void register_pipe (class pipe_t *pipe_); void register_pipe (class pipe_t *pipe_);
void unregister_pipe (class pipe_t *pipe_); void unregister_pipe (class pipe_t *pipe_);
// Management of inproc endpoints.
int register_endpoint (const char *addr_, class socket_base_t *socket_);
void unregister_endpoints (class socket_base_t *socket_);
class socket_base_t *find_endpoint (const char *addr_);
private: private:
~dispatcher_t (); ~dispatcher_t ();
...@@ -149,6 +154,13 @@ namespace zmq ...@@ -149,6 +154,13 @@ namespace zmq
// and 'terminated' flag). // and 'terminated' flag).
mutex_t term_sync; mutex_t term_sync;
// List of inproc endpoints within this context.
typedef std::map <std::string, class socket_base_t*> endpoints_t;
endpoints_t endpoints;
// Synchronisation of access to the list of inproc endpoints.
mutex_t endpoints_sync;
dispatcher_t (const dispatcher_t&); dispatcher_t (const dispatcher_t&);
void operator = (const dispatcher_t&); void operator = (const dispatcher_t&);
}; };
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../bindings/c/zmq.h"
#include "downstream.hpp"
#include "err.hpp"
#include "pipe.hpp"
zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
socket_base_t (parent_),
current (0)
{
options.requires_in = false;
options.requires_out = true;
}
zmq::downstream_t::~downstream_t ()
{
}
void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (!inpipe_ && outpipe_);
pipes.push_back (outpipe_);
}
void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (pipe_);
pipes.erase (pipes.index (pipe_));
}
void zmq::downstream_t::xkill (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
void zmq::downstream_t::xrevive (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
// No special option for this socket type.
errno = EINVAL;
return -1;
}
int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
{
// If there are no pipes we cannot send the message.
if (pipes.empty ()) {
errno = EAGAIN;
return -1;
}
// Move to the next pipe (load-balancing).
current++;
if (current >= pipes.size ())
current = 0;
// TODO: Implement this once queue limits are in-place.
zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
// Push message to the selected pipe.
pipes [current]->write (msg_);
pipes [current]->flush ();
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
int zmq::downstream_t::xflush ()
{
// TODO: Maybe there's a point in flushing messages downstream.
// It may be useful in the case where number of messages in a single
// transaction is much greater than the number of attached pipes.
errno = ENOTSUP;
return -1;
}
int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
bool zmq::downstream_t::xhas_in ()
{
return false;
}
bool zmq::downstream_t::xhas_out ()
{
// TODO: Modify this code once pipe limits are in place.
return true;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__
#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
namespace zmq
{
class downstream_t : public socket_base_t
{
public:
downstream_t (class app_thread_t *parent_);
~downstream_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
// List of outbound pipes.
typedef yarray_t <class writer_t> pipes_t;
pipes_t pipes;
// Points to the last pipe that the most recent message was sent to.
pipes_t::size_type current;
downstream_t (const downstream_t&);
void operator = (const downstream_t&);
};
}
#endif
...@@ -33,7 +33,8 @@ ...@@ -33,7 +33,8 @@
#include "config.hpp" #include "config.hpp"
#include "i_poll_events.hpp" #include "i_poll_events.hpp"
zmq::kqueue_t::kqueue_t () zmq::kqueue_t::kqueue_t () :
stopping (false)
{ {
// Create event queue // Create event queue
kqueue_fd = kqueue (); kqueue_fd = kqueue ();
......
...@@ -83,8 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -83,8 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_)
return; return;
case command_t::bind: case command_t::bind:
process_bind (cmd_.args.bind.session, process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe,
cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe); cmd_.args.bind.adjust_seqnum);
return; return;
case command_t::pipe_term: case command_t::pipe_term:
...@@ -122,6 +122,21 @@ void zmq::object_t::unregister_pipe (class pipe_t *pipe_) ...@@ -122,6 +122,21 @@ void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
dispatcher->unregister_pipe (pipe_); dispatcher->unregister_pipe (pipe_);
} }
int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_)
{
return dispatcher->register_endpoint (addr_, socket_);
}
void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
{
return dispatcher->unregister_endpoints (socket_);
}
zmq::socket_base_t *zmq::object_t::find_endpoint (const char *addr_)
{
return dispatcher->find_endpoint (addr_);
}
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{ {
return dispatcher->choose_io_thread (taskset_); return dispatcher->choose_io_thread (taskset_);
...@@ -168,15 +183,15 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_) ...@@ -168,15 +183,15 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_bind (object_t *destination_, owned_t *session_, void zmq::object_t::send_bind (object_t *destination_,
reader_t *in_pipe_, writer_t *out_pipe_) reader_t *in_pipe_, writer_t *out_pipe_, bool adjust_seqnum_)
{ {
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::bind; cmd.type = command_t::bind;
cmd.args.bind.session = session_;
cmd.args.bind.in_pipe = in_pipe_; cmd.args.bind.in_pipe = in_pipe_;
cmd.args.bind.out_pipe = out_pipe_; cmd.args.bind.out_pipe = out_pipe_;
cmd.args.bind.adjust_seqnum = adjust_seqnum_;
send_command (cmd); send_command (cmd);
} }
...@@ -250,8 +265,8 @@ void zmq::object_t::process_attach (i_engine *engine_) ...@@ -250,8 +265,8 @@ void zmq::object_t::process_attach (i_engine *engine_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_bind (owned_t *session_, void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
reader_t *in_pipe_, writer_t *out_pipe_) bool adjust_seqnum_)
{ {
zmq_assert (false); zmq_assert (false);
} }
......
...@@ -49,6 +49,12 @@ namespace zmq ...@@ -49,6 +49,12 @@ namespace zmq
protected: protected:
// Using following function, socket is able to access global
// repository of inproc endpoints.
int register_endpoint (const char *addr_, class socket_base_t *socket_);
void unregister_endpoints (class socket_base_t *socket_);
class socket_base_t *find_endpoint (const char *addr_);
// Derived object can use following functions to interact with // Derived object can use following functions to interact with
// global repositories. See dispatcher.hpp for function details. // global repositories. See dispatcher.hpp for function details.
int thread_slot_count (); int thread_slot_count ();
...@@ -62,8 +68,8 @@ namespace zmq ...@@ -62,8 +68,8 @@ namespace zmq
class owned_t *object_); class owned_t *object_);
void send_attach (class session_t *destination_, void send_attach (class session_t *destination_,
struct i_engine *engine_); struct i_engine *engine_);
void send_bind (object_t *destination_, class owned_t *session_, void send_bind (object_t *destination_, class reader_t *in_pipe_,
class reader_t *in_pipe_, class writer_t *out_pipe_); class writer_t *out_pipe_, bool adjust_seqnum_);
void send_revive (class object_t *destination_); void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_); void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_); void send_pipe_term_ack (class reader_t *destination_);
...@@ -78,8 +84,8 @@ namespace zmq ...@@ -78,8 +84,8 @@ namespace zmq
virtual void process_plug (); virtual void process_plug ();
virtual void process_own (class owned_t *object_); virtual void process_own (class owned_t *object_);
virtual void process_attach (struct i_engine *engine_); virtual void process_attach (struct i_engine *engine_);
virtual void process_bind (class owned_t *session_, virtual void process_bind (class reader_t *in_pipe_,
class reader_t *in_pipe_, class writer_t *out_pipe_); class writer_t *out_pipe_, bool adjust_seqnum_);
virtual void process_revive (); virtual void process_revive ();
virtual void process_pipe_term (); virtual void process_pipe_term ();
virtual void process_pipe_term_ack (); virtual void process_pipe_term_ack ();
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_P2P_INCLUDED__ #ifndef __ZMQ_P2P_HPP_INCLUDED__
#define __ZMQ_P2P_INCLUDED__ #define __ZMQ_P2P_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
......
...@@ -81,7 +81,11 @@ void zmq::reader_t::term () ...@@ -81,7 +81,11 @@ void zmq::reader_t::term ()
void zmq::reader_t::process_revive () void zmq::reader_t::process_revive ()
{ {
endpoint->revive (this); // Beacuse of command throttling mechanism, incoming termination request
// may not have been processed before subsequent send.
// In that case endpoint is NULL.
if (endpoint)
endpoint->revive (this);
} }
void zmq::reader_t::process_pipe_term_ack () void zmq::reader_t::process_pipe_term_ack ()
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_PUB_INCLUDED__ #ifndef __ZMQ_PUB_HPP_INCLUDED__
#define __ZMQ_PUB_INCLUDED__ #define __ZMQ_PUB_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "yarray.hpp" #include "yarray.hpp"
......
...@@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) ...@@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
} }
// Now both inpipe and outpipe are detached. Remove them from the lists. // Now both inpipe and outpipe are detached. Remove them from the lists.
if (in_pipes.index (pipe_) < active) if (index < active)
active--; active--;
in_pipes.erase (index); in_pipes.erase (index);
out_pipes.erase (index); out_pipes.erase (index);
...@@ -178,14 +178,15 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) ...@@ -178,14 +178,15 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
// Round-robin over the pipes to get next message. // Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) { for (int count = active; count != 0; count--) {
bool fetched = in_pipes [current]->read (msg_); bool fetched = in_pipes [current]->read (msg_);
current++;
if (current >= active)
current = 0;
if (fetched) { if (fetched) {
reply_pipe = out_pipes [current]; reply_pipe = out_pipes [current];
waiting_for_reply = true; waiting_for_reply = true;
return 0;
} }
current++;
if (current >= active)
current = 0;
if (fetched)
return 0;
} }
// No message is available. Initialise the output parameter // No message is available. Initialise the output parameter
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_REP_INCLUDED__ #ifndef __ZMQ_REP_HPP_INCLUDED__
#define __ZMQ_REP_INCLUDED__ #define __ZMQ_REP_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "yarray.hpp" #include "yarray.hpp"
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_REQ_INCLUDED__ #ifndef __ZMQ_REQ_HPP_INCLUDED__
#define __ZMQ_REQ_INCLUDED__ #define __ZMQ_REQ_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "yarray.hpp" #include "yarray.hpp"
......
...@@ -51,10 +51,6 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) ...@@ -51,10 +51,6 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
bool zmq::session_t::write (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_)
{ {
// The communication is unidirectional.
// We don't expect any message to arrive.
zmq_assert (out_pipe);
if (out_pipe->write (msg_)) { if (out_pipe->write (msg_)) {
zmq_msg_init (msg_); zmq_msg_init (msg_);
return true; return true;
...@@ -155,8 +151,10 @@ void zmq::session_t::process_plug () ...@@ -155,8 +151,10 @@ void zmq::session_t::process_plug ()
out_pipe->set_endpoint (this); out_pipe->set_endpoint (this);
} }
send_bind (owner, this, outbound ? &outbound->reader : NULL, // Note that initial call to inc_seqnum was optimised out. Last
inbound ? &inbound->writer : NULL); // parameter conveys the fact to the callee.
send_bind (owner, outbound ? &outbound->reader : NULL,
inbound ? &inbound->writer : NULL, false);
} }
owned_t::process_plug (); owned_t::process_plug ();
......
...@@ -23,7 +23,11 @@ ...@@ -23,7 +23,11 @@
#include "platform.hpp" #include "platform.hpp"
#include "err.hpp" #include "err.hpp"
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS #if 0 //defined ZMQ_HAVE_LINUX
#include <sys/syscall.h>
#include <unistd.h>
#include <linux/futex.h>
#elif defined ZMQ_HAVE_LINUX ||defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
#include <pthread.h> #include <pthread.h>
#elif defined ZMQ_HAVE_WINDOWS #elif defined ZMQ_HAVE_WINDOWS
#include "windows.hpp" #include "windows.hpp"
...@@ -33,13 +37,63 @@ ...@@ -33,13 +37,63 @@
namespace zmq namespace zmq
{ {
// Simple semaphore. Only single thread may be waiting at any given time. // Simple semaphore. Only single thread may be waiting at any given time.
// Also, the semaphore may not be posted before the previous post // Also, the semaphore may not be posted before the previous post
// was matched by corresponding wait and the waiting thread was // was matched by corresponding wait and the waiting thread was
// released. // released.
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS #if 0 //defined ZMQ_HAVE_LINUX
// In theory, using private futexes should be more efficient on Linux
// platform than using mutexes. However, in uncontended cases of TCP
// transport on loopback interface we haven't seen any latency improvement.
// The code is commented out waiting for more thorough testing.
class simple_semaphore_t
{
public:
// Initialise the semaphore.
inline simple_semaphore_t () :
dummy (0)
{
}
// Destroy the semaphore.
inline ~simple_semaphore_t ()
{
}
// Wait for the semaphore.
inline void wait ()
{
int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAIT_PRIVATE,
(int) 0, NULL, NULL, (int) 0);
zmq_assert (rc == 0);
}
// Post the semaphore.
inline void post ()
{
while (true) {
int rc = syscall (SYS_futex, &dummy, (int) FUTEX_WAKE_PRIVATE,
(int) 1, NULL, NULL, (int) 0);
zmq_assert (rc != -1 && rc <= 1);
if (rc == 1)
break;
}
}
private:
int dummy;
// Disable copying of the object.
simple_semaphore_t (const simple_semaphore_t&);
void operator = (const simple_semaphore_t&);
};
#elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_OPENVMS
// On platforms that allow for double locking of a mutex from the same // On platforms that allow for double locking of a mutex from the same
// thread, simple semaphore is implemented using mutex, as it is more // thread, simple semaphore is implemented using mutex, as it is more
......
...@@ -43,7 +43,9 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) : ...@@ -43,7 +43,9 @@ zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
pending_term_acks (0), pending_term_acks (0),
ticks (0), ticks (0),
app_thread (parent_), app_thread (parent_),
shutting_down (false) shutting_down (false),
sent_seqnum (0),
processed_seqnum (0)
{ {
} }
...@@ -81,6 +83,9 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -81,6 +83,9 @@ int zmq::socket_base_t::bind (const char *addr_)
addr_type = addr.substr (0, pos); addr_type = addr.substr (0, pos);
addr_args = addr.substr (pos + 3); addr_args = addr.substr (pos + 3);
if (addr_type == "inproc")
return register_endpoint (addr_args.c_str (), this);
if (addr_type == "tcp") { if (addr_type == "tcp") {
zmq_listener_t *listener = new zmq_listener_t ( zmq_listener_t *listener = new zmq_listener_t (
choose_io_thread (options.affinity), this, options); choose_io_thread (options.affinity), this, options);
...@@ -126,6 +131,41 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -126,6 +131,41 @@ int zmq::socket_base_t::connect (const char *addr_)
addr_type = addr.substr (0, pos); addr_type = addr.substr (0, pos);
addr_args = addr.substr (pos + 3); addr_args = addr.substr (pos + 3);
if (addr_type == "inproc") {
// Find the peer socket.
socket_base_t *peer = find_endpoint (addr_args.c_str ());
if (!peer)
return -1;
pipe_t *in_pipe = NULL;
pipe_t *out_pipe = NULL;
// Create inbound pipe, if required.
if (options.requires_in) {
in_pipe = new pipe_t (this, peer, options.hwm, options.lwm);
zmq_assert (in_pipe);
}
// Create outbound pipe, if required.
if (options.requires_out) {
out_pipe = new pipe_t (peer, this, options.hwm, options.lwm);
zmq_assert (out_pipe);
}
// Attach the pipes to this socket object.
attach_pipes (in_pipe ? &in_pipe->reader : NULL,
out_pipe ? &out_pipe->writer : NULL);
// Attach the pipes to the peer socket. Note that peer's seqnum
// was incremented in find_endpoint function. The callee is notified
// about the fact via the last parameter.
send_bind (peer, out_pipe ? &out_pipe->reader : NULL,
in_pipe ? &in_pipe->writer : NULL, true);
return 0;
}
// Create the session. // Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity); io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, this, session_name.c_str (), session_t *session = new session_t (io_thread, this, session_name.c_str (),
...@@ -319,13 +359,24 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) ...@@ -319,13 +359,24 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close () int zmq::socket_base_t::close ()
{ {
shutting_down = true;
// Let the thread know that the socket is no longer available.
app_thread->remove_socket (this); app_thread->remove_socket (this);
// Pointer to the dispatcher must be retrieved before the socket is // Pointer to the dispatcher must be retrieved before the socket is
// deallocated. Afterwards it is not available. // deallocated. Afterwards it is not available.
dispatcher_t *dispatcher = get_dispatcher (); dispatcher_t *dispatcher = get_dispatcher ();
shutting_down = true; // Unregister all inproc endpoints associated with this socket.
// From this point we are sure that inc_seqnum won't be called again
// on this object.
dispatcher->unregister_endpoints (this);
// Wait till all undelivered commands are delivered. This should happen
// very quickly. There's no way to wait here for extensive period of time.
while (processed_seqnum != sent_seqnum.get ())
app_thread->process_commands (true, false);
while (true) { while (true) {
...@@ -364,6 +415,12 @@ int zmq::socket_base_t::close () ...@@ -364,6 +415,12 @@ int zmq::socket_base_t::close ()
return 0; return 0;
} }
void zmq::socket_base_t::inc_seqnum ()
{
// NB: This function may be called from a different thread!
sent_seqnum.add (1);
}
zmq::app_thread_t *zmq::socket_base_t::get_thread () zmq::app_thread_t *zmq::socket_base_t::get_thread ()
{ {
return app_thread; return app_thread;
...@@ -452,9 +509,16 @@ void zmq::socket_base_t::process_own (owned_t *object_) ...@@ -452,9 +509,16 @@ void zmq::socket_base_t::process_own (owned_t *object_)
io_objects.insert (object_); io_objects.insert (object_);
} }
void zmq::socket_base_t::process_bind (owned_t *session_, void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
reader_t *in_pipe_, writer_t *out_pipe_) bool adjust_seqnum_)
{ {
// In case of inproc transport, the seqnum should catch up here.
// For other transports the seqnum modification can be optimised out
// because final handshaking between the socket and the session ensures
// that no 'bind' command will be left unprocessed.
if (adjust_seqnum_)
processed_seqnum++;
attach_pipes (in_pipe_, out_pipe_); attach_pipes (in_pipe_, out_pipe_);
} }
......
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
#include "mutex.hpp" #include "mutex.hpp"
#include "options.hpp" #include "options.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "atomic_counter.hpp"
namespace zmq namespace zmq
{ {
...@@ -54,6 +55,11 @@ namespace zmq ...@@ -54,6 +55,11 @@ namespace zmq
int recv (zmq_msg_t *msg_, int flags_); int recv (zmq_msg_t *msg_, int flags_);
int close (); int close ();
// When another owned object wants to send command to this object
// it calls this function to let it know it should not shut down
// before the command is delivered.
void inc_seqnum ();
// This function is used by the polling mechanism to determine // This function is used by the polling mechanism to determine
// whether the socket belongs to the application thread the poll // whether the socket belongs to the application thread the poll
// is called from. // is called from.
...@@ -108,8 +114,8 @@ namespace zmq ...@@ -108,8 +114,8 @@ namespace zmq
// Handlers for incoming commands. // Handlers for incoming commands.
void process_own (class owned_t *object_); void process_own (class owned_t *object_);
void process_bind (class owned_t *session_, void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_,
class reader_t *in_pipe_, class writer_t *out_pipe_); bool adjust_seqnum_);
void process_term_req (class owned_t *object_); void process_term_req (class owned_t *object_);
void process_term_ack (); void process_term_ack ();
...@@ -132,6 +138,12 @@ namespace zmq ...@@ -132,6 +138,12 @@ namespace zmq
// started. // started.
bool shutting_down; bool shutting_down;
// Sequence number of the last command sent to this object.
atomic_counter_t sent_seqnum;
// Sequence number of the last command processed by this object.
uint64_t processed_seqnum;
// List of existing sessions. This list is never referenced from within // List of existing sessions. This list is never referenced from within
// the socket, instead it is used by I/O objects owned by the session. // the socket, instead it is used by I/O objects owned by the session.
// As those objects can live in different threads, the access is // As those objects can live in different threads, the access is
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_SUB_INCLUDED__ #ifndef __ZMQ_SUB_HPP_INCLUDED__
#define __ZMQ_SUB_INCLUDED__ #define __ZMQ_SUB_HPP_INCLUDED__
#include <set> #include <set>
#include <string> #include <string>
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../bindings/c/zmq.h"
#include "upstream.hpp"
#include "err.hpp"
#include "pipe.hpp"
zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
socket_base_t (parent_),
active (0),
current (0)
{
options.requires_in = true;
options.requires_out = false;
}
zmq::upstream_t::~upstream_t ()
{
}
void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (inpipe_ && !outpipe_);
pipes.push_back (inpipe_);
pipes.swap (active, pipes.size () - 1);
active++;
}
void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_)
{
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
zmq_assert (pipe_);
pipes_t::size_type index = pipes.index (pipe_);
if (index < active)
active--;
pipes.erase (index);
}
void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
{
// There are no outpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
void zmq::upstream_t::xkill (class reader_t *pipe_)
{
// Move the pipe to the list of inactive pipes.
active--;
pipes.swap (pipes.index (pipe_), active);
}
void zmq::upstream_t::xrevive (class reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
active++;
}
int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
// No special options for this socket type.
errno = EINVAL;
return -1;
}
int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
int zmq::upstream_t::xflush ()
{
errno = ENOTSUP;
return -1;
}
int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
zmq_msg_close (msg_);
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = pipes [current]->read (msg_);
current++;
if (current >= active)
current = 0;
if (fetched)
return 0;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
}
bool zmq::upstream_t::xhas_in ()
{
// Note that messing with current doesn't break the fairness of fair
// queueing algorithm. If there are no messages available current will
// get back to its original value. Otherwise it'll point to the first
// pipe holding messages, skipping only pipes with no messages available.
for (int count = active; count != 0; count--) {
if (pipes [current]->check_read ())
return true;
current++;
if (current >= active)
current = 0;
}
return false;
}
bool zmq::upstream_t::xhas_out ()
{
return false;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__
#define __ZMQ_UPSTREAM_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
namespace zmq
{
class upstream_t : public socket_base_t
{
public:
upstream_t (class app_thread_t *parent_);
~upstream_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
// Inbound pipes.
typedef yarray_t <class reader_t> pipes_t;
pipes_t pipes;
// Number of active pipes. All the active pipes are located at the
// beginning of the pipes array.
pipes_t::size_type active;
// Index of the next bound pipe to read a message from.
pipes_t::size_type current;
upstream_t (const upstream_t&);
void operator = (const upstream_t&);
};
}
#endif
...@@ -198,8 +198,10 @@ size_t zmq_msg_size (zmq_msg_t *msg_) ...@@ -198,8 +198,10 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
void *zmq_init (int app_threads_, int io_threads_, int flags_) void *zmq_init (int app_threads_, int io_threads_, int flags_)
{ {
// There should be at least a single thread managed by the dispatcher. // There should be at least a single application thread managed
if (app_threads_ <= 0 || io_threads_ <= 0 || // by the dispatcher. There's no need for I/O threads if 0MQ is used
// only for inproc messaging
if (app_threads_ < 1 || io_threads_ < 0 ||
app_threads_ > 63 || io_threads_ > 63) { app_threads_ > 63 || io_threads_ > 63) {
errno = EINVAL; errno = EINVAL;
return NULL; return NULL;
......
...@@ -51,6 +51,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () ...@@ -51,6 +51,9 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
else { else {
// TODO: Handle over-sized message decently. // TODO: Handle over-sized message decently.
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, *tmpbuf); int rc = zmq_msg_init_size (&in_progress, *tmpbuf);
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -67,6 +70,9 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () ...@@ -67,6 +70,9 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
size_t size = (size_t) get_uint64 (tmpbuf); size_t size = (size_t) get_uint64 (tmpbuf);
// TODO: Handle over-sized message decently. // TODO: Handle over-sized message decently.
// in_progress is initialised at this point so in theory we should
// close it before calling zmq_msg_init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, size); int rc = zmq_msg_init_size (&in_progress, size);
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -78,7 +84,7 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () ...@@ -78,7 +84,7 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
bool zmq::zmq_decoder_t::message_ready () bool zmq::zmq_decoder_t::message_ready ()
{ {
// Message is completely read. Push it further and start reading // Message is completely read. Push it further and start reading
// new message. // new message. (in_progress is a 0-byte message after this point.)
if (!destination || !destination->write (&in_progress)) if (!destination || !destination->write (&in_progress))
return false; return false;
......
...@@ -50,12 +50,17 @@ bool zmq::zmq_encoder_t::size_ready () ...@@ -50,12 +50,17 @@ bool zmq::zmq_encoder_t::size_ready ()
bool zmq::zmq_encoder_t::message_ready () bool zmq::zmq_encoder_t::message_ready ()
{ {
// Destroy content of the old message.
zmq_msg_close(&in_progress);
// Read new message from the dispatcher. If there is none, return false. // Read new message from the dispatcher. If there is none, return false.
// Note that new state is set only if write is successful. That way // Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine // unsuccessful write will cause retry on the next state machine
// invocation. // invocation.
if (!source || !source->read (&in_progress)) if (!source || !source->read (&in_progress)) {
zmq_msg_init (&in_progress);
return false; return false;
}
size_t size = zmq_msg_size (&in_progress); size_t size = zmq_msg_size (&in_progress);
......
...@@ -55,7 +55,6 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_) ...@@ -55,7 +55,6 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
has_peer_identity = true; has_peer_identity = true;
peer_identity.assign ((const char*) zmq_msg_data (msg_), peer_identity.assign ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_)); zmq_msg_size (msg_));
return true; return true;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment