Commit dfaa55fa authored by Richard Newton's avatar Richard Newton

Merge branch 'master' of https://github.com/zeromq/libzmq

parents f9fe77f4 f03a78bb
......@@ -74,6 +74,7 @@ test_linger
test_security_null
test_security_plain
test_proxy
test_proxy_terminate
test_abstract_ipc
test_filter_ipc
test_connect_delay_tipc
......
......@@ -86,6 +86,7 @@ Philip Kovacs
Pieter Hintjens
Piotr Trojanek
Richard Newton
Rik van der Heijden
Robert G. Jakabosky
Sebastian Otaegui
Stefan Radomski
......
......@@ -294,8 +294,8 @@ if(MSVC)
set(OPENPGM_INCLUDE_DIRS ${OPENPGM_ROOT}/include)
set(OPENPGM_LIBRARY_DIRS ${OPENPGM_ROOT}/lib)
set(OPENPGM_LIBRARIES
optimized libpgm${_zmq_COMPILER}-mt-${OPENPGM_VERSION_MAJOR}_${OPENPGM_VERSION_MINOR}_${OPENPGM_VERSION_MICRO}.lib
debug libpgm${_zmq_COMPILER}-mt-gd-${OPENPGM_VERSION_MAJOR}_${OPENPGM_VERSION_MINOR}_${OPENPGM_VERSION_MICRO}.lib)
optimized libpgm-${CMAKE_VS_PLATFORM_TOOLSET}-mt-${OPENPGM_VERSION_MAJOR}_${OPENPGM_VERSION_MINOR}_${OPENPGM_VERSION_MICRO}.lib
debug libpgm-${CMAKE_VS_PLATFORM_TOOLSET}-mt-gd-${OPENPGM_VERSION_MAJOR}_${OPENPGM_VERSION_MINOR}_${OPENPGM_VERSION_MICRO}.lib)
endif()
else()
if(WITH_OPENPGM)
......@@ -578,15 +578,15 @@ if(MSVC)
target_link_libraries(libzmq ${OPTIONAL_LIBRARIES})
set_target_properties(libzmq PROPERTIES
PUBLIC_HEADER "${public_headers}"
RELEASE_POSTFIX "${_zmq_COMPILER}-mt-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
DEBUG_POSTFIX "${_zmq_COMPILER}-mt-gd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
RELEASE_POSTFIX "-${CMAKE_VS_PLATFORM_TOOLSET}-mt-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
DEBUG_POSTFIX "-${CMAKE_VS_PLATFORM_TOOLSET}-mt-gd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/bin"
COMPILE_DEFINITIONS "DLL_EXPORT")
add_library(libzmq-static STATIC ${sources})
set_target_properties(libzmq-static PROPERTIES
PUBLIC_HEADER "${public_headers}"
RELEASE_POSTFIX "${_zmq_COMPILER}-mt-s-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
DEBUG_POSTFIX "${_zmq_COMPILER}-mt-sgd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
RELEASE_POSTFIX "-${CMAKE_VS_PLATFORM_TOOLSET}-mt-s-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
DEBUG_POSTFIX "-${CMAKE_VS_PLATFORM_TOOLSET}-mt-sgd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
COMPILE_FLAGS "/D ZMQ_STATIC"
OUTPUT_NAME "libzmq")
else()
......@@ -648,6 +648,12 @@ set(perf-tools local_lat
inproc_thr)
if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") # Why?
option(WITH_PERF_TOOL "Build with perf-tools" ON)
else()
option(WITH_PERF_TOOL "Build with perf-tools" OFF)
endif()
if(WITH_PERF_TOOL)
foreach(perf-tool ${perf-tools})
add_executable(${perf-tool} perf/${perf-tool}.cpp)
target_link_libraries(${perf-tool} libzmq)
......@@ -698,7 +704,7 @@ if(MSVC)
ARCHIVE DESTINATION lib
PUBLIC_HEADER DESTINATION include
COMPONENT SDK)
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/bin/libzmq${_zmq_COMPILER}-mt-gd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}.pdb DESTINATION lib
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/bin/libzmq-${CMAKE_VS_PLATFORM_TOOLSET}-mt-gd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}.pdb DESTINATION lib
COMPONENT SDK)
else()
install(TARGETS libzmq
......
......@@ -340,6 +340,7 @@ test_apps = \
tests/test_inproc_connect \
tests/test_issue_566 \
tests/test_proxy \
tests/test_proxy_terminate \
tests/test_many_sockets \
tests/test_ipc_wildcard \
tests/test_diffserv \
......@@ -493,6 +494,9 @@ tests_test_issue_566_LDADD = src/libzmq.la
tests_test_proxy_SOURCES = tests/test_proxy.cpp
tests_test_proxy_LDADD = src/libzmq.la
tests_test_proxy_terminate_SOURCES = tests/test_proxy_terminate.cpp
tests_test_proxy_terminate_LDADD = src/libzmq.la
tests_test_many_sockets_SOURCES = tests/test_many_sockets.cpp
tests_test_many_sockets_LDADD = src/libzmq.la
......
......@@ -6,13 +6,3 @@ string(REGEX REPLACE ".*#define ZMQ_VERSION_PATCH ([0-9]+).*" "\\1" ZMQ_VERSION_
set(ZMQ_VERSION "${ZMQ_VERSION_MAJOR}.${ZMQ_VERSION_MINOR}.${ZMQ_VERSION_PATCH}")
message(STATUS "Detected ZMQ Version - ${ZMQ_VERSION}")
if(MSVC_VERSION MATCHES "1700")
set(_zmq_COMPILER "-v110")
elseif(MSVC10)
set(_zmq_COMPILER "-v100")
elseif(MSVC90)
set(_zmq_COMPILER "-v90")
else()
set(_zmq_COMPILER "")
endif()
......@@ -86,14 +86,4 @@
#cmakedefine ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_WINDOWS
#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600
#undef _WIN32_WINNT
#endif
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#endif
#endif
\ No newline at end of file
......@@ -29,13 +29,4 @@
#define ZMQ_HAVE_WINDOWS
#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600
#undef _WIN32_WINNT
#endif
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#endif
......@@ -29,12 +29,4 @@
#define ZMQ_HAVE_WINDOWS
#if defined _WIN32_WINNT && _WIN32_WINNT < 0x0600
#undef _WIN32_WINNT
#endif
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600
#endif
#endif
......@@ -26,15 +26,11 @@ if [[ $ANDROID_BUILD_CLEAN ]]; then
fi
##
# Build libsodium from latest release tarball
# Build libsodium from latest master branch
(android_build_verify_so "libsodium.so" &> /dev/null) || {
rm -rf "${cache}/libsodium"
(cd "${cache}" && mkdir libsodium \
&& wget https://download.libsodium.org/libsodium/releases/LATEST.tar.gz\
-O "${cache}/libsodium.tar.gz" \
&& tar -C libsodium -xf libsodium.tar.gz --strip=1) || exit 1
(cd "${cache}" && git clone git://github.com/jedisct1/libsodium.git) || exit 1
(cd "${cache}/libsodium" && ./autogen.sh \
&& ./configure "${ANDROID_BUILD_OPTS[@]}" --disable-soname-versions \
&& make \
......
......@@ -225,7 +225,7 @@ case "${host_os}" in
AC_DEFINE(ZMQ_FORCE_MUTEXES, 1, [Force to use mutexes])
fi
;;
*openbsd*)
*openbsd*|*bitrig*)
# Define on OpenBSD to enable all library features
CPPFLAGS="-D_BSD_SOURCE $CPPFLAGS"
AC_DEFINE(ZMQ_HAVE_OPENBSD, 1, [Have OpenBSD OS])
......@@ -245,7 +245,7 @@ case "${host_os}" in
LIBZMQ_CHECK_LANG_FLAG_PREPEND([-Ae])
AC_CHECK_FUNCS(gethrtime)
;;
*mingw32*)
*mingw*)
AC_DEFINE(ZMQ_HAVE_WINDOWS, 1, [Have Windows OS])
AC_DEFINE(ZMQ_HAVE_MINGW32, 1, [Have MinGW32])
AC_CHECK_HEADERS(windows.h)
......@@ -265,6 +265,9 @@ case "${host_os}" in
if test "x$enable_static" = "xyes"; then
AC_MSG_ERROR([Building static libraries is not supported under MinGW32])
fi
# Set FD_SETSIZE to 1024
CPPFLAGS=" -DFD_SETSIZE=1024 $CPPFLAGS"
;;
*cygwin*)
# Define on Cygwin to enable all library features
......
......@@ -31,6 +31,47 @@
#include "windows.hpp"
// Condition variable is supported from Windows Vista only, to use condition variable define _WIN32_WINNT to 0x0600
#if _WIN32_WINNT < 0x0600
namespace zmq
{
class condition_variable_t
{
public:
inline condition_variable_t ()
{
zmq_assert(false);
}
inline ~condition_variable_t ()
{
}
inline int wait (mutex_t* mutex_, int timeout_ )
{
zmq_assert(false);
return -1;
}
inline void broadcast ()
{
zmq_assert(false);
}
private:
// Disable copy construction and assignment.
condition_variable_t (const condition_variable_t&);
void operator = (const condition_variable_t&);
};
}
#else
namespace zmq
{
......@@ -79,6 +120,8 @@ namespace zmq
}
#endif
#else
#include <pthread.h>
......
......@@ -7,4 +7,5 @@ Name: libzmq
Description: 0MQ c++ library
Version: @ZMQ_VERSION_MAJOR@.@ZMQ_VERSION_MINOR@.@ZMQ_VERSION_PATCH@
Libs: -L${libdir} -lzmq
Libs.private: -lstdc++
Cflags: -I${includedir}
......@@ -7,4 +7,5 @@ Name: libzmq
Description: 0MQ c++ library
Version: @VERSION@
Libs: -L${libdir} -lzmq
Libs.private: -lstdc++
Cflags: -I${includedir}
......@@ -96,7 +96,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
case ZMQ_IDENTITY:
// Identity is any binary string from 1 to 255 octets
if (optvallen_ > 0 && optvallen_ < 256) {
identity_size = optvallen_;
identity_size = (unsigned char) optvallen_;
memcpy (identity, optval_, identity_size);
return 0;
}
......
......@@ -265,7 +265,7 @@ int zmq::plain_server_t::produce_error (msg_t *msg_) const
zmq_assert (rc == 0);
char *msg_data = static_cast <char *> (msg_->data ());
memcpy (msg_data, "\5ERROR", 6);
msg_data [6] = status_code.length ();
msg_data [6] = (char) status_code.length ();
memcpy (msg_data + 7, status_code.c_str (), status_code.length ());
return 0;
}
......
......@@ -159,14 +159,16 @@ int zmq::proxy (
}
// Process a request
if (state == active
&& items [0].revents & ZMQ_POLLIN) {
&& items [0].revents & ZMQ_POLLIN
&& items [1].revents & ZMQ_POLLOUT) {
rc = forward(frontend_, backend_, capture_,msg);
if (unlikely (rc < 0))
return -1;
}
// Process a reply
if (state == active
&& items [1].revents & ZMQ_POLLIN) {
&& items [1].revents & ZMQ_POLLIN
&& items [0].revents & ZMQ_POLLOUT) {
rc = forward(backend_, frontend_, capture_,msg);
if (unlikely (rc < 0))
return -1;
......
......@@ -18,6 +18,8 @@
*/
#include "pub.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
......@@ -30,6 +32,17 @@ zmq::pub_t::~pub_t ()
{
}
void zmq::pub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
zmq_assert (pipe_);
// Don't delay pipe termination as there is no one
// to receive the delimiter.
pipe_->set_nodelay ();
xpub_t::xattach_pipe (pipe_, subscribe_to_all_);
}
int zmq::pub_t::xrecv (class msg_t *)
{
// Messages cannot be received from PUB socket.
......
......@@ -38,6 +38,7 @@ namespace zmq
~pub_t ();
// Implementations of virtual functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
......
......@@ -36,6 +36,9 @@ void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// subscribe_to_all_ is unused
(void)subscribe_to_all_;
// Don't delay pipe termination as there is no one
// to receive the delimiter.
pipe_->set_nodelay ();
zmq_assert (pipe_);
lb.attach (pipe_);
......
......@@ -132,11 +132,14 @@ zmq::signaler_t::~signaler_t ()
const struct linger so_linger = { 1, 0 };
int rc = setsockopt (w, SOL_SOCKET, SO_LINGER,
(const char *) &so_linger, sizeof so_linger);
wsa_assert (rc != SOCKET_ERROR);
rc = closesocket (w);
wsa_assert (rc != SOCKET_ERROR);
rc = closesocket (r);
wsa_assert (rc != SOCKET_ERROR);
// Only check shutdown if WSASTARTUP was previously done
if (rc == 0 || WSAGetLastError () != WSANOTINITIALISED) {
wsa_assert (rc != SOCKET_ERROR);
rc = closesocket (w);
wsa_assert (rc != SOCKET_ERROR);
rc = closesocket (r);
wsa_assert (rc != SOCKET_ERROR);
}
#else
int rc = close_wait_ms (w);
errno_assert (rc == 0);
......
......@@ -37,12 +37,10 @@ zmq::socks_greeting_t::socks_greeting_t (uint8_t method_) :
}
zmq::socks_greeting_t::socks_greeting_t (
uint8_t *methods_, size_t num_methods_)
uint8_t *methods_, uint8_t num_methods_)
: num_methods (num_methods_)
{
zmq_assert (num_methods_ <= 255);
for (size_t i = 0; i < num_methods_; i++)
for (uint8_t i = 0; i < num_methods_; i++)
methods [i] = methods_ [i];
}
......@@ -55,8 +53,8 @@ void zmq::socks_greeting_encoder_t::encode (const socks_greeting_t &greeting_)
uint8_t *ptr = buf;
*ptr++ = 0x05;
*ptr++ = greeting_.num_methods;
for (size_t i = 0; i < greeting_.num_methods; i++)
*ptr++ = (uint8_t) greeting_.num_methods;
for (uint8_t i = 0; i < greeting_.num_methods; i++)
*ptr++ = greeting_.methods [i];
bytes_encoded = 2 + greeting_.num_methods;
......@@ -118,10 +116,13 @@ void zmq::socks_choice_decoder_t::reset ()
bytes_read = 0;
}
zmq::socks_request_t::socks_request_t (
uint8_t command_, std::string hostname_, uint16_t port_)
: command (command_), hostname (hostname_), port (port_)
{}
{
zmq_assert (hostname_.size () <= UINT8_MAX);
}
zmq::socks_request_encoder_t::socks_request_encoder_t ()
: bytes_encoded (0), bytes_written (0)
......@@ -129,6 +130,8 @@ zmq::socks_request_encoder_t::socks_request_encoder_t ()
void zmq::socks_request_encoder_t::encode (const socks_request_t &req)
{
zmq_assert (req.hostname.size() <= UINT8_MAX);
unsigned char *ptr = buf;
*ptr++ = 0x05;
*ptr++ = req.command;
......@@ -163,7 +166,7 @@ void zmq::socks_request_encoder_t::encode (const socks_request_t &req)
}
else {
*ptr++ = 0x03;
*ptr++ = req.hostname.size ();
*ptr++ = (unsigned char) req.hostname.size ();
memcpy (ptr, req.hostname.c_str (), req.hostname.size ());
ptr += req.hostname.size ();
}
......
......@@ -30,9 +30,9 @@ namespace zmq
struct socks_greeting_t
{
socks_greeting_t (uint8_t method);
socks_greeting_t (uint8_t *methods_, size_t num_methods_);
socks_greeting_t (uint8_t *methods_, uint8_t num_methods_);
uint8_t methods [255];
uint8_t methods [UINT8_MAX];
const size_t num_methods;
};
......@@ -48,7 +48,7 @@ namespace zmq
private:
size_t bytes_encoded;
size_t bytes_written;
uint8_t buf [2 + 255];
uint8_t buf [2 + UINT8_MAX];
};
struct socks_choice_t
......@@ -94,7 +94,7 @@ namespace zmq
private:
size_t bytes_encoded;
size_t bytes_written;
uint8_t buf [4 + 256 + 2];
uint8_t buf [4 + UINT8_MAX + 1 + 2];
};
struct socks_response_t
......@@ -116,7 +116,7 @@ namespace zmq
void reset ();
private:
uint8_t buf [4 + 256 + 2];
int8_t buf [4 + UINT8_MAX + 1 + 2];
size_t bytes_read;
};
......
......@@ -148,7 +148,7 @@ void zmq::socks_connecter_t::in_event ()
// Attach the engine to the corresponding session object.
send_attach (session, engine);
socket->event_connected (endpoint, s);
socket->event_connected (endpoint, (int) s);
rm_fd (handle);
s = -1;
......@@ -170,7 +170,7 @@ void zmq::socks_connecter_t::out_event ()
|| status == sending_request);
if (status == waiting_for_proxy_connection) {
const int rc = check_proxy_connection ();
const int rc = (int) check_proxy_connection ();
if (rc == -1)
error ();
else {
......@@ -436,7 +436,7 @@ void zmq::socks_connecter_t::close ()
const int rc = ::close (s);
errno_assert (rc == 0);
#endif
socket->event_closed (endpoint, s);
socket->event_closed (endpoint, (int) s);
s = retired_fd;
}
......
......@@ -59,4 +59,8 @@ typedef unsigned __int64 uint64_t;
#endif
#ifndef UINT8_MAX
#define UINT8_MAX 0xFF
#endif
#endif
......@@ -289,14 +289,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
connect_rid.length ());
connect_rid.clear ();
outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ())
zmq_assert(false);
zmq_assert (it == outpipes.end ());
}
else {
put_uint32 (buffer + 1, next_rid++);
identity = blob_t (buffer, sizeof buffer);
memcpy (options.identity, identity.data (), identity.size ());
options.identity_size = identity.size ();
options.identity_size = (unsigned char) identity.size ();
}
pipe_->set_identity (identity);
// Add the record into output pipes lookup table
......
......@@ -668,18 +668,6 @@ bool zmq::stream_engine_t::handshake ()
}
#endif
else {
// Temporary support for security debugging
char mechanism [21];
memcpy (mechanism, greeting_recv + 12, 20);
mechanism [20] = 0;
printf ("LIBZMQ I: security failure, self=%s peer=%s\n",
options.mechanism == ZMQ_NULL? "NULL":
options.mechanism == ZMQ_PLAIN? "PLAIN":
options.mechanism == ZMQ_CURVE? "CURVE":
options.mechanism == ZMQ_GSSAPI? "GSSAPI":
"OTHER",
mechanism);
error (protocol_error);
return false;
}
......@@ -928,7 +916,7 @@ void zmq::stream_engine_t::error (error_reason_t reason)
terminator.close();
}
zmq_assert (session);
socket->event_disconnected (endpoint, s);
socket->event_disconnected (endpoint, (int) s);
session->flush ();
session->engine_error (reason);
unplug ();
......
......@@ -138,7 +138,7 @@ void zmq::tcp_connecter_t::out_event ()
// Shut the connecter down.
terminate ();
socket->event_connected (endpoint, fd);
socket->event_connected (endpoint, (int) fd);
}
void zmq::tcp_connecter_t::timer_event (int id_)
......@@ -352,6 +352,6 @@ void zmq::tcp_connecter_t::close ()
const int rc = ::close (s);
errno_assert (rc == 0);
#endif
socket->event_closed (endpoint, s);
socket->event_closed (endpoint, (int) s);
s = retired_fd;
}
......@@ -111,7 +111,7 @@ void zmq::tcp_listener_t::in_event ()
session->inc_seqnum ();
launch_child (session);
send_attach (session, engine, false);
socket->event_accepted (endpoint, fd);
socket->event_accepted (endpoint, (int) fd);
}
void zmq::tcp_listener_t::close ()
......@@ -124,7 +124,7 @@ void zmq::tcp_listener_t::close ()
int rc = ::close (s);
errno_assert (rc == 0);
#endif
socket->event_closed (endpoint, s);
socket->event_closed (endpoint, (int) s);
s = retired_fd;
}
......@@ -239,7 +239,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
goto error;
#endif
socket->event_listening (endpoint, s);
socket->event_listening (endpoint, (int) s);
return 0;
error:
......
......@@ -27,9 +27,9 @@
#define NOMINMAX // Macros min(a,b) and max(a,b)
#endif
// Set target version to Windows Server 2003, Windows XP/SP1 or higher.
// Set target version to Windows Server 2008, Windows Vista or higher. Windows XP (0x0501) is also supported but without client & server socket types.
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0501
#define _WIN32_WINNT 0x0600
#endif
#ifdef __MINGW32__
......
......@@ -155,7 +155,7 @@ uint8_t *zmq_z85_decode (uint8_t *dest, const char *string)
}
unsigned int byte_nbr = 0;
unsigned int char_nbr = 0;
unsigned int string_len = strlen (string);
size_t string_len = strlen (string);
uint32_t value = 0;
while (char_nbr < string_len) {
// Accumulate value in base 85
......
......@@ -55,6 +55,7 @@ if(NOT WIN32)
test_reqrep_ipc
test_abstract_ipc
test_proxy
test_proxy_terminate
test_filter_ipc
)
if(HAVE_FORK)
......
......@@ -94,11 +94,10 @@ int main(int, char**) {
}
if (iteration == 1) {
zmq_connect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_connect: %s\n", zmq_strerror(errno));
//zmq_connect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_connect: %s\n", zmq_strerror(errno));
msleep (SETTLE_TIME);
}
if (iteration == 4) {
zmq_disconnect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_disconnect(%d): %s\n", errno, zmq_strerror(errno));
//zmq_disconnect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_disconnect: %s\n", zmq_strerror(errno));
}
if (iteration > 4 && rc == 0)
break;
......
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "../include/zmq_utils.h"
// This is a test for issue #1382. The server thread creates a SUB-PUSH
// steerable proxy. The main process then sends messages to the SUB
// but there is no pull on the other side, previously the proxy blocks
// in writing to the backend, preventing the proxy from terminating
void
server_task (void *ctx)
{
// Frontend socket talks to main process
void *frontend = zmq_socket (ctx, ZMQ_SUB);
assert (frontend);
int rc = zmq_setsockopt (frontend, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_bind (frontend, "tcp://127.0.0.1:15564");
assert (rc == 0);
// Nice socket which is never read
void *backend = zmq_socket (ctx, ZMQ_PUSH);
assert (backend);
rc = zmq_bind (backend, "tcp://127.0.0.1:15563");
assert (rc == 0);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_SUB);
assert (control);
rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_connect (control, "inproc://control");
assert (rc == 0);
// Connect backend to frontend via a proxy
zmq_proxy_steerable (frontend, backend, NULL, control);
rc = zmq_close (frontend);
assert (rc == 0);
rc = zmq_close (backend);
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
}
// The main thread simply starts a basic steerable proxy server, publishes some messages, and then
// waits for the server to terminate.
int main (void)
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_PUB);
assert (control);
int rc = zmq_bind (control, "inproc://control");
assert (rc == 0);
void *thread = zmq_threadstart(&server_task, ctx);
msleep (500); // Run for 500 ms
// Start a secondary publisher which writes data to the SUB-PUSH server socket
void *publisher = zmq_socket (ctx, ZMQ_PUB);
assert (publisher);
rc = zmq_connect (publisher, "tcp://127.0.0.1:15564");
assert (rc == 0);
msleep (50);
rc = zmq_send (publisher, "This is a test", 14, 0);
assert (rc == 14);
msleep (50);
rc = zmq_send (publisher, "This is a test", 14, 0);
assert (rc == 14);
msleep (50);
rc = zmq_send (publisher, "This is a test", 14, 0);
assert (rc == 14);
rc = zmq_send (control, "TERMINATE", 9, 0);
assert (rc == 9);
rc = zmq_close (publisher);
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
zmq_threadclose (thread);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}
......@@ -235,7 +235,11 @@ int main (void)
ip4addr.sin_family = AF_INET;
ip4addr.sin_port = htons (9998);
inet_pton (AF_INET, "127.0.0.1", &ip4addr.sin_addr);
#if (_WIN32_WINNT < 0x0600)
ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1");
#else
inet_pton(AF_INET, "127.0.0.1", &ip4addr.sin_addr);
#endif
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
rc = connect (s, (struct sockaddr*) &ip4addr, sizeof (ip4addr));
......
......@@ -148,7 +148,11 @@ int main (void)
ip4addr.sin_family = AF_INET;
ip4addr.sin_port = htons(9003);
#if (_WIN32_WINNT < 0x0600)
ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1");
#else
inet_pton(AF_INET, "127.0.0.1", &ip4addr.sin_addr);
#endif
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
rc = connect (s, (struct sockaddr*) &ip4addr, sizeof ip4addr);
......
......@@ -154,7 +154,11 @@ int main (void)
ip4addr.sin_family = AF_INET;
ip4addr.sin_port = htons (9998);
inet_pton (AF_INET, "127.0.0.1", &ip4addr.sin_addr);
#if (_WIN32_WINNT < 0x0600)
ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1");
#else
inet_pton(AF_INET, "127.0.0.1", &ip4addr.sin_addr);
#endif
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
rc = connect (s, (struct sockaddr*) &ip4addr, sizeof (ip4addr));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment