Commit d0461752 authored by Lourens Naudé's avatar Lourens Naudé

Merge branch 'master' into events

parents c38aecdc 70b067ec
...@@ -39,6 +39,7 @@ tests/test_invalid_rep ...@@ -39,6 +39,7 @@ tests/test_invalid_rep
tests/test_msg_flags tests/test_msg_flags
tests/test_ts_context tests/test_ts_context
tests/test_connect_resolve tests/test_connect_resolve
tests/test_term_endpoint
src/platform.hpp* src/platform.hpp*
src/stamp-h1 src/stamp-h1
perf/local_lat perf/local_lat
......
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
Name="VCCLCompilerTool" Name="VCCLCompilerTool"
AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1024 -D_CRT_SECURE_NO_WARNINGS" AdditionalOptions="-DDLL_EXPORT -DFD_SETSIZE=1024 -D_CRT_SECURE_NO_WARNINGS"
Optimization="0" Optimization="0"
PreprocessorDefinitions="NOMINMAX"
MinimalRebuild="true" MinimalRebuild="true"
BasicRuntimeChecks="3" BasicRuntimeChecks="3"
RuntimeLibrary="3" RuntimeLibrary="3"
...@@ -258,10 +259,18 @@ ...@@ -258,10 +259,18 @@
RelativePath="..\..\..\src\ctx.cpp" RelativePath="..\..\..\src\ctx.cpp"
> >
</File> </File>
<File
RelativePath="..\..\..\src\dealer.cpp"
>
</File>
<File <File
RelativePath="..\..\..\src\decoder.cpp" RelativePath="..\..\..\src\decoder.cpp"
> >
</File> </File>
<File
RelativePath="..\..\..\src\device.cpp"
>
</File>
<File <File
RelativePath="..\..\..\src\devpoll.cpp" RelativePath="..\..\..\src\devpoll.cpp"
> >
...@@ -402,6 +411,10 @@ ...@@ -402,6 +411,10 @@
RelativePath="..\..\..\src\req.cpp" RelativePath="..\..\..\src\req.cpp"
> >
</File> </File>
<File
RelativePath="..\..\..\src\router.cpp"
>
</File>
<File <File
RelativePath="..\..\..\src\select.cpp" RelativePath="..\..\..\src\select.cpp"
> >
...@@ -450,14 +463,6 @@ ...@@ -450,14 +463,6 @@
RelativePath="..\..\..\src\xpub.cpp" RelativePath="..\..\..\src\xpub.cpp"
> >
</File> </File>
<File
RelativePath="..\..\..\src\xrep.cpp"
>
</File>
<File
RelativePath="..\..\..\src\xreq.cpp"
>
</File>
<File <File
RelativePath="..\..\..\src\xsub.cpp" RelativePath="..\..\..\src\xsub.cpp"
> >
......
...@@ -106,7 +106,9 @@ ...@@ -106,7 +106,9 @@
<ClCompile Include="..\..\..\src\address.cpp" /> <ClCompile Include="..\..\..\src\address.cpp" />
<ClCompile Include="..\..\..\src\clock.cpp" /> <ClCompile Include="..\..\..\src\clock.cpp" />
<ClCompile Include="..\..\..\src\ctx.cpp" /> <ClCompile Include="..\..\..\src\ctx.cpp" />
<ClCompile Include="..\..\..\src\dealer.cpp" />
<ClCompile Include="..\..\..\src\decoder.cpp" /> <ClCompile Include="..\..\..\src\decoder.cpp" />
<ClCompile Include="..\..\..\src\device.cpp" />
<ClCompile Include="..\..\..\src\devpoll.cpp" /> <ClCompile Include="..\..\..\src\devpoll.cpp" />
<ClCompile Include="..\..\..\src\dist.cpp" /> <ClCompile Include="..\..\..\src\dist.cpp" />
<ClCompile Include="..\..\..\src\encoder.cpp" /> <ClCompile Include="..\..\..\src\encoder.cpp" />
...@@ -144,6 +146,7 @@ ...@@ -144,6 +146,7 @@
<ClCompile Include="..\..\..\src\reaper.cpp" /> <ClCompile Include="..\..\..\src\reaper.cpp" />
<ClCompile Include="..\..\..\src\rep.cpp" /> <ClCompile Include="..\..\..\src\rep.cpp" />
<ClCompile Include="..\..\..\src\req.cpp" /> <ClCompile Include="..\..\..\src\req.cpp" />
<ClCompile Include="..\..\..\src\router.cpp" />
<ClCompile Include="..\..\..\src\select.cpp" /> <ClCompile Include="..\..\..\src\select.cpp" />
<ClCompile Include="..\..\..\src\session_base.cpp" /> <ClCompile Include="..\..\..\src\session_base.cpp" />
<ClCompile Include="..\..\..\src\signaler.cpp" /> <ClCompile Include="..\..\..\src\signaler.cpp" />
...@@ -156,8 +159,6 @@ ...@@ -156,8 +159,6 @@
<ClCompile Include="..\..\..\src\thread.cpp" /> <ClCompile Include="..\..\..\src\thread.cpp" />
<ClCompile Include="..\..\..\src\trie.cpp" /> <ClCompile Include="..\..\..\src\trie.cpp" />
<ClCompile Include="..\..\..\src\xpub.cpp" /> <ClCompile Include="..\..\..\src\xpub.cpp" />
<ClCompile Include="..\..\..\src\xrep.cpp" />
<ClCompile Include="..\..\..\src\xreq.cpp" />
<ClCompile Include="..\..\..\src\xsub.cpp" /> <ClCompile Include="..\..\..\src\xsub.cpp" />
<ClCompile Include="..\..\..\src\zmq.cpp" /> <ClCompile Include="..\..\..\src\zmq.cpp" />
<ClCompile Include="..\..\..\src\zmq_utils.cpp" /> <ClCompile Include="..\..\..\src\zmq_utils.cpp" />
......
zmq_disconnect(3)
==============
NAME
----
zmq_disconnect - Disconnect a socket
SYNOPSIS
--------
int zmq_disconnect (void '*socket', const char '*endpoint');
DESCRIPTION
-----------
The _zmq_disconnect()_ function shall disconnect a socket specified
by the 'socket' argument from the endpoint specified by the 'endpoint'
argument.
The 'endpoint' argument is as described in linkzmq:zmq_connect[3]
RETURN VALUE
------------
The _zmq_disconnect()_ function shall return zero if successful. Otherwise it
shall return `-1` and set 'errno' to one of the values defined below.
ERRORS
------
*EINVAL*::
The endpoint supplied is invalid.
*ETERM*::
The 0MQ 'context' associated with the specified 'socket' was terminated.
*ENOTSOCK*::
The provided 'socket' was invalid.
EXAMPLE
-------
.Connecting a subscriber socket to an in-process and a TCP transport
----
/* Create a ZMQ_SUB socket */
void *socket = zmq_socket (context, ZMQ_SUB);
assert (socket);
/* Connect it to the host server001, port 5555 using a TCP transport */
rc = zmq_connect (socket, "tcp://server001:5555");
assert (rc == 0);
/* Disconnect from the previously connected endpoint */
rc = zmq_disconnect (socket, "tcp://server001:5555");
assert (rc == 0);
----
SEE ALSO
--------
linkzmq:zmq_connect[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHORS
-------
This 0MQ manual page was written by Martin Sustrik <sustrik@250bpm.com>,
Martin Lucina <mato@kotelna.sk> and Ian Barber <ian.barber@gmail.com>
zmq_unbind(3)
==============
NAME
----
zmq_unbind - Stop accepting connections on a socket
SYNOPSIS
--------
int zmq_unbind (void '*socket', const char '*endpoint');
DESCRIPTION
-----------
The _zmq_unbind()_ function shall unbind a socket specified
by the 'socket' argument from the endpoint specified by the 'endpoint'
argument.
The 'endpoint' argument is as described in linkzmq:zmq_bind[3]
RETURN VALUE
------------
The _zmq_unbind()_ function shall return zero if successful. Otherwise it
shall return `-1` and set 'errno' to one of the values defined below.
ERRORS
------
*EINVAL*::
The endpoint supplied is invalid.
*ETERM*::
The 0MQ 'context' associated with the specified 'socket' was terminated.
*ENOTSOCK*::
The provided 'socket' was invalid.
EXAMPLE
-------
.Unbind a subscriber socket from a TCP transport
----
/* Create a ZMQ_SUB socket */
void *socket = zmq_socket (context, ZMQ_SUB);
assert (socket);
/* Connect it to the host server001, port 5555 using a TCP transport */
rc = zmq_bind (socket, "tcp://127.0.0.1:5555");
assert (rc == 0);
/* Disconnect from the previously connected endpoint */
rc = zmq_unbind (socket, "tcp://127.0.0.1:5555");
assert (rc == 0);
----
SEE ALSO
--------
linkzmq:zmq_bind[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHORS
-------
This 0MQ manual page was written by Martin Sustrik <sustrik@250bpm.com>,
Martin Lucina <mato@kotelna.sk> and Ian Barber <ian.barber@gmail.com>
...@@ -51,7 +51,7 @@ bool zmq::encoder_t::size_ready () ...@@ -51,7 +51,7 @@ bool zmq::encoder_t::size_ready ()
{ {
// Write message body into the buffer. // Write message body into the buffer.
next_step (in_progress.data (), in_progress.size (), next_step (in_progress.data (), in_progress.size (),
&encoder_t::message_ready, false); &encoder_t::message_ready, !(in_progress.flags () & msg_t::more));
return true; return true;
} }
...@@ -90,15 +90,13 @@ bool zmq::encoder_t::message_ready () ...@@ -90,15 +90,13 @@ bool zmq::encoder_t::message_ready ()
if (size < 255) { if (size < 255) {
tmpbuf [0] = (unsigned char) size; tmpbuf [0] = (unsigned char) size;
tmpbuf [1] = (in_progress.flags () & msg_t::more); tmpbuf [1] = (in_progress.flags () & msg_t::more);
next_step (tmpbuf, 2, &encoder_t::size_ready, next_step (tmpbuf, 2, &encoder_t::size_ready, false);
!(in_progress.flags () & msg_t::more));
} }
else { else {
tmpbuf [0] = 0xff; tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size); put_uint64 (tmpbuf + 1, size);
tmpbuf [9] = (in_progress.flags () & msg_t::more); tmpbuf [9] = (in_progress.flags () & msg_t::more);
next_step (tmpbuf, 10, &encoder_t::size_ready, next_step (tmpbuf, 10, &encoder_t::size_ready, false);
!(in_progress.flags () & msg_t::more));
} }
return true; return true;
} }
...@@ -69,29 +69,24 @@ namespace zmq ...@@ -69,29 +69,24 @@ namespace zmq
unsigned char *buffer = !*data_ ? buf : *data_; unsigned char *buffer = !*data_ ? buf : *data_;
size_t buffersize = !*data_ ? bufsize : *size_; size_t buffersize = !*data_ ? bufsize : *size_;
size_t pos = 0;
if (offset_) if (offset_)
*offset_ = -1; *offset_ = -1;
while (true) { size_t pos = 0;
while (pos < buffersize) {
// If there are no more data to return, run the state machine. // If there are no more data to return, run the state machine.
// If there are still no data, return what we already have // If there are still no data, return what we already have
// in the buffer. // in the buffer.
if (!to_write) { if (!to_write) {
if (!(static_cast <T*> (this)->*next) ()) { // If we are to encode the beginning of a new message,
*data_ = buffer; // adjust the message offset.
*size_ = pos; if (beginning)
return;
}
// If beginning of the message was processed, adjust the
// first-message-offset.
if (beginning) {
if (offset_ && *offset_ == -1) if (offset_ && *offset_ == -1)
*offset_ = (int) pos; *offset_ = static_cast <int> (pos);
beginning = false;
} if (!(static_cast <T*> (this)->*next) ())
break;
} }
// If there are no data in the buffer yet and we are able to // If there are no data in the buffer yet and we are able to
...@@ -118,12 +113,10 @@ namespace zmq ...@@ -118,12 +113,10 @@ namespace zmq
pos += to_copy; pos += to_copy;
write_pos += to_copy; write_pos += to_copy;
to_write -= to_copy; to_write -= to_copy;
if (pos == buffersize) { }
*data_ = buffer; *data_ = buffer;
*size_ = pos; *size_ = pos;
return;
}
}
} }
protected: protected:
......
...@@ -57,6 +57,12 @@ zmq::fd_t zmq::open_socket (int domain_, int type_, int protocol_) ...@@ -57,6 +57,12 @@ zmq::fd_t zmq::open_socket (int domain_, int type_, int protocol_)
errno_assert (rc != -1); errno_assert (rc != -1);
#endif #endif
// On Windows, preventing sockets to be inherited by child processes.
#if defined ZMQ_HAVE_WINDOWS && defined HANDLE_FLAG_INHERIT
BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
win_assert (brc);
#endif
return s; return s;
} }
......
...@@ -288,6 +288,10 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -288,6 +288,10 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
*w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
wsa_assert (*w_ != INVALID_SOCKET); wsa_assert (*w_ != INVALID_SOCKET);
// On Windows, preventing sockets to be inherited by child processes.
BOOL brc = SetHandleInformation ((HANDLE) *w_, HANDLE_FLAG_INHERIT, 0);
win_assert (brc);
// Set TCP_NODELAY on writer socket. // Set TCP_NODELAY on writer socket.
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
(char *)&tcp_nodelay, sizeof (tcp_nodelay)); (char *)&tcp_nodelay, sizeof (tcp_nodelay));
...@@ -301,12 +305,16 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -301,12 +305,16 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
*r_ = accept (listener, NULL, NULL); *r_ = accept (listener, NULL, NULL);
wsa_assert (*r_ != INVALID_SOCKET); wsa_assert (*r_ != INVALID_SOCKET);
// On Windows, preventing sockets to be inherited by child processes.
brc = SetHandleInformation ((HANDLE) *r_, HANDLE_FLAG_INHERIT, 0);
win_assert (brc);
// We don't need the listening socket anymore. Close it. // We don't need the listening socket anymore. Close it.
rc = closesocket (listener); rc = closesocket (listener);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Exit the critical section. // Exit the critical section.
BOOL brc = SetEvent (sync); brc = SetEvent (sync);
win_assert (brc != 0); win_assert (brc != 0);
return 0; return 0;
......
...@@ -173,6 +173,9 @@ int zmq::tcp_listener_t::set_address (const char *addr_) ...@@ -173,6 +173,9 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
wsa_error_to_errno (); wsa_error_to_errno ();
return -1; return -1;
} }
// On Windows, preventing sockets to be inherited by child processes.
BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
win_assert (brc);
#else #else
if (s == -1) if (s == -1)
return -1; return -1;
...@@ -239,6 +242,9 @@ zmq::fd_t zmq::tcp_listener_t::accept () ...@@ -239,6 +242,9 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
WSAGetLastError () == WSAECONNRESET); WSAGetLastError () == WSAECONNRESET);
return retired_fd; return retired_fd;
} }
// On Windows, preventing sockets to be inherited by child processes.
BOOL brc = SetHandleInformation ((HANDLE) sock, HANDLE_FLAG_INHERIT, 0);
win_assert (brc);
#else #else
if (sock == -1) { if (sock == -1) {
errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment