Commit 235a1dbf authored by Michael's avatar Michael

Merge pull request #2 from zeromq/master

update to current master
parents a53404f2 43a1192c
...@@ -74,6 +74,7 @@ test_linger ...@@ -74,6 +74,7 @@ test_linger
test_security_null test_security_null
test_security_plain test_security_plain
test_proxy test_proxy
test_proxy_terminate
test_abstract_ipc test_abstract_ipc
test_filter_ipc test_filter_ipc
test_connect_delay_tipc test_connect_delay_tipc
......
...@@ -86,6 +86,7 @@ Philip Kovacs ...@@ -86,6 +86,7 @@ Philip Kovacs
Pieter Hintjens Pieter Hintjens
Piotr Trojanek Piotr Trojanek
Richard Newton Richard Newton
Rik van der Heijden
Robert G. Jakabosky Robert G. Jakabosky
Sebastian Otaegui Sebastian Otaegui
Stefan Radomski Stefan Radomski
......
...@@ -340,6 +340,7 @@ test_apps = \ ...@@ -340,6 +340,7 @@ test_apps = \
tests/test_inproc_connect \ tests/test_inproc_connect \
tests/test_issue_566 \ tests/test_issue_566 \
tests/test_proxy \ tests/test_proxy \
tests/test_proxy_terminate \
tests/test_many_sockets \ tests/test_many_sockets \
tests/test_ipc_wildcard \ tests/test_ipc_wildcard \
tests/test_diffserv \ tests/test_diffserv \
...@@ -493,6 +494,9 @@ tests_test_issue_566_LDADD = src/libzmq.la ...@@ -493,6 +494,9 @@ tests_test_issue_566_LDADD = src/libzmq.la
tests_test_proxy_SOURCES = tests/test_proxy.cpp tests_test_proxy_SOURCES = tests/test_proxy.cpp
tests_test_proxy_LDADD = src/libzmq.la tests_test_proxy_LDADD = src/libzmq.la
tests_test_proxy_terminate_SOURCES = tests/test_proxy_terminate.cpp
tests_test_proxy_terminate_LDADD = src/libzmq.la
tests_test_many_sockets_SOURCES = tests/test_many_sockets.cpp tests_test_many_sockets_SOURCES = tests/test_many_sockets.cpp
tests_test_many_sockets_LDADD = src/libzmq.la tests_test_many_sockets_LDADD = src/libzmq.la
......
...@@ -86,14 +86,4 @@ ...@@ -86,14 +86,4 @@
#cmakedefine ZMQ_HAVE_WINDOWS #cmakedefine ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_WINDOWS
#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600
#undef _WIN32_WINNT
#endif
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#endif
#endif #endif
\ No newline at end of file
...@@ -29,13 +29,4 @@ ...@@ -29,13 +29,4 @@
#define ZMQ_HAVE_WINDOWS #define ZMQ_HAVE_WINDOWS
#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600
#undef _WIN32_WINNT
#endif
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#endif #endif
...@@ -29,12 +29,4 @@ ...@@ -29,12 +29,4 @@
#define ZMQ_HAVE_WINDOWS #define ZMQ_HAVE_WINDOWS
#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600
#undef _WIN32_WINNT
#endif
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#endif #endif
...@@ -26,15 +26,11 @@ if [[ $ANDROID_BUILD_CLEAN ]]; then ...@@ -26,15 +26,11 @@ if [[ $ANDROID_BUILD_CLEAN ]]; then
fi fi
## ##
# Build libsodium from latest release tarball # Build libsodium from latest master branch
(android_build_verify_so "libsodium.so" &> /dev/null) || { (android_build_verify_so "libsodium.so" &> /dev/null) || {
rm -rf "${cache}/libsodium" rm -rf "${cache}/libsodium"
(cd "${cache}" && mkdir libsodium \ (cd "${cache}" && git clone git://github.com/jedisct1/libsodium.git) || exit 1
&& wget https://download.libsodium.org/libsodium/releases/LATEST.tar.gz\
-O "${cache}/libsodium.tar.gz" \
&& tar -C libsodium -xf libsodium.tar.gz --strip=1) || exit 1
(cd "${cache}/libsodium" && ./autogen.sh \ (cd "${cache}/libsodium" && ./autogen.sh \
&& ./configure "${ANDROID_BUILD_OPTS[@]}" --disable-soname-versions \ && ./configure "${ANDROID_BUILD_OPTS[@]}" --disable-soname-versions \
&& make \ && make \
......
...@@ -225,7 +225,7 @@ case "${host_os}" in ...@@ -225,7 +225,7 @@ case "${host_os}" in
AC_DEFINE(ZMQ_FORCE_MUTEXES, 1, [Force to use mutexes]) AC_DEFINE(ZMQ_FORCE_MUTEXES, 1, [Force to use mutexes])
fi fi
;; ;;
*openbsd*) *openbsd*|*bitrig*)
# Define on OpenBSD to enable all library features # Define on OpenBSD to enable all library features
CPPFLAGS="-D_BSD_SOURCE $CPPFLAGS" CPPFLAGS="-D_BSD_SOURCE $CPPFLAGS"
AC_DEFINE(ZMQ_HAVE_OPENBSD, 1, [Have OpenBSD OS]) AC_DEFINE(ZMQ_HAVE_OPENBSD, 1, [Have OpenBSD OS])
......
...@@ -31,6 +31,47 @@ ...@@ -31,6 +31,47 @@
#include "windows.hpp" #include "windows.hpp"
// Condition variable is supported from Windows Vista only, to use condition variable define _WIN32_WINNT to 0x0600
#if _WIN32_WINNT < 0x0600
namespace zmq
{
class condition_variable_t
{
public:
inline condition_variable_t ()
{
zmq_assert(false);
}
inline ~condition_variable_t ()
{
}
inline int wait (mutex_t* mutex_, int timeout_ )
{
zmq_assert(false);
return -1;
}
inline void broadcast ()
{
zmq_assert(false);
}
private:
// Disable copy construction and assignment.
condition_variable_t (const condition_variable_t&);
void operator = (const condition_variable_t&);
};
}
#else
namespace zmq namespace zmq
{ {
...@@ -79,6 +120,8 @@ namespace zmq ...@@ -79,6 +120,8 @@ namespace zmq
} }
#endif
#else #else
#include <pthread.h> #include <pthread.h>
......
...@@ -96,7 +96,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -96,7 +96,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
case ZMQ_IDENTITY: case ZMQ_IDENTITY:
// Identity is any binary string from 1 to 255 octets // Identity is any binary string from 1 to 255 octets
if (optvallen_ > 0 && optvallen_ < 256) { if (optvallen_ > 0 && optvallen_ < 256) {
identity_size = optvallen_; identity_size = (unsigned char) optvallen_;
memcpy (identity, optval_, identity_size); memcpy (identity, optval_, identity_size);
return 0; return 0;
} }
......
...@@ -265,7 +265,7 @@ int zmq::plain_server_t::produce_error (msg_t *msg_) const ...@@ -265,7 +265,7 @@ int zmq::plain_server_t::produce_error (msg_t *msg_) const
zmq_assert (rc == 0); zmq_assert (rc == 0);
char *msg_data = static_cast <char *> (msg_->data ()); char *msg_data = static_cast <char *> (msg_->data ());
memcpy (msg_data, "\5ERROR", 6); memcpy (msg_data, "\5ERROR", 6);
msg_data [6] = status_code.length (); msg_data [6] = (char) status_code.length ();
memcpy (msg_data + 7, status_code.c_str (), status_code.length ()); memcpy (msg_data + 7, status_code.c_str (), status_code.length ());
return 0; return 0;
} }
......
...@@ -159,14 +159,16 @@ int zmq::proxy ( ...@@ -159,14 +159,16 @@ int zmq::proxy (
} }
// Process a request // Process a request
if (state == active if (state == active
&& items [0].revents & ZMQ_POLLIN) { && items [0].revents & ZMQ_POLLIN
&& items [1].revents & ZMQ_POLLOUT) {
rc = forward(frontend_, backend_, capture_,msg); rc = forward(frontend_, backend_, capture_,msg);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
} }
// Process a reply // Process a reply
if (state == active if (state == active
&& items [1].revents & ZMQ_POLLIN) { && items [1].revents & ZMQ_POLLIN
&& items [0].revents & ZMQ_POLLOUT) {
rc = forward(backend_, frontend_, capture_,msg); rc = forward(backend_, frontend_, capture_,msg);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
*/ */
#include "pub.hpp" #include "pub.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp" #include "msg.hpp"
zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
...@@ -30,6 +32,17 @@ zmq::pub_t::~pub_t () ...@@ -30,6 +32,17 @@ zmq::pub_t::~pub_t ()
{ {
} }
void zmq::pub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
zmq_assert (pipe_);
// Don't delay pipe termination as there is no one
// to receive the delimiter.
pipe_->set_nodelay ();
xpub_t::xattach_pipe (pipe_, subscribe_to_all_);
}
int zmq::pub_t::xrecv (class msg_t *) int zmq::pub_t::xrecv (class msg_t *)
{ {
// Messages cannot be received from PUB socket. // Messages cannot be received from PUB socket.
......
...@@ -38,6 +38,7 @@ namespace zmq ...@@ -38,6 +38,7 @@ namespace zmq
~pub_t (); ~pub_t ();
// Implementations of virtual functions from socket_base_t. // Implementations of virtual functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false);
int xrecv (zmq::msg_t *msg_); int xrecv (zmq::msg_t *msg_);
bool xhas_in (); bool xhas_in ();
......
...@@ -36,6 +36,9 @@ void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) ...@@ -36,6 +36,9 @@ void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{ {
// subscribe_to_all_ is unused // subscribe_to_all_ is unused
(void)subscribe_to_all_; (void)subscribe_to_all_;
// Don't delay pipe termination as there is no one
// to receive the delimiter.
pipe_->set_nodelay ();
zmq_assert (pipe_); zmq_assert (pipe_);
lb.attach (pipe_); lb.attach (pipe_);
......
...@@ -132,11 +132,14 @@ zmq::signaler_t::~signaler_t () ...@@ -132,11 +132,14 @@ zmq::signaler_t::~signaler_t ()
const struct linger so_linger = { 1, 0 }; const struct linger so_linger = { 1, 0 };
int rc = setsockopt (w, SOL_SOCKET, SO_LINGER, int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
(const char *) &so_linger, sizeof so_linger); (const char *) &so_linger, sizeof so_linger);
wsa_assert (rc != SOCKET_ERROR); // Only check shutdown if WSASTARTUP was previously done
rc = closesocket (w); if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
rc = closesocket (r); rc = closesocket (w);
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
rc = closesocket (r);
wsa_assert (rc != SOCKET_ERROR);
}
#else #else
int rc = close_wait_ms (w); int rc = close_wait_ms (w);
errno_assert (rc == 0); errno_assert (rc == 0);
......
...@@ -37,12 +37,10 @@ zmq::socks_greeting_t::socks_greeting_t (uint8_t method_) : ...@@ -37,12 +37,10 @@ zmq::socks_greeting_t::socks_greeting_t (uint8_t method_) :
} }
zmq::socks_greeting_t::socks_greeting_t ( zmq::socks_greeting_t::socks_greeting_t (
uint8_t *methods_, size_t num_methods_) uint8_t *methods_, uint8_t num_methods_)
: num_methods (num_methods_) : num_methods (num_methods_)
{ {
zmq_assert (num_methods_ <= 255); for (uint8_t i = 0; i < num_methods_; i++)
for (size_t i = 0; i < num_methods_; i++)
methods [i] = methods_ [i]; methods [i] = methods_ [i];
} }
...@@ -55,8 +53,8 @@ void zmq::socks_greeting_encoder_t::encode (const socks_greeting_t &greeting_) ...@@ -55,8 +53,8 @@ void zmq::socks_greeting_encoder_t::encode (const socks_greeting_t &greeting_)
uint8_t *ptr = buf; uint8_t *ptr = buf;
*ptr++ = 0x05; *ptr++ = 0x05;
*ptr++ = greeting_.num_methods; *ptr++ = (uint8_t) greeting_.num_methods;
for (size_t i = 0; i < greeting_.num_methods; i++) for (uint8_t i = 0; i < greeting_.num_methods; i++)
*ptr++ = greeting_.methods [i]; *ptr++ = greeting_.methods [i];
bytes_encoded = 2 + greeting_.num_methods; bytes_encoded = 2 + greeting_.num_methods;
...@@ -118,10 +116,13 @@ void zmq::socks_choice_decoder_t::reset () ...@@ -118,10 +116,13 @@ void zmq::socks_choice_decoder_t::reset ()
bytes_read = 0; bytes_read = 0;
} }
zmq::socks_request_t::socks_request_t ( zmq::socks_request_t::socks_request_t (
uint8_t command_, std::string hostname_, uint16_t port_) uint8_t command_, std::string hostname_, uint16_t port_)
: command (command_), hostname (hostname_), port (port_) : command (command_), hostname (hostname_), port (port_)
{} {
zmq_assert (hostname_.size () <= UINT8_MAX);
}
zmq::socks_request_encoder_t::socks_request_encoder_t () zmq::socks_request_encoder_t::socks_request_encoder_t ()
: bytes_encoded (0), bytes_written (0) : bytes_encoded (0), bytes_written (0)
...@@ -129,6 +130,8 @@ zmq::socks_request_encoder_t::socks_request_encoder_t () ...@@ -129,6 +130,8 @@ zmq::socks_request_encoder_t::socks_request_encoder_t ()
void zmq::socks_request_encoder_t::encode (const socks_request_t &req) void zmq::socks_request_encoder_t::encode (const socks_request_t &req)
{ {
zmq_assert (req.hostname.size() <= UINT8_MAX);
unsigned char *ptr = buf; unsigned char *ptr = buf;
*ptr++ = 0x05; *ptr++ = 0x05;
*ptr++ = req.command; *ptr++ = req.command;
...@@ -163,7 +166,7 @@ void zmq::socks_request_encoder_t::encode (const socks_request_t &req) ...@@ -163,7 +166,7 @@ void zmq::socks_request_encoder_t::encode (const socks_request_t &req)
} }
else { else {
*ptr++ = 0x03; *ptr++ = 0x03;
*ptr++ = req.hostname.size (); *ptr++ = (unsigned char) req.hostname.size ();
memcpy (ptr, req.hostname.c_str (), req.hostname.size ()); memcpy (ptr, req.hostname.c_str (), req.hostname.size ());
ptr += req.hostname.size (); ptr += req.hostname.size ();
} }
......
...@@ -30,9 +30,9 @@ namespace zmq ...@@ -30,9 +30,9 @@ namespace zmq
struct socks_greeting_t struct socks_greeting_t
{ {
socks_greeting_t (uint8_t method); socks_greeting_t (uint8_t method);
socks_greeting_t (uint8_t *methods_, size_t num_methods_); socks_greeting_t (uint8_t *methods_, uint8_t num_methods_);
uint8_t methods [255]; uint8_t methods [UINT8_MAX];
const size_t num_methods; const size_t num_methods;
}; };
...@@ -48,7 +48,7 @@ namespace zmq ...@@ -48,7 +48,7 @@ namespace zmq
private: private:
size_t bytes_encoded; size_t bytes_encoded;
size_t bytes_written; size_t bytes_written;
uint8_t buf [2 + 255]; uint8_t buf [2 + UINT8_MAX];
}; };
struct socks_choice_t struct socks_choice_t
...@@ -94,7 +94,7 @@ namespace zmq ...@@ -94,7 +94,7 @@ namespace zmq
private: private:
size_t bytes_encoded; size_t bytes_encoded;
size_t bytes_written; size_t bytes_written;
uint8_t buf [4 + 256 + 2]; uint8_t buf [4 + UINT8_MAX + 1 + 2];
}; };
struct socks_response_t struct socks_response_t
...@@ -116,7 +116,7 @@ namespace zmq ...@@ -116,7 +116,7 @@ namespace zmq
void reset (); void reset ();
private: private:
uint8_t buf [4 + 256 + 2]; int8_t buf [4 + UINT8_MAX + 1 + 2];
size_t bytes_read; size_t bytes_read;
}; };
......
...@@ -148,7 +148,7 @@ void zmq::socks_connecter_t::in_event () ...@@ -148,7 +148,7 @@ void zmq::socks_connecter_t::in_event ()
// Attach the engine to the corresponding session object. // Attach the engine to the corresponding session object.
send_attach (session, engine); send_attach (session, engine);
socket->event_connected (endpoint, s); socket->event_connected (endpoint, (int) s);
rm_fd (handle); rm_fd (handle);
s = -1; s = -1;
...@@ -170,7 +170,7 @@ void zmq::socks_connecter_t::out_event () ...@@ -170,7 +170,7 @@ void zmq::socks_connecter_t::out_event ()
|| status == sending_request); || status == sending_request);
if (status == waiting_for_proxy_connection) { if (status == waiting_for_proxy_connection) {
const int rc = check_proxy_connection (); const int rc = (int) check_proxy_connection ();
if (rc == -1) if (rc == -1)
error (); error ();
else { else {
...@@ -436,7 +436,7 @@ void zmq::socks_connecter_t::close () ...@@ -436,7 +436,7 @@ void zmq::socks_connecter_t::close ()
const int rc = ::close (s); const int rc = ::close (s);
errno_assert (rc == 0); errno_assert (rc == 0);
#endif #endif
socket->event_closed (endpoint, s); socket->event_closed (endpoint, (int) s);
s = retired_fd; s = retired_fd;
} }
......
...@@ -59,4 +59,8 @@ typedef unsigned __int64 uint64_t; ...@@ -59,4 +59,8 @@ typedef unsigned __int64 uint64_t;
#endif #endif
#ifndef UINT8_MAX
#define UINT8_MAX 0xFF
#endif
#endif #endif
...@@ -289,14 +289,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) ...@@ -289,14 +289,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
connect_rid.length ()); connect_rid.length ());
connect_rid.clear (); connect_rid.clear ();
outpipes_t::iterator it = outpipes.find (identity); outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ()) zmq_assert (it == outpipes.end ());
zmq_assert(false);
} }
else { else {
put_uint32 (buffer + 1, next_rid++); put_uint32 (buffer + 1, next_rid++);
identity = blob_t (buffer, sizeof buffer); identity = blob_t (buffer, sizeof buffer);
memcpy (options.identity, identity.data (), identity.size ()); memcpy (options.identity, identity.data (), identity.size ());
options.identity_size = identity.size (); options.identity_size = (unsigned char) identity.size ();
} }
pipe_->set_identity (identity); pipe_->set_identity (identity);
// Add the record into output pipes lookup table // Add the record into output pipes lookup table
......
...@@ -928,7 +928,7 @@ void zmq::stream_engine_t::error (error_reason_t reason) ...@@ -928,7 +928,7 @@ void zmq::stream_engine_t::error (error_reason_t reason)
terminator.close(); terminator.close();
} }
zmq_assert (session); zmq_assert (session);
socket->event_disconnected (endpoint, s); socket->event_disconnected (endpoint, (int) s);
session->flush (); session->flush ();
session->engine_error (reason); session->engine_error (reason);
unplug (); unplug ();
......
...@@ -138,7 +138,7 @@ void zmq::tcp_connecter_t::out_event () ...@@ -138,7 +138,7 @@ void zmq::tcp_connecter_t::out_event ()
// Shut the connecter down. // Shut the connecter down.
terminate (); terminate ();
socket->event_connected (endpoint, fd); socket->event_connected (endpoint, (int) fd);
} }
void zmq::tcp_connecter_t::timer_event (int id_) void zmq::tcp_connecter_t::timer_event (int id_)
...@@ -352,6 +352,6 @@ void zmq::tcp_connecter_t::close () ...@@ -352,6 +352,6 @@ void zmq::tcp_connecter_t::close ()
const int rc = ::close (s); const int rc = ::close (s);
errno_assert (rc == 0); errno_assert (rc == 0);
#endif #endif
socket->event_closed (endpoint, s); socket->event_closed (endpoint, (int) s);
s = retired_fd; s = retired_fd;
} }
...@@ -111,7 +111,7 @@ void zmq::tcp_listener_t::in_event () ...@@ -111,7 +111,7 @@ void zmq::tcp_listener_t::in_event ()
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
send_attach (session, engine, false); send_attach (session, engine, false);
socket->event_accepted (endpoint, fd); socket->event_accepted (endpoint, (int) fd);
} }
void zmq::tcp_listener_t::close () void zmq::tcp_listener_t::close ()
...@@ -124,7 +124,7 @@ void zmq::tcp_listener_t::close () ...@@ -124,7 +124,7 @@ void zmq::tcp_listener_t::close ()
int rc = ::close (s); int rc = ::close (s);
errno_assert (rc == 0); errno_assert (rc == 0);
#endif #endif
socket->event_closed (endpoint, s); socket->event_closed (endpoint, (int) s);
s = retired_fd; s = retired_fd;
} }
...@@ -239,7 +239,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) ...@@ -239,7 +239,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
goto error; goto error;
#endif #endif
socket->event_listening (endpoint, s); socket->event_listening (endpoint, (int) s);
return 0; return 0;
error: error:
......
...@@ -27,9 +27,9 @@ ...@@ -27,9 +27,9 @@
#define NOMINMAX // Macros min(a,b) and max(a,b) #define NOMINMAX // Macros min(a,b) and max(a,b)
#endif #endif
// Set target version to Windows Server 2003, Windows XP/SP1 or higher. // Set target version to Windows Server 2008, Windows Vista or higher. Windows XP (0x0501) is also supported but without client & server socket types.
#ifndef _WIN32_WINNT #ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0501 #define _WIN32_WINNT 0x0600
#endif #endif
#ifdef __MINGW32__ #ifdef __MINGW32__
......
...@@ -155,7 +155,7 @@ uint8_t *zmq_z85_decode (uint8_t *dest, const char *string) ...@@ -155,7 +155,7 @@ uint8_t *zmq_z85_decode (uint8_t *dest, const char *string)
} }
unsigned int byte_nbr = 0; unsigned int byte_nbr = 0;
unsigned int char_nbr = 0; unsigned int char_nbr = 0;
unsigned int string_len = strlen (string); size_t string_len = strlen (string);
uint32_t value = 0; uint32_t value = 0;
while (char_nbr < string_len) { while (char_nbr < string_len) {
// Accumulate value in base 85 // Accumulate value in base 85
......
...@@ -55,6 +55,7 @@ if(NOT WIN32) ...@@ -55,6 +55,7 @@ if(NOT WIN32)
test_reqrep_ipc test_reqrep_ipc
test_abstract_ipc test_abstract_ipc
test_proxy test_proxy
test_proxy_terminate
test_filter_ipc test_filter_ipc
) )
if(HAVE_FORK) if(HAVE_FORK)
......
...@@ -94,11 +94,10 @@ int main(int, char**) { ...@@ -94,11 +94,10 @@ int main(int, char**) {
} }
if (iteration == 1) { if (iteration == 1) {
zmq_connect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_connect: %s\n", zmq_strerror(errno)); zmq_connect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_connect: %s\n", zmq_strerror(errno));
//zmq_connect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_connect: %s\n", zmq_strerror(errno)); msleep (SETTLE_TIME);
} }
if (iteration == 4) { if (iteration == 4) {
zmq_disconnect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_disconnect(%d): %s\n", errno, zmq_strerror(errno)); zmq_disconnect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_disconnect(%d): %s\n", errno, zmq_strerror(errno));
//zmq_disconnect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_disconnect: %s\n", zmq_strerror(errno));
} }
if (iteration > 4 && rc == 0) if (iteration > 4 && rc == 0)
break; break;
......
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "../include/zmq_utils.h"
// This is a test for issue #1382. The server thread creates a SUB-PUSH
// steerable proxy. The main process then sends messages to the SUB
// but there is no pull on the other side, previously the proxy blocks
// in writing to the backend, preventing the proxy from terminating
void
server_task (void *ctx)
{
// Frontend socket talks to main process
void *frontend = zmq_socket (ctx, ZMQ_SUB);
assert (frontend);
int rc = zmq_setsockopt (frontend, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_bind (frontend, "tcp://127.0.0.1:15564");
assert (rc == 0);
// Nice socket which is never read
void *backend = zmq_socket (ctx, ZMQ_PUSH);
assert (backend);
rc = zmq_bind (backend, "tcp://127.0.0.1:15563");
assert (rc == 0);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_SUB);
assert (control);
rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_connect (control, "inproc://control");
assert (rc == 0);
// Connect backend to frontend via a proxy
zmq_proxy_steerable (frontend, backend, NULL, control);
rc = zmq_close (frontend);
assert (rc == 0);
rc = zmq_close (backend);
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
}
// The main thread simply starts a basic steerable proxy server, publishes some messages, and then
// waits for the server to terminate.
int main (void)
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_PUB);
assert (control);
int rc = zmq_bind (control, "inproc://control");
assert (rc == 0);
void *thread = zmq_threadstart(&server_task, ctx);
msleep (500); // Run for 500 ms
// Start a secondary publisher which writes data to the SUB-PUSH server socket
void *publisher = zmq_socket (ctx, ZMQ_PUB);
assert (publisher);
rc = zmq_connect (publisher, "tcp://127.0.0.1:15564");
assert (rc == 0);
msleep (50);
rc = zmq_send (publisher, "This is a test", 14, 0);
assert (rc == 14);
msleep (50);
rc = zmq_send (publisher, "This is a test", 14, 0);
assert (rc == 14);
msleep (50);
rc = zmq_send (publisher, "This is a test", 14, 0);
assert (rc == 14);
rc = zmq_send (control, "TERMINATE", 9, 0);
assert (rc == 9);
rc = zmq_close (publisher);
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
zmq_threadclose (thread);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment