Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
afe5fd87
Commit
afe5fd87
authored
May 09, 2014
by
Will Strang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Issue #1017: add ZMQ_HANDSHAKE_IVL time limit on connection handshake
parent
1cf12ee6
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
314 additions
and
1 deletion
+314
-1
zmq_getsockopt.txt
doc/zmq_getsockopt.txt
+16
-0
zmq_setsockopt.txt
doc/zmq_setsockopt.txt
+16
-0
zmq.h
include/zmq.h
+1
-0
options.cpp
src/options.cpp
+16
-1
options.hpp
src/options.hpp
+5
-0
stream_engine.cpp
src/stream_engine.cpp
+34
-0
stream_engine.hpp
src/stream_engine.hpp
+9
-0
Makefile.am
tests/Makefile.am
+1
-0
test_stream_timeout.cpp
tests/test_stream_timeout.cpp
+216
-0
No files found.
doc/zmq_getsockopt.txt
View file @
afe5fd87
...
@@ -164,6 +164,22 @@ Default value:: N/A
...
@@ -164,6 +164,22 @@ Default value:: N/A
Applicable socket types:: all
Applicable socket types:: all
ZMQ_HANDSHAKE_IVL: Retrieve maximum handshake interval
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_HANDSHAKE_IVL' option shall retrieve the maximum handshake interval
for the specified 'socket'. Handshaking is the exchange of socket configuration
information (socket type, identity, security) that occurs when a connection
is first opened, only for connection-oriented transports. If handshaking does
not complete within the configured time, the connection shall be closed.
The value 0 means no handshake time limit.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 30000
Applicable socket types:: all but ZMQ_STREAM, only for connection-oriented transports
ZMQ_IDENTITY: Retrieve socket identity
ZMQ_IDENTITY: Retrieve socket identity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IDENTITY' option shall retrieve the identity of the specified 'socket'.
The 'ZMQ_IDENTITY' option shall retrieve the identity of the specified 'socket'.
...
...
doc/zmq_setsockopt.txt
View file @
afe5fd87
...
@@ -170,6 +170,22 @@ Default value:: NULL
...
@@ -170,6 +170,22 @@ Default value:: NULL
Applicable socket types:: all, when using TCP transport
Applicable socket types:: all, when using TCP transport
ZMQ_HANDSHAKE_IVL: Set maximum handshake interval
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_HANDSHAKE_IVL' option shall set the maximum handshake interval for
the specified 'socket'. Handshaking is the exchange of socket configuration
information (socket type, identity, security) that occurs when a connection
is first opened, only for connection-oriented transports. If handshaking does
not complete within the configured time, the connection shall be closed.
The value 0 means no handshake time limit.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 30000
Applicable socket types:: all but ZMQ_STREAM, only for connection-oriented transports
ZMQ_IDENTITY: Set socket identity
ZMQ_IDENTITY: Set socket identity
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_IDENTITY' option shall set the identity of the specified 'socket'
The 'ZMQ_IDENTITY' option shall set the identity of the specified 'socket'
...
...
include/zmq.h
View file @
afe5fd87
...
@@ -300,6 +300,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
...
@@ -300,6 +300,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_GSSAPI_PRINCIPAL 63
#define ZMQ_GSSAPI_PRINCIPAL 63
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL 64
#define ZMQ_GSSAPI_PLAINTEXT 65
#define ZMQ_GSSAPI_PLAINTEXT 65
#define ZMQ_HANDSHAKE_IVL 66
/* Message options */
/* Message options */
#define ZMQ_MORE 1
#define ZMQ_MORE 1
...
...
src/options.cpp
View file @
afe5fd87
...
@@ -55,7 +55,8 @@ zmq::options_t::options_t () :
...
@@ -55,7 +55,8 @@ zmq::options_t::options_t () :
as_server
(
0
),
as_server
(
0
),
gss_plaintext
(
false
),
gss_plaintext
(
false
),
socket_id
(
0
),
socket_id
(
0
),
conflate
(
false
)
conflate
(
false
),
handshake_ivl
(
30000
)
{
{
}
}
...
@@ -435,6 +436,13 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
...
@@ -435,6 +436,13 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
}
break
;
break
;
case
ZMQ_HANDSHAKE_IVL
:
if
(
is_int
&&
value
>=
0
)
{
handshake_ivl
=
value
;
return
0
;
}
break
;
default
:
default
:
break
;
break
;
...
@@ -746,6 +754,13 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
...
@@ -746,6 +754,13 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
}
}
break
;
break
;
case
ZMQ_HANDSHAKE_IVL
:
if
(
is_int
)
{
*
value
=
handshake_ivl
;
return
0
;
}
break
;
}
}
errno
=
EINVAL
;
errno
=
EINVAL
;
...
...
src/options.hpp
View file @
afe5fd87
...
@@ -171,6 +171,11 @@ namespace zmq
...
@@ -171,6 +171,11 @@ namespace zmq
// Cannot receive multi-part messages.
// Cannot receive multi-part messages.
// Ignores hwm
// Ignores hwm
bool
conflate
;
bool
conflate
;
// If connection handshake is not done after this many milliseconds,
// close socket. Default is 30 secs. 0 means no handshake timeout.
int
handshake_ivl
;
};
};
}
}
...
...
src/stream_engine.cpp
View file @
afe5fd87
...
@@ -80,6 +80,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
...
@@ -80,6 +80,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
mechanism
(
NULL
),
mechanism
(
NULL
),
input_stopped
(
false
),
input_stopped
(
false
),
output_stopped
(
false
),
output_stopped
(
false
),
has_handshake_timer
(
false
),
socket
(
NULL
)
socket
(
NULL
)
{
{
int
rc
=
tx_msg
.
init
();
int
rc
=
tx_msg
.
init
();
...
@@ -197,6 +198,9 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
...
@@ -197,6 +198,9 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
session
->
flush
();
session
->
flush
();
}
}
else
{
else
{
// start optional timer, to prevent handshake hanging on no input
set_handshake_timer
();
// Send the 'length' and 'flags' fields of the identity message.
// Send the 'length' and 'flags' fields of the identity message.
// The 'length' field is encoded in the long format.
// The 'length' field is encoded in the long format.
outpos
=
greeting_send
;
outpos
=
greeting_send
;
...
@@ -217,6 +221,12 @@ void zmq::stream_engine_t::unplug ()
...
@@ -217,6 +221,12 @@ void zmq::stream_engine_t::unplug ()
zmq_assert
(
plugged
);
zmq_assert
(
plugged
);
plugged
=
false
;
plugged
=
false
;
// Cancel all timers.
if
(
has_handshake_timer
)
{
cancel_timer
(
handshake_timer_id
);
has_handshake_timer
=
false
;
}
// Cancel all fd subscriptions.
// Cancel all fd subscriptions.
if
(
!
io_error
)
if
(
!
io_error
)
rm_fd
(
handle
);
rm_fd
(
handle
);
...
@@ -631,6 +641,11 @@ bool zmq::stream_engine_t::handshake ()
...
@@ -631,6 +641,11 @@ bool zmq::stream_engine_t::handshake ()
// Switch into the normal message flow.
// Switch into the normal message flow.
handshaking
=
false
;
handshaking
=
false
;
if
(
has_handshake_timer
)
{
cancel_timer
(
handshake_timer_id
);
has_handshake_timer
=
false
;
}
return
true
;
return
true
;
}
}
...
@@ -969,3 +984,22 @@ int zmq::stream_engine_t::read (void *data_, size_t size_)
...
@@ -969,3 +984,22 @@ int zmq::stream_engine_t::read (void *data_, size_t size_)
#endif
#endif
}
}
void
zmq
::
stream_engine_t
::
set_handshake_timer
()
{
zmq_assert
(
!
has_handshake_timer
);
if
(
!
options
.
raw_sock
&&
options
.
handshake_ivl
>
0
)
{
add_timer
(
options
.
handshake_ivl
,
handshake_timer_id
);
has_handshake_timer
=
true
;
}
}
void
zmq
::
stream_engine_t
::
timer_event
(
int
id_
)
{
zmq_assert
(
id_
==
handshake_timer_id
);
has_handshake_timer
=
false
;
// handshake timer expired before handshake completed, so engine fails
error
();
}
src/stream_engine.hpp
View file @
afe5fd87
...
@@ -68,6 +68,7 @@ namespace zmq
...
@@ -68,6 +68,7 @@ namespace zmq
// i_poll_events interface implementation.
// i_poll_events interface implementation.
void
in_event
();
void
in_event
();
void
out_event
();
void
out_event
();
void
timer_event
(
int
id_
);
private
:
private
:
...
@@ -114,6 +115,8 @@ namespace zmq
...
@@ -114,6 +115,8 @@ namespace zmq
size_t
add_property
(
unsigned
char
*
ptr
,
size_t
add_property
(
unsigned
char
*
ptr
,
const
char
*
name
,
const
void
*
value
,
size_t
value_len
);
const
char
*
name
,
const
void
*
value
,
size_t
value_len
);
void
set_handshake_timer
();
// Underlying socket.
// Underlying socket.
fd_t
s
;
fd_t
s
;
...
@@ -187,6 +190,12 @@ namespace zmq
...
@@ -187,6 +190,12 @@ namespace zmq
// True iff the engine doesn't have any message to encode.
// True iff the engine doesn't have any message to encode.
bool
output_stopped
;
bool
output_stopped
;
// ID of the handshake timer
enum
{
handshake_timer_id
=
0x40
};
// True is linger timer is running.
bool
has_handshake_timer
;
// Socket
// Socket
zmq
::
socket_base_t
*
socket
;
zmq
::
socket_base_t
*
socket
;
...
...
tests/Makefile.am
View file @
afe5fd87
...
@@ -25,6 +25,7 @@ noinst_PROGRAMS = test_system \
...
@@ -25,6 +25,7 @@ noinst_PROGRAMS = test_system \
test_stream
\
test_stream
\
test_stream_empty
\
test_stream_empty
\
test_stream_disconnect
\
test_stream_disconnect
\
test_stream_timeout
\
test_disconnect_inproc
\
test_disconnect_inproc
\
test_ctx_options
\
test_ctx_options
\
test_ctx_destroy
\
test_ctx_destroy
\
...
...
tests/test_stream_timeout.cpp
0 → 100644
View file @
afe5fd87
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
// Read one event off the monitor socket; return value and address
// by reference, if not null, and event number by value. Returns -1
// in case of error.
static
int
get_monitor_event
(
void
*
monitor
,
int
*
value
,
char
**
address
)
{
// First frame in message contains event number and value
zmq_msg_t
msg
;
zmq_msg_init
(
&
msg
);
if
(
zmq_msg_recv
(
&
msg
,
monitor
,
0
)
==
-
1
)
return
-
1
;
// Interruped, presumably
assert
(
zmq_msg_more
(
&
msg
));
uint8_t
*
data
=
(
uint8_t
*
)
zmq_msg_data
(
&
msg
);
uint16_t
event
=
*
(
uint16_t
*
)
(
data
);
if
(
value
)
*
value
=
*
(
uint32_t
*
)
(
data
+
2
);
// Second frame in message contains event address
zmq_msg_init
(
&
msg
);
if
(
zmq_msg_recv
(
&
msg
,
monitor
,
0
)
==
-
1
)
return
-
1
;
// Interruped, presumably
assert
(
!
zmq_msg_more
(
&
msg
));
if
(
address
)
{
uint8_t
*
data
=
(
uint8_t
*
)
zmq_msg_data
(
&
msg
);
size_t
size
=
zmq_msg_size
(
&
msg
);
*
address
=
(
char
*
)
malloc
(
size
+
1
);
memcpy
(
*
address
,
data
,
size
);
*
address
[
size
]
=
0
;
}
return
event
;
}
static
void
test_stream_handshake_timeout_accept
(
void
)
{
int
rc
;
// Set up our context and sockets
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// We use this socket in raw mode, to make a connection and send nothing
void
*
stream
=
zmq_socket
(
ctx
,
ZMQ_STREAM
);
assert
(
stream
);
int
zero
=
0
;
rc
=
zmq_setsockopt
(
stream
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
stream
,
"tcp://localhost:5557"
);
assert
(
rc
==
0
);
// We'll be using this socket to test TCP stream handshake timeout
void
*
dealer
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
dealer
);
rc
=
zmq_setsockopt
(
dealer
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
assert
(
rc
==
0
);
int
val
,
tenth
=
100
;
size_t
vsize
=
sizeof
(
val
);
// check for the expected default handshake timeout value - 30 sec
rc
=
zmq_getsockopt
(
dealer
,
ZMQ_HANDSHAKE_IVL
,
&
val
,
&
vsize
);
assert
(
rc
==
0
);
assert
(
vsize
==
sizeof
(
val
));
assert
(
val
==
30000
);
// make handshake timeout faster - 1/10 sec
rc
=
zmq_setsockopt
(
dealer
,
ZMQ_HANDSHAKE_IVL
,
&
tenth
,
sizeof
(
tenth
));
assert
(
rc
==
0
);
vsize
=
sizeof
(
val
);
// make sure zmq_setsockopt changed the value
rc
=
zmq_getsockopt
(
dealer
,
ZMQ_HANDSHAKE_IVL
,
&
val
,
&
vsize
);
assert
(
rc
==
0
);
assert
(
vsize
==
sizeof
(
val
));
assert
(
val
==
tenth
);
// Create and connect a socket for collecting monitor events on dealer
void
*
dealer_mon
=
zmq_socket
(
ctx
,
ZMQ_PAIR
);
assert
(
dealer_mon
);
rc
=
zmq_socket_monitor
(
dealer
,
"inproc://monitor-dealer"
,
ZMQ_EVENT_CONNECTED
|
ZMQ_EVENT_DISCONNECTED
|
ZMQ_EVENT_ACCEPTED
);
assert
(
rc
==
0
);
// Connect to the inproc endpoint so we'll get events
rc
=
zmq_connect
(
dealer_mon
,
"inproc://monitor-dealer"
);
assert
(
rc
==
0
);
// bind dealer socket to accept connection from non-sending stream socket
rc
=
zmq_bind
(
dealer
,
"tcp://127.0.0.1:5557"
);
assert
(
rc
==
0
);
// we should get ZMQ_EVENT_ACCEPTED and then ZMQ_EVENT_DISCONNECTED
int
event
=
get_monitor_event
(
dealer_mon
,
NULL
,
NULL
);
assert
(
event
==
ZMQ_EVENT_ACCEPTED
);
event
=
get_monitor_event
(
dealer_mon
,
NULL
,
NULL
);
assert
(
event
==
ZMQ_EVENT_DISCONNECTED
);
rc
=
zmq_close
(
dealer
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
dealer_mon
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
stream
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
}
static
void
test_stream_handshake_timeout_connect
(
void
)
{
int
rc
;
// Set up our context and sockets
void
*
ctx
=
zmq_ctx_new
();
assert
(
ctx
);
// We use this socket in raw mode, to accept a connection and send nothing
void
*
stream
=
zmq_socket
(
ctx
,
ZMQ_STREAM
);
assert
(
stream
);
int
zero
=
0
;
rc
=
zmq_setsockopt
(
stream
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
assert
(
rc
==
0
);
rc
=
zmq_bind
(
stream
,
"tcp://127.0.0.1:5556"
);
assert
(
rc
==
0
);
// We'll be using this socket to test TCP stream handshake timeout
void
*
dealer
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
dealer
);
rc
=
zmq_setsockopt
(
dealer
,
ZMQ_LINGER
,
&
zero
,
sizeof
(
zero
));
assert
(
rc
==
0
);
int
val
,
tenth
=
100
;
size_t
vsize
=
sizeof
(
val
);
// check for the expected default handshake timeout value - 30 sec
rc
=
zmq_getsockopt
(
dealer
,
ZMQ_HANDSHAKE_IVL
,
&
val
,
&
vsize
);
assert
(
rc
==
0
);
assert
(
vsize
==
sizeof
(
val
));
assert
(
val
==
30000
);
// make handshake timeout faster - 1/10 sec
rc
=
zmq_setsockopt
(
dealer
,
ZMQ_HANDSHAKE_IVL
,
&
tenth
,
sizeof
(
tenth
));
assert
(
rc
==
0
);
vsize
=
sizeof
(
val
);
// make sure zmq_setsockopt changed the value
rc
=
zmq_getsockopt
(
dealer
,
ZMQ_HANDSHAKE_IVL
,
&
val
,
&
vsize
);
assert
(
rc
==
0
);
assert
(
vsize
==
sizeof
(
val
));
assert
(
val
==
tenth
);
// Create and connect a socket for collecting monitor events on dealer
void
*
dealer_mon
=
zmq_socket
(
ctx
,
ZMQ_PAIR
);
assert
(
dealer_mon
);
rc
=
zmq_socket_monitor
(
dealer
,
"inproc://monitor-dealer"
,
ZMQ_EVENT_CONNECTED
|
ZMQ_EVENT_DISCONNECTED
|
ZMQ_EVENT_ACCEPTED
);
assert
(
rc
==
0
);
// Connect to the inproc endpoint so we'll get events
rc
=
zmq_connect
(
dealer_mon
,
"inproc://monitor-dealer"
);
assert
(
rc
==
0
);
// connect dealer socket to non-sending stream socket
rc
=
zmq_connect
(
dealer
,
"tcp://localhost:5556"
);
assert
(
rc
==
0
);
// we should get ZMQ_EVENT_CONNECTED and then ZMQ_EVENT_DISCONNECTED
int
event
=
get_monitor_event
(
dealer_mon
,
NULL
,
NULL
);
assert
(
event
==
ZMQ_EVENT_CONNECTED
);
event
=
get_monitor_event
(
dealer_mon
,
NULL
,
NULL
);
assert
(
event
==
ZMQ_EVENT_DISCONNECTED
);
rc
=
zmq_close
(
dealer
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
dealer_mon
);
assert
(
rc
==
0
);
rc
=
zmq_close
(
stream
);
assert
(
rc
==
0
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
}
int
main
(
void
)
{
setup_test_environment
();
test_stream_handshake_timeout_accept
();
test_stream_handshake_timeout_connect
();
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment