Commit b4f5ee58 authored by Martin Lucina's avatar Martin Lucina

Merge branch 'master' of github.com:zeromq/libzmq

parents 90d0435b fc17bd41
syntax: glob # for hg-git users
Makefile Makefile
Makefile.in Makefile.in
configure configure
......
[patterns]
** = native
...@@ -59,11 +59,13 @@ Pavel Gushcha <pavimus@gmail.com> ...@@ -59,11 +59,13 @@ Pavel Gushcha <pavimus@gmail.com>
Pavol Malosek <malosek@fastmq.com> Pavol Malosek <malosek@fastmq.com>
Perry Kundert <perry@kundert.ca> Perry Kundert <perry@kundert.ca>
Peter Bourgon <peter.bourgon@gmail.com> Peter Bourgon <peter.bourgon@gmail.com>
Philip Kovacs <phil@philkovacs.com>
Pieter Hintjens <ph@imatix.com> Pieter Hintjens <ph@imatix.com>
Piotr Trojanek <piotr.trojanek@gmail.com> Piotr Trojanek <piotr.trojanek@gmail.com>
Robert G. Jakabosky <bobby@sharedrealm.com> Robert G. Jakabosky <bobby@sharedrealm.com>
Sebastian Otaegui <feniix@gmail.com> Sebastian Otaegui <feniix@gmail.com>
Steven McCoy <steven.mccoy@miru.hk> Steven McCoy <steven.mccoy@miru.hk>
Stuart Webster <sw_webster@hotmail.com>
Tamara Kustarova <kustarova.tamara@gmail.com> Tamara Kustarova <kustarova.tamara@gmail.com>
Taras Shpot <taras.shpot@eleks.com> Taras Shpot <taras.shpot@eleks.com>
Tero Marttila <terom@fixme.fi> Tero Marttila <terom@fixme.fi>
......
...@@ -29,11 +29,11 @@ if BUILD_DOC ...@@ -29,11 +29,11 @@ if BUILD_DOC
SUFFIXES=.html .txt .xml .3 .7 SUFFIXES=.html .txt .xml .3 .7
.txt.html: .txt.html:
asciidoc -d manpage -b xhtml11 -f asciidoc.conf \ asciidoc -d manpage -b xhtml11 -f $(srcdir)/asciidoc.conf \
-azmq_version=@PACKAGE_VERSION@ $< -azmq_version=@PACKAGE_VERSION@ -o$@ $<
.txt.xml: .txt.xml:
asciidoc -d manpage -b docbook -f asciidoc.conf \ asciidoc -d manpage -b docbook -f $(srcdir)/asciidoc.conf \
-azmq_version=@PACKAGE_VERSION@ $< -azmq_version=@PACKAGE_VERSION@ -o$@ $<
.xml.1: .xml.1:
xmlto man $< xmlto man $<
.xml.3: .xml.3:
......
...@@ -331,7 +331,7 @@ ZMQ_IPV4ONLY: Retrieve IPv4-only socket override status ...@@ -331,7 +331,7 @@ ZMQ_IPV4ONLY: Retrieve IPv4-only socket override status
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieve the underlying native socket type. A value of `1` will use IPv4 Retrieve the underlying native socket type. A value of `1` will use IPv4
sockets, while the default value of `0` will use IPv6 sockets. An IPv6 socket sockets, while the value of `0` will use IPv6 sockets. An IPv6 socket
lets applications connect to and accept connections from both IPv4 and IPv6 lets applications connect to and accept connections from both IPv4 and IPv6
hosts. hosts.
......
...@@ -338,7 +338,7 @@ ZMQ_IPV4ONLY: Use IPv4-only sockets ...@@ -338,7 +338,7 @@ ZMQ_IPV4ONLY: Use IPv4-only sockets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the underlying native socket type. A value of `1` will use IPv4 sockets, Sets the underlying native socket type. A value of `1` will use IPv4 sockets,
while the default value of `0` will use IPv6 sockets. An IPv6 socket lets while the value of `0` will use IPv6 sockets. An IPv6 socket lets
applications connect to and accept connections from both IPv4 and IPv6 hosts. applications connect to and accept connections from both IPv4 and IPv6 hosts.
[horizontal] [horizontal]
......
INCLUDES = -I$(top_builddir)/include INCLUDES = -I$(top_builddir)/include \
-I$(top_srcdir)/include
noinst_PROGRAMS = local_lat remote_lat local_thr remote_thr inproc_lat inproc_thr noinst_PROGRAMS = local_lat remote_lat local_thr remote_thr inproc_lat inproc_thr
......
...@@ -238,7 +238,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -238,7 +238,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// two instances of the library don't accidentally create signaler // two instances of the library don't accidentally create signaler
// crossing the process boundary. // crossing the process boundary.
// We'll use named event object to implement the critical section. // We'll use named event object to implement the critical section.
HANDLE sync = CreateEvent (NULL, FALSE, FALSE, "zmq-signaler-port-sync"); HANDLE sync = CreateEvent (NULL, FALSE, TRUE, "zmq-signaler-port-sync");
win_assert (sync != NULL); win_assert (sync != NULL);
// Enter the critical section. // Enter the critical section.
......
...@@ -360,7 +360,7 @@ int zmq::stream_engine_t::read (void *data_, size_t size_) ...@@ -360,7 +360,7 @@ int zmq::stream_engine_t::read (void *data_, size_t size_)
// Signalise peer failure. // Signalise peer failure.
if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED || if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED ||
errno == ETIMEDOUT || errno == EHOSTUNREACH)) errno == ETIMEDOUT || errno == EHOSTUNREACH || errno == ENOTCONN))
return -1; return -1;
errno_assert (nbytes != -1); errno_assert (nbytes != -1);
......
...@@ -110,7 +110,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_) ...@@ -110,7 +110,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
return 0; return 0;
} }
#elif defined ZMQ_HAVE_AIX || ZMQ_HAVE_HPUX || ZMQ_HAVE_ANDROID #elif defined ZMQ_HAVE_AIX || defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_ANDROID
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
......
...@@ -189,14 +189,18 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) ...@@ -189,14 +189,18 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
return 0; return 0;
} }
// Get next message part. pipe_t *pipe = NULL;
pipe_t *pipe; while (true) {
int rc = fq.recvpipe (msg_, flags_, &pipe);
if (rc != 0) // Get next message part.
return -1; int rc = fq.recvpipe (msg_, flags_, &pipe);
if (rc != 0)
// If identity is received, change the key assigned to the pipe. return -1;
if (unlikely (msg_->flags () & msg_t::identity)) {
// If identity is received, change the key assigned to the pipe.
if (likely (!(msg_->flags () & msg_t::identity)))
break;
zmq_assert (!more_in); zmq_assert (!more_in);
// Empty identity means we can preserve the auto-generated identity. // Empty identity means we can preserve the auto-generated identity.
...@@ -219,11 +223,6 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) ...@@ -219,11 +223,6 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
} }
zmq_assert (it != outpipes.end ()); zmq_assert (it != outpipes.end ());
} }
// After processing the identity, try to get the next message.
rc = fq.recvpipe (msg_, flags_, &pipe);
if (rc != 0)
return -1;
} }
// If we are in the middle of reading a message, just return the next part. // If we are in the middle of reading a message, just return the next part.
...@@ -234,7 +233,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) ...@@ -234,7 +233,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// We are at the beginning of a new message. Move the message part we // We are at the beginning of a new message. Move the message part we
// have to the prefetched and return the ID of the peer instead. // have to the prefetched and return the ID of the peer instead.
rc = prefetched_msg.move (*msg_); int rc = prefetched_msg.move (*msg_);
errno_assert (rc == 0); errno_assert (rc == 0);
prefetched = true; prefetched = true;
rc = msg_->close (); rc = msg_->close ();
...@@ -260,9 +259,17 @@ int zmq::xrep_t::rollback (void) ...@@ -260,9 +259,17 @@ int zmq::xrep_t::rollback (void)
bool zmq::xrep_t::xhas_in () bool zmq::xrep_t::xhas_in ()
{ {
// We may already have a message pre-fetched.
if (prefetched) if (prefetched)
return true; return true;
return fq.has_in ();
// Try to read the next message to the pre-fetch buffer.
int rc = xrep_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT);
if (rc != 0 && errno == EAGAIN)
return false;
zmq_assert (rc == 0);
prefetched = true;
return true;
} }
bool zmq::xrep_t::xhas_out () bool zmq::xrep_t::xhas_out ()
......
...@@ -24,7 +24,8 @@ ...@@ -24,7 +24,8 @@
#include "msg.hpp" #include "msg.hpp"
zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_) socket_base_t (parent_, tid_),
prefetched (false)
{ {
options.type = ZMQ_XREQ; options.type = ZMQ_XREQ;
...@@ -36,10 +37,13 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : ...@@ -36,10 +37,13 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
options.send_identity = true; options.send_identity = true;
options.recv_identity = true; options.recv_identity = true;
prefetched_msg.init ();
} }
zmq::xreq_t::~xreq_t () zmq::xreq_t::~xreq_t ()
{ {
prefetched_msg.close ();
} }
void zmq::xreq_t::xattach_pipe (pipe_t *pipe_) void zmq::xreq_t::xattach_pipe (pipe_t *pipe_)
...@@ -56,6 +60,14 @@ int zmq::xreq_t::xsend (msg_t *msg_, int flags_) ...@@ -56,6 +60,14 @@ int zmq::xreq_t::xsend (msg_t *msg_, int flags_)
int zmq::xreq_t::xrecv (msg_t *msg_, int flags_) int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)
{ {
// If there is a prefetched message, return it.
if (prefetched) {
int rc = msg_->move (prefetched_msg);
errno_assert (rc == 0);
prefetched = false;
return 0;
}
// XREQ socket doesn't use identities. We can safely drop it and // XREQ socket doesn't use identities. We can safely drop it and
while (true) { while (true) {
int rc = fq.recv (msg_, flags_); int rc = fq.recv (msg_, flags_);
...@@ -69,7 +81,17 @@ int zmq::xreq_t::xrecv (msg_t *msg_, int flags_) ...@@ -69,7 +81,17 @@ int zmq::xreq_t::xrecv (msg_t *msg_, int flags_)
bool zmq::xreq_t::xhas_in () bool zmq::xreq_t::xhas_in ()
{ {
return fq.has_in (); // We may already have a message pre-fetched.
if (prefetched)
return true;
// Try to read the next message to the pre-fetch buffer.
int rc = xreq_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT);
if (rc != 0 && errno == EAGAIN)
return false;
zmq_assert (rc == 0);
prefetched = true;
return true;
} }
bool zmq::xreq_t::xhas_out () bool zmq::xreq_t::xhas_out ()
......
...@@ -62,6 +62,12 @@ namespace zmq ...@@ -62,6 +62,12 @@ namespace zmq
fq_t fq; fq_t fq;
lb_t lb; lb_t lb;
// Have we prefetched a message.
bool prefetched;
// Holds the prefetched message.
msg_t prefetched_msg;
xreq_t (const xreq_t&); xreq_t (const xreq_t&);
const xreq_t &operator = (const xreq_t&); const xreq_t &operator = (const xreq_t&);
}; };
......
INCLUDES = -I$(top_builddir)/include INCLUDES = -I$(top_builddir)/include \
-I$(top_srcdir)/include
LDADD = $(top_builddir)/src/libzmq.la LDADD = $(top_builddir)/src/libzmq.la
noinst_PROGRAMS = test_pair_inproc \ noinst_PROGRAMS = test_pair_inproc \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment