Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
eded1f8b
Unverified
Commit
eded1f8b
authored
Feb 09, 2018
by
Luca Boccassi
Committed by
GitHub
Feb 09, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #2923 from sigiesec/fastpath
Add support for SIO_LOOPBACK_FAST_PATH on Windows
parents
9544dade
a5e76303
Show whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
101 additions
and
10 deletions
+101
-10
zmq.h
include/zmq.h
+1
-0
options.cpp
src/options.cpp
+16
-1
options.hpp
src/options.hpp
+3
-0
signaler.cpp
src/signaler.cpp
+16
-7
tcp.cpp
src/tcp.cpp
+24
-0
tcp.hpp
src/tcp.hpp
+2
-0
tcp_connecter.cpp
src/tcp_connecter.cpp
+4
-0
tcp_listener.cpp
src/tcp_listener.cpp
+4
-0
zmq_draft.h
src/zmq_draft.h
+1
-0
test_pair_tcp.cpp
tests/test_pair_tcp.cpp
+30
-2
No files found.
include/zmq.h
View file @
eded1f8b
...
@@ -584,6 +584,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread);
...
@@ -584,6 +584,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread);
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE 91
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE 91
#define ZMQ_BINDTODEVICE 92
#define ZMQ_BINDTODEVICE 92
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_LOOPBACK_FASTPATH 94
/* DRAFT 0MQ socket events and monitoring */
/* DRAFT 0MQ socket events and monitoring */
/* Unspecified system errors during handshake. Event value is an errno. */
/* Unspecified system errors during handshake. Event value is an errno. */
...
...
src/options.cpp
View file @
eded1f8b
...
@@ -90,7 +90,8 @@ zmq::options_t::options_t () :
...
@@ -90,7 +90,8 @@ zmq::options_t::options_t () :
heartbeat_interval
(
0
),
heartbeat_interval
(
0
),
heartbeat_timeout
(
-
1
),
heartbeat_timeout
(
-
1
),
use_fd
(
-
1
),
use_fd
(
-
1
),
zap_enforce_domain
(
false
)
zap_enforce_domain
(
false
),
loopback_fastpath
(
false
)
{
{
memset
(
curve_public_key
,
0
,
CURVE_KEYSIZE
);
memset
(
curve_public_key
,
0
,
CURVE_KEYSIZE
);
memset
(
curve_secret_key
,
0
,
CURVE_KEYSIZE
);
memset
(
curve_secret_key
,
0
,
CURVE_KEYSIZE
);
...
@@ -627,6 +628,13 @@ int zmq::options_t::setsockopt (int option_,
...
@@ -627,6 +628,13 @@ int zmq::options_t::setsockopt (int option_,
}
}
break
;
break
;
case
ZMQ_LOOPBACK_FASTPATH
:
if
(
is_int
)
{
loopback_fastpath
=
(
value
!=
0
);
return
0
;
}
break
;
default
:
default
:
#if defined(ZMQ_ACT_MILITANT)
#if defined(ZMQ_ACT_MILITANT)
...
@@ -1064,6 +1072,13 @@ int zmq::options_t::getsockopt (int option_,
...
@@ -1064,6 +1072,13 @@ int zmq::options_t::getsockopt (int option_,
}
}
break
;
break
;
case
ZMQ_LOOPBACK_FASTPATH
:
if
(
is_int
)
{
*
value
=
loopback_fastpath
;
return
0
;
}
break
;
default
:
default
:
#if defined(ZMQ_ACT_MILITANT)
#if defined(ZMQ_ACT_MILITANT)
malformed
=
false
;
malformed
=
false
;
...
...
src/options.hpp
View file @
eded1f8b
...
@@ -248,6 +248,9 @@ struct options_t
...
@@ -248,6 +248,9 @@ struct options_t
// Enforce a non-empty ZAP domain requirement for PLAIN auth
// Enforce a non-empty ZAP domain requirement for PLAIN auth
bool
zap_enforce_domain
;
bool
zap_enforce_domain
;
// Use of loopback fastpath.
bool
loopback_fastpath
;
};
};
}
}
...
...
src/signaler.cpp
View file @
eded1f8b
...
@@ -65,6 +65,7 @@
...
@@ -65,6 +65,7 @@
#include "err.hpp"
#include "err.hpp"
#include "fd.hpp"
#include "fd.hpp"
#include "ip.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#if defined ZMQ_HAVE_EVENTFD
#if defined ZMQ_HAVE_EVENTFD
#include <sys/eventfd.h>
#include <sys/eventfd.h>
...
@@ -384,6 +385,18 @@ void zmq::signaler_t::forked ()
...
@@ -384,6 +385,18 @@ void zmq::signaler_t::forked ()
}
}
#endif
#endif
#if defined ZMQ_HAVE_WINDOWS
static
void
tune_socket
(
const
SOCKET
socket
)
{
BOOL
tcp_nodelay
=
1
;
int
rc
=
setsockopt
(
socket
,
IPPROTO_TCP
,
TCP_NODELAY
,
(
char
*
)
&
tcp_nodelay
,
sizeof
tcp_nodelay
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
zmq
::
tcp_tune_loopback_fast_path
(
socket
);
}
#endif
// Returns -1 if we could not make the socket pair successfully
// Returns -1 if we could not make the socket pair successfully
int
zmq
::
signaler_t
::
make_fdpair
(
fd_t
*
r_
,
fd_t
*
w_
)
int
zmq
::
signaler_t
::
make_fdpair
(
fd_t
*
r_
,
fd_t
*
w_
)
{
{
...
@@ -483,10 +496,8 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
...
@@ -483,10 +496,8 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
int
rc
=
setsockopt
(
listener
,
SOL_SOCKET
,
SO_REUSEADDR
,
int
rc
=
setsockopt
(
listener
,
SOL_SOCKET
,
SO_REUSEADDR
,
(
char
*
)
&
so_reuseaddr
,
sizeof
so_reuseaddr
);
(
char
*
)
&
so_reuseaddr
,
sizeof
so_reuseaddr
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
BOOL
tcp_nodelay
=
1
;
rc
=
setsockopt
(
listener
,
IPPROTO_TCP
,
TCP_NODELAY
,
(
char
*
)
&
tcp_nodelay
,
tune_socket
(
listener
);
sizeof
tcp_nodelay
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
// Init sockaddr to signaler port.
// Init sockaddr to signaler port.
struct
sockaddr_in
addr
;
struct
sockaddr_in
addr
;
...
@@ -500,9 +511,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
...
@@ -500,9 +511,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
wsa_assert
(
*
w_
!=
INVALID_SOCKET
);
wsa_assert
(
*
w_
!=
INVALID_SOCKET
);
// Set TCP_NODELAY on writer socket.
// Set TCP_NODELAY on writer socket.
rc
=
setsockopt
(
*
w_
,
IPPROTO_TCP
,
TCP_NODELAY
,
(
char
*
)
&
tcp_nodelay
,
tune_socket
(
*
w_
);
sizeof
tcp_nodelay
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
if
(
sync
!=
NULL
)
{
if
(
sync
!=
NULL
)
{
// Enter the critical section.
// Enter the critical section.
...
...
src/tcp.cpp
View file @
eded1f8b
...
@@ -332,3 +332,27 @@ void zmq::tcp_assert_tuning_error (zmq::fd_t s_, int rc_)
...
@@ -332,3 +332,27 @@ void zmq::tcp_assert_tuning_error (zmq::fd_t s_, int rc_)
}
}
#endif
#endif
}
}
void
zmq
::
tcp_tune_loopback_fast_path
(
const
fd_t
socket_
)
{
#if defined ZMQ_HAVE_WINDOWS
int
sio_loopback_fastpath
=
1
;
DWORD
numberOfBytesReturned
=
0
;
int
rc
=
WSAIoctl
(
socket_
,
SIO_LOOPBACK_FAST_PATH
,
&
sio_loopback_fastpath
,
sizeof
sio_loopback_fastpath
,
NULL
,
0
,
&
numberOfBytesReturned
,
0
,
0
);
if
(
SOCKET_ERROR
==
rc
)
{
DWORD
lastError
=
::
WSAGetLastError
();
if
(
WSAEOPNOTSUPP
==
lastError
)
{
// This system is not Windows 8 or Server 2012, and the call is not supported.
}
else
{
wsa_assert
(
false
);
}
}
#else
LIBZMQ_UNUSED
(
socket_
);
#endif
}
src/tcp.hpp
View file @
eded1f8b
...
@@ -66,6 +66,8 @@ int tcp_read (fd_t s_, void *data_, size_t size_);
...
@@ -66,6 +66,8 @@ int tcp_read (fd_t s_, void *data_, size_t size_);
// Asserts that an internal error did not occur. Does not assert
// Asserts that an internal error did not occur. Does not assert
// on network errors such as reset or aborted connections.
// on network errors such as reset or aborted connections.
void
tcp_assert_tuning_error
(
fd_t
s_
,
int
rc_
);
void
tcp_assert_tuning_error
(
fd_t
s_
,
int
rc_
);
void
tcp_tune_loopback_fast_path
(
fd_t
socket_
);
}
}
#endif
#endif
src/tcp_connecter.cpp
View file @
eded1f8b
...
@@ -302,6 +302,10 @@ int zmq::tcp_connecter_t::open ()
...
@@ -302,6 +302,10 @@ int zmq::tcp_connecter_t::open ()
// Set the socket to non-blocking mode so that we get async connect().
// Set the socket to non-blocking mode so that we get async connect().
unblock_socket
(
s
);
unblock_socket
(
s
);
// Set the socket to loopback fastpath if configured.
if
(
options
.
loopback_fastpath
)
tcp_tune_loopback_fast_path
(
s
);
// Set the socket buffer limits for the underlying socket.
// Set the socket buffer limits for the underlying socket.
if
(
options
.
sndbuf
>=
0
)
if
(
options
.
sndbuf
>=
0
)
set_tcp_send_buffer
(
s
,
options
.
sndbuf
);
set_tcp_send_buffer
(
s
,
options
.
sndbuf
);
...
...
src/tcp_listener.cpp
View file @
eded1f8b
...
@@ -216,6 +216,10 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
...
@@ -216,6 +216,10 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
if
(
options
.
tos
!=
0
)
if
(
options
.
tos
!=
0
)
set_ip_type_of_service
(
s
,
options
.
tos
);
set_ip_type_of_service
(
s
,
options
.
tos
);
// Set the socket to loopback fastpath if configured.
if
(
options
.
loopback_fastpath
)
tcp_tune_loopback_fast_path
(
s
);
// Bind the socket to a device if applicable
// Bind the socket to a device if applicable
if
(
!
options
.
bound_device
.
empty
())
if
(
!
options
.
bound_device
.
empty
())
bind_to_device
(
s
,
options
.
bound_device
);
bind_to_device
(
s
,
options
.
bound_device
);
...
...
src/zmq_draft.h
View file @
eded1f8b
...
@@ -55,6 +55,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_);
...
@@ -55,6 +55,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_);
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE 91
#define ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE 91
#define ZMQ_BINDTODEVICE 92
#define ZMQ_BINDTODEVICE 92
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_LOOPBACK_FASTPATH 94
/* DRAFT 0MQ socket events and monitoring */
/* DRAFT 0MQ socket events and monitoring */
/* Unspecified system errors during handshake. Event value is an errno. */
/* Unspecified system errors during handshake. Event value is an errno. */
...
...
tests/test_pair_tcp.cpp
View file @
eded1f8b
...
@@ -29,9 +29,20 @@
...
@@ -29,9 +29,20 @@
#include "testutil.hpp"
#include "testutil.hpp"
int
main
(
void
)
typedef
void
(
*
extra_func_t
)
(
void
*
socket
);
#ifdef ZMQ_BUILD_DRAFT
void
set_sockopt_fastpath
(
void
*
socket
)
{
int
value
=
1
;
int
rc
=
zmq_setsockopt
(
socket
,
ZMQ_LOOPBACK_FASTPATH
,
&
value
,
sizeof
value
);
assert
(
rc
==
0
);
}
#endif
void
test_pair_tcp
(
extra_func_t
extra_func
=
NULL
)
{
{
setup_test_environment
();
size_t
len
=
MAX_SOCKET_STRING
;
size_t
len
=
MAX_SOCKET_STRING
;
char
my_endpoint
[
MAX_SOCKET_STRING
];
char
my_endpoint
[
MAX_SOCKET_STRING
];
void
*
ctx
=
zmq_ctx_new
();
void
*
ctx
=
zmq_ctx_new
();
...
@@ -39,6 +50,10 @@ int main (void)
...
@@ -39,6 +50,10 @@ int main (void)
void
*
sb
=
zmq_socket
(
ctx
,
ZMQ_PAIR
);
void
*
sb
=
zmq_socket
(
ctx
,
ZMQ_PAIR
);
assert
(
sb
);
assert
(
sb
);
if
(
extra_func
)
extra_func
(
sb
);
int
rc
=
zmq_bind
(
sb
,
"tcp://127.0.0.1:*"
);
int
rc
=
zmq_bind
(
sb
,
"tcp://127.0.0.1:*"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
rc
=
zmq_getsockopt
(
sb
,
ZMQ_LAST_ENDPOINT
,
my_endpoint
,
&
len
);
rc
=
zmq_getsockopt
(
sb
,
ZMQ_LAST_ENDPOINT
,
my_endpoint
,
&
len
);
...
@@ -46,6 +61,9 @@ int main (void)
...
@@ -46,6 +61,9 @@ int main (void)
void
*
sc
=
zmq_socket
(
ctx
,
ZMQ_PAIR
);
void
*
sc
=
zmq_socket
(
ctx
,
ZMQ_PAIR
);
assert
(
sc
);
assert
(
sc
);
if
(
extra_func
)
extra_func
(
sc
);
rc
=
zmq_connect
(
sc
,
my_endpoint
);
rc
=
zmq_connect
(
sc
,
my_endpoint
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -59,6 +77,16 @@ int main (void)
...
@@ -59,6 +77,16 @@ int main (void)
rc
=
zmq_ctx_term
(
ctx
);
rc
=
zmq_ctx_term
(
ctx
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
int
main
(
void
)
{
setup_test_environment
();
test_pair_tcp
();
#ifdef ZMQ_BUILD_DRAFT
test_pair_tcp
(
set_sockopt_fastpath
);
#endif
return
0
;
return
0
;
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment