Commit d4b11b0d authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #1423 from ricnewton/master

Fail ZMQ_SNDHWM and ZMQ_RCVHWM setsockopt if already connected.
parents 383f67a5 32770d2e
...@@ -635,7 +635,7 @@ check_PROGRAMS = ${test_apps} ...@@ -635,7 +635,7 @@ check_PROGRAMS = ${test_apps}
# Run the test cases # Run the test cases
TESTS = $(test_apps) TESTS = $(test_apps)
XFAIL_TESTS = tests/test_socketopt_hwm XFAIL_TESTS =
if !ON_LINUX if !ON_LINUX
XFAIL_TESTS += tests/test_abstract_ipc XFAIL_TESTS += tests/test_abstract_ipc
......
...@@ -68,7 +68,8 @@ zmq::options_t::options_t () : ...@@ -68,7 +68,8 @@ zmq::options_t::options_t () :
gss_plaintext (false), gss_plaintext (false),
socket_id (0), socket_id (0),
conflate (false), conflate (false),
handshake_ivl (30000) handshake_ivl (30000),
connected (false)
{ {
} }
...@@ -539,7 +540,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -539,7 +540,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return -1; return -1;
} }
int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) const
{ {
bool is_int = (*optvallen_ == sizeof (int)); bool is_int = (*optvallen_ == sizeof (int));
int *value = (int *) optval_; int *value = (int *) optval_;
...@@ -884,3 +885,21 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) ...@@ -884,3 +885,21 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
bool zmq::options_t::is_valid (int option_) const
{
bool valid = true;
if (connected) {
switch (option_) {
case ZMQ_SNDHWM:
case ZMQ_RCVHWM:
valid = false;
break;
default:
break;
}
}
return valid;
}
...@@ -55,7 +55,9 @@ namespace zmq ...@@ -55,7 +55,9 @@ namespace zmq
options_t (); options_t ();
int setsockopt (int option_, const void *optval_, size_t optvallen_); int setsockopt (int option_, const void *optval_, size_t optvallen_);
int getsockopt (int option_, void *optval_, size_t *optvallen_); int getsockopt (int option_, void *optval_, size_t *optvallen_) const;
bool is_valid (int option_) const;
// High-water marks for message pipes. // High-water marks for message pipes.
int sndhwm; int sndhwm;
...@@ -195,6 +197,7 @@ namespace zmq ...@@ -195,6 +197,7 @@ namespace zmq
// close socket. Default is 30 secs. 0 means no handshake timeout. // close socket. Default is 30 secs. 0 means no handshake timeout.
int handshake_ivl; int handshake_ivl;
bool connected;
}; };
} }
......
...@@ -312,6 +312,13 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, ...@@ -312,6 +312,13 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
{ {
ENTER_MUTEX(); ENTER_MUTEX();
if (!options.is_valid(option_)) {
errno = EINVAL;
EXIT_MUTEX();
return -1;
}
if (unlikely (ctx_terminated)) { if (unlikely (ctx_terminated)) {
errno = ETERM; errno = ETERM;
EXIT_MUTEX(); EXIT_MUTEX();
...@@ -447,6 +454,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -447,6 +454,7 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc == 0) { if (rc == 0) {
connect_pending (addr_, this); connect_pending (addr_, this);
last_endpoint.assign (addr_); last_endpoint.assign (addr_);
options.connected = true;
} }
EXIT_MUTEX(); EXIT_MUTEX();
return rc; return rc;
...@@ -456,7 +464,10 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -456,7 +464,10 @@ int zmq::socket_base_t::bind (const char *addr_)
// For convenience's sake, bind can be used interchageable with // For convenience's sake, bind can be used interchageable with
// connect for PGM, EPGM and NORM transports. // connect for PGM, EPGM and NORM transports.
EXIT_MUTEX(); EXIT_MUTEX();
return connect (addr_); rc = connect (addr_);
if (rc != -1)
options.connected = true;
return rc;
} }
// Remaining trasnports require to be run in an I/O thread, so at this // Remaining trasnports require to be run in an I/O thread, so at this
...@@ -484,6 +495,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -484,6 +495,7 @@ int zmq::socket_base_t::bind (const char *addr_)
listener->get_address (last_endpoint); listener->get_address (last_endpoint);
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL); add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
options.connected = true;
EXIT_MUTEX(); EXIT_MUTEX();
return 0; return 0;
} }
...@@ -505,6 +517,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -505,6 +517,7 @@ int zmq::socket_base_t::bind (const char *addr_)
listener->get_address (last_endpoint); listener->get_address (last_endpoint);
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL); add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
options.connected = true;
EXIT_MUTEX(); EXIT_MUTEX();
return 0; return 0;
} }
...@@ -526,6 +539,7 @@ int zmq::socket_base_t::bind (const char *addr_) ...@@ -526,6 +539,7 @@ int zmq::socket_base_t::bind (const char *addr_)
listener->get_address (last_endpoint); listener->get_address (last_endpoint);
add_endpoint (addr_, (own_t *) listener, NULL); add_endpoint (addr_, (own_t *) listener, NULL);
options.connected = true;
EXIT_MUTEX(); EXIT_MUTEX();
return 0; return 0;
} }
...@@ -657,6 +671,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -657,6 +671,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// remember inproc connections for disconnect // remember inproc connections for disconnect
inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0])); inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
options.connected = true;
EXIT_MUTEX(); EXIT_MUTEX();
return 0; return 0;
} }
......
...@@ -28,7 +28,7 @@ void test_valid_hwm_change() ...@@ -28,7 +28,7 @@ void test_valid_hwm_change()
* Test that zmq_setsockopt() fails to change the RCVHWM when called * Test that zmq_setsockopt() fails to change the RCVHWM when called
* after a call to zmq_bind(). * after a call to zmq_bind().
*/ */
void test_invalid_hwm_change() void test_invalid_hwm_change_bind()
{ {
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -41,12 +41,37 @@ void test_invalid_hwm_change() ...@@ -41,12 +41,37 @@ void test_invalid_hwm_change()
assert (rc == 0); assert (rc == 0);
int val = 500; int val = 500;
rc = zmq_setsockopt(bind_socket, ZMQ_RCVHWM, &val, sizeof(val)); rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &val, sizeof(val));
assert (rc == -1); assert (rc == -1);
zmq_close (bind_socket);
zmq_ctx_term (ctx);
}
void test_invalid_hwm_change_connect()
{
void *ctx = zmq_ctx_new();
assert(ctx);
int rc;
void *connect_socket = zmq_socket (ctx, ZMQ_SUB);
assert(connect_socket);
rc = zmq_connect (connect_socket, "inproc://a");
assert(rc == 0);
int val = 500;
rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof(val));
assert(rc == -1);
zmq_close (connect_socket);
zmq_ctx_term (ctx);
} }
int main() int main()
{ {
test_valid_hwm_change(); test_valid_hwm_change();
test_invalid_hwm_change(); test_invalid_hwm_change_bind();
test_invalid_hwm_change_connect();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment