Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
7ce68da2
Commit
7ce68da2
authored
Mar 30, 2017
by
Luca Boccassi
Committed by
GitHub
Mar 30, 2017
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #2482 from evoskuil/master
Problem: insufficient error handling relative to zap_connect.
parents
9c6fb099
0bfd747a
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
155 additions
and
189 deletions
+155
-189
curve_server.cpp
src/curve_server.cpp
+21
-28
gssapi_server.cpp
src/gssapi_server.cpp
+28
-32
msg.hpp
src/msg.hpp
+15
-0
null_mechanism.cpp
src/null_mechanism.cpp
+18
-23
plain_server.cpp
src/plain_server.cpp
+22
-29
session_base.cpp
src/session_base.cpp
+3
-2
test_heartbeats.cpp
tests/test_heartbeats.cpp
+48
-75
No files found.
src/curve_server.cpp
View file @
7ce68da2
...
@@ -491,7 +491,8 @@ int zmq::curve_server_t::process_initiate (msg_t *msg_)
...
@@ -491,7 +491,8 @@ int zmq::curve_server_t::process_initiate (msg_t *msg_)
// Use ZAP protocol (RFC 27) to authenticate the user.
// Use ZAP protocol (RFC 27) to authenticate the user.
rc
=
session
->
zap_connect
();
rc
=
session
->
zap_connect
();
if
(
rc
==
0
)
{
if
(
rc
!=
0
)
return
-
1
;
rc
=
send_zap_request
(
client_key
);
rc
=
send_zap_request
(
client_key
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
-
1
;
...
@@ -505,9 +506,6 @@ int zmq::curve_server_t::process_initiate (msg_t *msg_)
...
@@ -505,9 +506,6 @@ int zmq::curve_server_t::process_initiate (msg_t *msg_)
state
=
expect_zap_reply
;
state
=
expect_zap_reply
;
else
else
return
-
1
;
return
-
1
;
}
else
state
=
send_ready
;
return
parse_metadata
(
initiate_plaintext
+
crypto_box_ZEROBYTES
+
128
,
return
parse_metadata
(
initiate_plaintext
+
crypto_box_ZEROBYTES
+
128
,
clen
-
crypto_box_ZEROBYTES
-
128
);
clen
-
crypto_box_ZEROBYTES
-
128
);
...
@@ -582,7 +580,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
...
@@ -582,7 +580,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Version frame
// Version frame
rc
=
msg
.
init_size
(
3
);
rc
=
msg
.
init_size
(
3
);
...
@@ -591,7 +589,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
...
@@ -591,7 +589,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Request ID frame
// Request ID frame
rc
=
msg
.
init_size
(
1
);
rc
=
msg
.
init_size
(
1
);
...
@@ -600,7 +598,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
...
@@ -600,7 +598,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Domain frame
// Domain frame
rc
=
msg
.
init_size
(
options
.
zap_domain
.
length
());
rc
=
msg
.
init_size
(
options
.
zap_domain
.
length
());
...
@@ -609,7 +607,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
...
@@ -609,7 +607,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Address frame
// Address frame
rc
=
msg
.
init_size
(
peer_address
.
length
());
rc
=
msg
.
init_size
(
peer_address
.
length
());
...
@@ -618,7 +616,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
...
@@ -618,7 +616,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Identity frame
// Identity frame
rc
=
msg
.
init_size
(
options
.
identity_size
);
rc
=
msg
.
init_size
(
options
.
identity_size
);
...
@@ -627,7 +625,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
...
@@ -627,7 +625,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Mechanism frame
// Mechanism frame
rc
=
msg
.
init_size
(
5
);
rc
=
msg
.
init_size
(
5
);
...
@@ -636,7 +634,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
...
@@ -636,7 +634,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Credentials frame
// Credentials frame
rc
=
msg
.
init_size
(
crypto_box_PUBLICKEYBYTES
);
rc
=
msg
.
init_size
(
crypto_box_PUBLICKEYBYTES
);
...
@@ -644,7 +642,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
...
@@ -644,7 +642,7 @@ int zmq::curve_server_t::send_zap_request (const uint8_t *key)
memcpy
(
msg
.
data
(),
key
,
crypto_box_PUBLICKEYBYTES
);
memcpy
(
msg
.
data
(),
key
,
crypto_box_PUBLICKEYBYTES
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
return
0
;
return
0
;
}
}
...
@@ -663,26 +661,21 @@ int zmq::curve_server_t::receive_and_process_zap_reply ()
...
@@ -663,26 +661,21 @@ int zmq::curve_server_t::receive_and_process_zap_reply ()
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
rc
=
session
->
read_zap_msg
(
&
msg
[
i
]);
rc
=
session
->
read_zap_msg
(
&
msg
[
i
]);
if
(
rc
==
-
1
)
if
(
rc
==
-
1
)
break
;
return
send_failure
(
msg
)
;
if
((
msg
[
i
].
flags
()
&
msg_t
::
more
)
==
(
i
<
6
?
0
:
msg_t
::
more
))
{
if
((
msg
[
i
].
flags
()
&
msg_t
::
more
)
==
(
i
<
6
?
0
:
msg_t
::
more
))
{
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"CURVE I: ZAP handler sent incomplete reply message"
);
puts
(
"CURVE I: ZAP handler sent incomplete reply message"
);
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
break
;
}
}
}
}
if
(
rc
!=
0
)
goto
error
;
// Address delimiter frame
// Address delimiter frame
if
(
msg
[
0
].
size
()
>
0
)
{
if
(
msg
[
0
].
size
()
>
0
)
{
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"CURVE I: ZAP handler sent malformed reply message"
);
puts
(
"CURVE I: ZAP handler sent malformed reply message"
);
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
goto
error
;
}
}
// Version frame
// Version frame
...
@@ -690,8 +683,7 @@ int zmq::curve_server_t::receive_and_process_zap_reply ()
...
@@ -690,8 +683,7 @@ int zmq::curve_server_t::receive_and_process_zap_reply ()
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"CURVE I: ZAP handler sent bad version number"
);
puts
(
"CURVE I: ZAP handler sent bad version number"
);
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
goto
error
;
}
}
// Request id frame
// Request id frame
...
@@ -699,8 +691,7 @@ int zmq::curve_server_t::receive_and_process_zap_reply ()
...
@@ -699,8 +691,7 @@ int zmq::curve_server_t::receive_and_process_zap_reply ()
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"CURVE I: ZAP handler sent bad request ID"
);
puts
(
"CURVE I: ZAP handler sent bad request ID"
);
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
goto
error
;
}
}
// Status code frame
// Status code frame
...
@@ -708,8 +699,7 @@ int zmq::curve_server_t::receive_and_process_zap_reply ()
...
@@ -708,8 +699,7 @@ int zmq::curve_server_t::receive_and_process_zap_reply ()
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"CURVE I: ZAP handler rejected client authentication"
);
puts
(
"CURVE I: ZAP handler rejected client authentication"
);
errno
=
EACCES
;
errno
=
EACCES
;
rc
=
-
1
;
return
send_failure
(
msg
);
goto
error
;
}
}
// Save status code
// Save status code
...
@@ -722,13 +712,16 @@ int zmq::curve_server_t::receive_and_process_zap_reply ()
...
@@ -722,13 +712,16 @@ int zmq::curve_server_t::receive_and_process_zap_reply ()
rc
=
parse_metadata
(
static_cast
<
const
unsigned
char
*>
(
msg
[
6
].
data
()),
rc
=
parse_metadata
(
static_cast
<
const
unsigned
char
*>
(
msg
[
6
].
data
()),
msg
[
6
].
size
(),
true
);
msg
[
6
].
size
(),
true
);
error
:
if
(
rc
!=
0
)
return
send_failure
(
msg
);
// Close all reply frames
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
const
int
rc2
=
msg
[
i
].
close
();
const
int
rc2
=
msg
[
i
].
close
();
errno_assert
(
rc2
==
0
);
errno_assert
(
rc2
==
0
);
}
}
return
rc
;
return
0
;
}
}
#endif
#endif
src/gssapi_server.cpp
View file @
7ce68da2
...
@@ -120,20 +120,21 @@ int zmq::gssapi_server_t::process_handshake_command (msg_t *msg_)
...
@@ -120,20 +120,21 @@ int zmq::gssapi_server_t::process_handshake_command (msg_t *msg_)
if
(
security_context_established
)
{
if
(
security_context_established
)
{
// Use ZAP protocol (RFC 27) to authenticate the user.
// Use ZAP protocol (RFC 27) to authenticate the user.
bool
expecting_zap_reply
=
false
;
int
rc
=
session
->
zap_connect
();
int
rc
=
session
->
zap_connect
();
if
(
rc
==
0
)
{
if
(
rc
!=
0
)
return
-
1
;
rc
=
send_zap_request
();
rc
=
send_zap_request
();
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
-
1
;
rc
=
receive_and_process_zap_reply
();
rc
=
receive_and_process_zap_reply
();
if
(
rc
!=
0
)
{
if
(
rc
==
0
)
if
(
errno
!=
EAGAIN
)
state
=
send_ready
;
else
if
(
errno
==
EAGAIN
)
state
=
expect_zap_reply
;
else
return
-
1
;
return
-
1
;
expecting_zap_reply
=
true
;
}
}
state
=
expecting_zap_reply
?
expect_zap_reply
:
send_ready
;
return
0
;
return
0
;
}
}
...
@@ -160,7 +161,7 @@ int zmq::gssapi_server_t::send_zap_request ()
...
@@ -160,7 +161,7 @@ int zmq::gssapi_server_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Version frame
// Version frame
rc
=
msg
.
init_size
(
3
);
rc
=
msg
.
init_size
(
3
);
...
@@ -169,7 +170,7 @@ int zmq::gssapi_server_t::send_zap_request ()
...
@@ -169,7 +170,7 @@ int zmq::gssapi_server_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Request ID frame
// Request ID frame
rc
=
msg
.
init_size
(
1
);
rc
=
msg
.
init_size
(
1
);
...
@@ -178,7 +179,7 @@ int zmq::gssapi_server_t::send_zap_request ()
...
@@ -178,7 +179,7 @@ int zmq::gssapi_server_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Domain frame
// Domain frame
rc
=
msg
.
init_size
(
options
.
zap_domain
.
length
());
rc
=
msg
.
init_size
(
options
.
zap_domain
.
length
());
...
@@ -187,7 +188,7 @@ int zmq::gssapi_server_t::send_zap_request ()
...
@@ -187,7 +188,7 @@ int zmq::gssapi_server_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Address frame
// Address frame
rc
=
msg
.
init_size
(
peer_address
.
length
());
rc
=
msg
.
init_size
(
peer_address
.
length
());
...
@@ -196,7 +197,7 @@ int zmq::gssapi_server_t::send_zap_request ()
...
@@ -196,7 +197,7 @@ int zmq::gssapi_server_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Identity frame
// Identity frame
rc
=
msg
.
init_size
(
options
.
identity_size
);
rc
=
msg
.
init_size
(
options
.
identity_size
);
...
@@ -205,7 +206,7 @@ int zmq::gssapi_server_t::send_zap_request ()
...
@@ -205,7 +206,7 @@ int zmq::gssapi_server_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Mechanism frame
// Mechanism frame
rc
=
msg
.
init_size
(
6
);
rc
=
msg
.
init_size
(
6
);
...
@@ -214,7 +215,7 @@ int zmq::gssapi_server_t::send_zap_request ()
...
@@ -214,7 +215,7 @@ int zmq::gssapi_server_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Principal frame
// Principal frame
gss_buffer_desc
principal
;
gss_buffer_desc
principal
;
...
@@ -226,7 +227,7 @@ int zmq::gssapi_server_t::send_zap_request ()
...
@@ -226,7 +227,7 @@ int zmq::gssapi_server_t::send_zap_request ()
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
gss_release_buffer
(
&
min_stat
,
&
principal
);
gss_release_buffer
(
&
min_stat
,
&
principal
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
return
0
;
return
0
;
}
}
...
@@ -245,43 +246,35 @@ int zmq::gssapi_server_t::receive_and_process_zap_reply ()
...
@@ -245,43 +246,35 @@ int zmq::gssapi_server_t::receive_and_process_zap_reply ()
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
rc
=
session
->
read_zap_msg
(
&
msg
[
i
]);
rc
=
session
->
read_zap_msg
(
&
msg
[
i
]);
if
(
rc
==
-
1
)
if
(
rc
==
-
1
)
break
;
return
send_failure
(
msg
)
;
if
((
msg
[
i
].
flags
()
&
msg_t
::
more
)
==
(
i
<
6
?
0
:
msg_t
::
more
))
{
if
((
msg
[
i
].
flags
()
&
msg_t
::
more
)
==
(
i
<
6
?
0
:
msg_t
::
more
))
{
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
break
;
}
}
}
}
if
(
rc
!=
0
)
goto
error
;
// Address delimiter frame
// Address delimiter frame
if
(
msg
[
0
].
size
()
>
0
)
{
if
(
msg
[
0
].
size
()
>
0
)
{
rc
=
-
1
;
errno
=
EPROTO
;
errno
=
EPROTO
;
goto
error
;
return
send_failure
(
msg
)
;
}
}
// Version frame
// Version frame
if
(
msg
[
1
].
size
()
!=
3
||
memcmp
(
msg
[
1
].
data
(),
"1.0"
,
3
))
{
if
(
msg
[
1
].
size
()
!=
3
||
memcmp
(
msg
[
1
].
data
(),
"1.0"
,
3
))
{
rc
=
-
1
;
errno
=
EPROTO
;
errno
=
EPROTO
;
goto
error
;
return
send_failure
(
msg
)
;
}
}
// Request id frame
// Request id frame
if
(
msg
[
2
].
size
()
!=
1
||
memcmp
(
msg
[
2
].
data
(),
"1"
,
1
))
{
if
(
msg
[
2
].
size
()
!=
1
||
memcmp
(
msg
[
2
].
data
(),
"1"
,
1
))
{
rc
=
-
1
;
errno
=
EPROTO
;
errno
=
EPROTO
;
goto
error
;
return
send_failure
(
msg
)
;
}
}
// Status code frame
// Status code frame
if
(
msg
[
3
].
size
()
!=
3
||
memcmp
(
msg
[
3
].
data
(),
"200"
,
3
))
{
if
(
msg
[
3
].
size
()
!=
3
||
memcmp
(
msg
[
3
].
data
(),
"200"
,
3
))
{
rc
=
-
1
;
errno
=
EACCES
;
errno
=
EACCES
;
goto
error
;
return
send_failure
(
msg
)
;
}
}
// Save user id
// Save user id
...
@@ -291,13 +284,16 @@ int zmq::gssapi_server_t::receive_and_process_zap_reply ()
...
@@ -291,13 +284,16 @@ int zmq::gssapi_server_t::receive_and_process_zap_reply ()
rc
=
parse_metadata
(
static_cast
<
const
unsigned
char
*>
(
msg
[
6
].
data
()),
rc
=
parse_metadata
(
static_cast
<
const
unsigned
char
*>
(
msg
[
6
].
data
()),
msg
[
6
].
size
(),
true
);
msg
[
6
].
size
(),
true
);
error
:
if
(
rc
!=
0
)
return
send_failure
(
msg
);
// Close all reply frames
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
const
int
rc2
=
msg
[
i
].
close
();
const
int
rc2
=
msg
[
i
].
close
();
errno_assert
(
rc2
==
0
);
errno_assert
(
rc2
==
0
);
}
}
return
rc
;
return
0
;
}
}
...
...
src/msg.hpp
View file @
7ce68da2
...
@@ -34,6 +34,7 @@
...
@@ -34,6 +34,7 @@
#include <stdio.h>
#include <stdio.h>
#include "config.hpp"
#include "config.hpp"
#include "err.hpp"
#include "fd.hpp"
#include "fd.hpp"
#include "atomic_counter.hpp"
#include "atomic_counter.hpp"
#include "metadata.hpp"
#include "metadata.hpp"
...
@@ -246,6 +247,20 @@ namespace zmq
...
@@ -246,6 +247,20 @@ namespace zmq
}
u
;
}
u
;
};
};
inline
int
send_failure
(
zmq
::
msg_t
*
msg
)
{
const
int
rc
=
msg
->
close
();
errno_assert
(
rc
==
0
);
return
-
1
;
}
inline
int
send_failure
(
zmq
::
msg_t
msg
[],
int
count
)
{
for
(
int
i
=
0
;
i
<
count
;
i
++
)
send_failure
(
&
msg
[
i
]);
return
-
1
;
}
}
}
#endif
#endif
src/null_mechanism.cpp
View file @
7ce68da2
...
@@ -225,7 +225,7 @@ int zmq::null_mechanism_t::send_zap_request ()
...
@@ -225,7 +225,7 @@ int zmq::null_mechanism_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Version frame
// Version frame
rc
=
msg
.
init_size
(
3
);
rc
=
msg
.
init_size
(
3
);
...
@@ -234,7 +234,7 @@ int zmq::null_mechanism_t::send_zap_request ()
...
@@ -234,7 +234,7 @@ int zmq::null_mechanism_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Request id frame
// Request id frame
rc
=
msg
.
init_size
(
1
);
rc
=
msg
.
init_size
(
1
);
...
@@ -243,7 +243,7 @@ int zmq::null_mechanism_t::send_zap_request ()
...
@@ -243,7 +243,7 @@ int zmq::null_mechanism_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Domain frame
// Domain frame
rc
=
msg
.
init_size
(
options
.
zap_domain
.
length
());
rc
=
msg
.
init_size
(
options
.
zap_domain
.
length
());
...
@@ -252,7 +252,7 @@ int zmq::null_mechanism_t::send_zap_request ()
...
@@ -252,7 +252,7 @@ int zmq::null_mechanism_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Address frame
// Address frame
rc
=
msg
.
init_size
(
peer_address
.
length
());
rc
=
msg
.
init_size
(
peer_address
.
length
());
...
@@ -261,7 +261,7 @@ int zmq::null_mechanism_t::send_zap_request ()
...
@@ -261,7 +261,7 @@ int zmq::null_mechanism_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Identity frame
// Identity frame
rc
=
msg
.
init_size
(
options
.
identity_size
);
rc
=
msg
.
init_size
(
options
.
identity_size
);
...
@@ -270,7 +270,7 @@ int zmq::null_mechanism_t::send_zap_request ()
...
@@ -270,7 +270,7 @@ int zmq::null_mechanism_t::send_zap_request ()
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Mechanism frame
// Mechanism frame
rc
=
msg
.
init_size
(
4
);
rc
=
msg
.
init_size
(
4
);
...
@@ -278,7 +278,7 @@ int zmq::null_mechanism_t::send_zap_request ()
...
@@ -278,7 +278,7 @@ int zmq::null_mechanism_t::send_zap_request ()
memcpy
(
msg
.
data
(),
"NULL"
,
4
);
memcpy
(
msg
.
data
(),
"NULL"
,
4
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
return
0
;
return
0
;
}
}
...
@@ -297,26 +297,21 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply ()
...
@@ -297,26 +297,21 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply ()
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
rc
=
session
->
read_zap_msg
(
&
msg
[
i
]);
rc
=
session
->
read_zap_msg
(
&
msg
[
i
]);
if
(
rc
==
-
1
)
if
(
rc
==
-
1
)
break
;
return
send_failure
(
msg
)
;
if
((
msg
[
i
].
flags
()
&
msg_t
::
more
)
==
(
i
<
6
?
0
:
msg_t
::
more
))
{
if
((
msg
[
i
].
flags
()
&
msg_t
::
more
)
==
(
i
<
6
?
0
:
msg_t
::
more
))
{
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"NULL I: ZAP handler sent incomplete reply message"
);
puts
(
"NULL I: ZAP handler sent incomplete reply message"
);
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
break
;
}
}
}
}
if
(
rc
!=
0
)
goto
error
;
// Address delimiter frame
// Address delimiter frame
if
(
msg
[
0
].
size
()
>
0
)
{
if
(
msg
[
0
].
size
()
>
0
)
{
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"NULL I: ZAP handler sent malformed reply message"
);
puts
(
"NULL I: ZAP handler sent malformed reply message"
);
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
goto
error
;
}
}
// Version frame
// Version frame
...
@@ -324,8 +319,7 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply ()
...
@@ -324,8 +319,7 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply ()
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"NULL I: ZAP handler sent bad version number"
);
puts
(
"NULL I: ZAP handler sent bad version number"
);
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
goto
error
;
}
}
// Request id frame
// Request id frame
...
@@ -333,8 +327,7 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply ()
...
@@ -333,8 +327,7 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply ()
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"NULL I: ZAP handler sent bad request ID"
);
puts
(
"NULL I: ZAP handler sent bad request ID"
);
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
goto
error
;
}
}
// Status code frame
// Status code frame
...
@@ -342,8 +335,7 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply ()
...
@@ -342,8 +335,7 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply ()
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"NULL I: ZAP handler rejected client authentication"
);
puts
(
"NULL I: ZAP handler rejected client authentication"
);
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
goto
error
;
}
}
// Save status code
// Save status code
...
@@ -356,11 +348,14 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply ()
...
@@ -356,11 +348,14 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply ()
rc
=
parse_metadata
(
static_cast
<
const
unsigned
char
*>
(
msg
[
6
].
data
()),
rc
=
parse_metadata
(
static_cast
<
const
unsigned
char
*>
(
msg
[
6
].
data
()),
msg
[
6
].
size
(),
true
);
msg
[
6
].
size
(),
true
);
error
:
if
(
rc
!=
0
)
return
send_failure
(
msg
);
// Close all reply frames
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
const
int
rc2
=
msg
[
i
].
close
();
const
int
rc2
=
msg
[
i
].
close
();
errno_assert
(
rc2
==
0
);
errno_assert
(
rc2
==
0
);
}
}
return
rc
;
return
0
;
}
}
src/plain_server.cpp
View file @
7ce68da2
...
@@ -190,7 +190,8 @@ int zmq::plain_server_t::process_hello (msg_t *msg_)
...
@@ -190,7 +190,8 @@ int zmq::plain_server_t::process_hello (msg_t *msg_)
// Use ZAP protocol (RFC 27) to authenticate the user.
// Use ZAP protocol (RFC 27) to authenticate the user.
int
rc
=
session
->
zap_connect
();
int
rc
=
session
->
zap_connect
();
if
(
rc
==
0
)
{
if
(
rc
!=
0
)
return
-
1
;
rc
=
send_zap_request
(
username
,
password
);
rc
=
send_zap_request
(
username
,
password
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
-
1
;
...
@@ -204,9 +205,6 @@ int zmq::plain_server_t::process_hello (msg_t *msg_)
...
@@ -204,9 +205,6 @@ int zmq::plain_server_t::process_hello (msg_t *msg_)
state
=
waiting_for_zap_reply
;
state
=
waiting_for_zap_reply
;
else
else
return
-
1
;
return
-
1
;
}
else
state
=
sending_welcome
;
return
0
;
return
0
;
}
}
...
@@ -291,7 +289,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
...
@@ -291,7 +289,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Version frame
// Version frame
rc
=
msg
.
init_size
(
3
);
rc
=
msg
.
init_size
(
3
);
...
@@ -300,7 +298,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
...
@@ -300,7 +298,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Request id frame
// Request id frame
rc
=
msg
.
init_size
(
1
);
rc
=
msg
.
init_size
(
1
);
...
@@ -309,7 +307,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
...
@@ -309,7 +307,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Domain frame
// Domain frame
rc
=
msg
.
init_size
(
options
.
zap_domain
.
length
());
rc
=
msg
.
init_size
(
options
.
zap_domain
.
length
());
...
@@ -318,7 +316,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
...
@@ -318,7 +316,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Address frame
// Address frame
rc
=
msg
.
init_size
(
peer_address
.
length
());
rc
=
msg
.
init_size
(
peer_address
.
length
());
...
@@ -327,7 +325,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
...
@@ -327,7 +325,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Identity frame
// Identity frame
rc
=
msg
.
init_size
(
options
.
identity_size
);
rc
=
msg
.
init_size
(
options
.
identity_size
);
...
@@ -336,7 +334,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
...
@@ -336,7 +334,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Mechanism frame
// Mechanism frame
rc
=
msg
.
init_size
(
5
);
rc
=
msg
.
init_size
(
5
);
...
@@ -345,7 +343,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
...
@@ -345,7 +343,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Username frame
// Username frame
rc
=
msg
.
init_size
(
username
.
length
());
rc
=
msg
.
init_size
(
username
.
length
());
...
@@ -354,7 +352,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
...
@@ -354,7 +352,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
// Password frame
// Password frame
rc
=
msg
.
init_size
(
password
.
length
());
rc
=
msg
.
init_size
(
password
.
length
());
...
@@ -362,7 +360,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
...
@@ -362,7 +360,7 @@ int zmq::plain_server_t::send_zap_request (const std::string &username,
memcpy
(
msg
.
data
(),
password
.
c_str
(),
password
.
length
());
memcpy
(
msg
.
data
(),
password
.
c_str
(),
password
.
length
());
rc
=
session
->
write_zap_msg
(
&
msg
);
rc
=
session
->
write_zap_msg
(
&
msg
);
if
(
rc
!=
0
)
if
(
rc
!=
0
)
return
-
1
;
return
send_failure
(
&
msg
)
;
return
0
;
return
0
;
}
}
...
@@ -381,26 +379,21 @@ int zmq::plain_server_t::receive_and_process_zap_reply ()
...
@@ -381,26 +379,21 @@ int zmq::plain_server_t::receive_and_process_zap_reply ()
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
rc
=
session
->
read_zap_msg
(
&
msg
[
i
]);
rc
=
session
->
read_zap_msg
(
&
msg
[
i
]);
if
(
rc
==
-
1
)
if
(
rc
==
-
1
)
break
;
return
send_failure
(
msg
)
;
if
((
msg
[
i
].
flags
()
&
msg_t
::
more
)
==
(
i
<
6
?
0
:
msg_t
::
more
))
{
if
((
msg
[
i
].
flags
()
&
msg_t
::
more
)
==
(
i
<
6
?
0
:
msg_t
::
more
))
{
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"PLAIN I: ZAP handler sent incomplete reply message"
);
puts
(
"PLAIN I: ZAP handler sent incomplete reply message"
);
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
break
;
}
}
}
}
if
(
rc
!=
0
)
goto
error
;
// Address delimiter frame
// Address delimiter frame
if
(
msg
[
0
].
size
()
>
0
)
{
if
(
msg
[
0
].
size
()
>
0
)
{
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"PLAIN I: ZAP handler sent malformed reply message"
);
puts
(
"PLAIN I: ZAP handler sent malformed reply message"
);
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
goto
error
;
}
}
// Version frame
// Version frame
...
@@ -408,17 +401,15 @@ int zmq::plain_server_t::receive_and_process_zap_reply ()
...
@@ -408,17 +401,15 @@ int zmq::plain_server_t::receive_and_process_zap_reply ()
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"PLAIN I: ZAP handler sent bad version number"
);
puts
(
"PLAIN I: ZAP handler sent bad version number"
);
errno
=
EPROTO
;
errno
=
EPROTO
;
rc
=
-
1
;
return
send_failure
(
msg
);
goto
error
;
}
}
// Request id frame
// Request id frame
if
(
msg
[
2
].
size
()
!=
1
||
memcmp
(
msg
[
2
].
data
(),
"1"
,
1
))
{
if
(
msg
[
2
].
size
()
!=
1
||
memcmp
(
msg
[
2
].
data
(),
"1"
,
1
))
{
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"PLAIN I: ZAP handler sent bad request ID"
);
puts
(
"PLAIN I: ZAP handler sent bad request ID"
);
rc
=
-
1
;
errno
=
EPROTO
;
errno
=
EPROTO
;
goto
error
;
return
send_failure
(
msg
)
;
}
}
// Status code frame
// Status code frame
...
@@ -426,8 +417,7 @@ int zmq::plain_server_t::receive_and_process_zap_reply ()
...
@@ -426,8 +417,7 @@ int zmq::plain_server_t::receive_and_process_zap_reply ()
// Temporary support for security debugging
// Temporary support for security debugging
puts
(
"PLAIN I: ZAP handler rejected client authentication"
);
puts
(
"PLAIN I: ZAP handler rejected client authentication"
);
errno
=
EACCES
;
errno
=
EACCES
;
rc
=
-
1
;
return
send_failure
(
msg
);
goto
error
;
}
}
// Save status code
// Save status code
...
@@ -440,11 +430,14 @@ int zmq::plain_server_t::receive_and_process_zap_reply ()
...
@@ -440,11 +430,14 @@ int zmq::plain_server_t::receive_and_process_zap_reply ()
rc
=
parse_metadata
(
static_cast
<
const
unsigned
char
*>
(
msg
[
6
].
data
()),
rc
=
parse_metadata
(
static_cast
<
const
unsigned
char
*>
(
msg
[
6
].
data
()),
msg
[
6
].
size
(),
true
);
msg
[
6
].
size
(),
true
);
error
:
if
(
rc
!=
0
)
return
send_failure
(
msg
);
// Close all reply frames
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
for
(
int
i
=
0
;
i
<
7
;
i
++
)
{
const
int
rc2
=
msg
[
i
].
close
();
const
int
rc2
=
msg
[
i
].
close
();
errno_assert
(
rc2
==
0
);
errno_assert
(
rc2
==
0
);
}
}
return
rc
;
return
0
;
}
}
src/session_base.cpp
View file @
7ce68da2
...
@@ -353,9 +353,10 @@ int zmq::session_base_t::zap_connect ()
...
@@ -353,9 +353,10 @@ int zmq::session_base_t::zap_connect ()
rc
=
id
.
init
();
rc
=
id
.
init
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
id
.
set_flags
(
msg_t
::
identity
);
id
.
set_flags
(
msg_t
::
identity
);
bool
ok
=
zap_pipe
->
write
(
&
id
);
if
(
zap_pipe
->
write
(
&
id
))
zmq_assert
(
ok
);
zap_pipe
->
flush
();
zap_pipe
->
flush
();
else
return
-
1
;
}
}
return
0
;
return
0
;
...
...
tests/test_heartbeats.cpp
View file @
7ce68da2
...
@@ -23,8 +23,10 @@
...
@@ -23,8 +23,10 @@
# include <ws2tcpip.h>
# include <ws2tcpip.h>
# include <stdexcept>
# include <stdexcept>
# define close closesocket
# define close closesocket
typedef
SOCKET
raw_socket
;
#else
#else
# include <arpa/inet.h>
# include <arpa/inet.h>
typedef
int
raw_socket
;
#endif
#endif
// Read one event off the monitor socket; return value and address
// Read one event off the monitor socket; return value and address
...
@@ -34,10 +36,11 @@
...
@@ -34,10 +36,11 @@
static
int
static
int
get_monitor_event
(
void
*
monitor
)
get_monitor_event
(
void
*
monitor
)
{
{
for
(
int
i
=
0
;
i
<
2
;
i
++
)
{
for
(
int
i
=
0
;
i
<
2
;
i
++
)
{
// First frame in message contains event number and value
// First frame in message contains event number and value
zmq_msg_t
msg
;
zmq_msg_t
msg
;
zmq_msg_init
(
&
msg
);
int
rc
=
zmq_msg_init
(
&
msg
);
assert
(
rc
==
0
);
if
(
zmq_msg_recv
(
&
msg
,
monitor
,
ZMQ_DONTWAIT
)
==
-
1
)
{
if
(
zmq_msg_recv
(
&
msg
,
monitor
,
ZMQ_DONTWAIT
)
==
-
1
)
{
msleep
(
SETTLE_TIME
);
msleep
(
SETTLE_TIME
);
continue
;
// Interruped, presumably
continue
;
// Interruped, presumably
...
@@ -48,7 +51,8 @@ get_monitor_event (void *monitor)
...
@@ -48,7 +51,8 @@ get_monitor_event (void *monitor)
uint16_t
event
=
*
(
uint16_t
*
)
(
data
);
uint16_t
event
=
*
(
uint16_t
*
)
(
data
);
// Second frame in message contains event address
// Second frame in message contains event address
zmq_msg_init
(
&
msg
);
rc
=
zmq_msg_init
(
&
msg
);
assert
(
rc
==
0
);
if
(
zmq_msg_recv
(
&
msg
,
monitor
,
0
)
==
-
1
)
{
if
(
zmq_msg_recv
(
&
msg
,
monitor
,
0
)
==
-
1
)
{
return
-
1
;
// Interruped, presumably
return
-
1
;
// Interruped, presumably
}
}
...
@@ -60,7 +64,7 @@ get_monitor_event (void *monitor)
...
@@ -60,7 +64,7 @@ get_monitor_event (void *monitor)
}
}
static
void
static
void
recv_with_retry
(
in
t
fd
,
char
*
buffer
,
int
bytes
)
{
recv_with_retry
(
raw_socke
t
fd
,
char
*
buffer
,
int
bytes
)
{
int
received
=
0
;
int
received
=
0
;
while
(
true
)
{
while
(
true
)
{
int
rc
=
recv
(
fd
,
buffer
+
received
,
bytes
-
received
,
0
);
int
rc
=
recv
(
fd
,
buffer
+
received
,
bytes
-
received
,
0
);
...
@@ -72,18 +76,18 @@ recv_with_retry (int fd, char *buffer, int bytes) {
...
@@ -72,18 +76,18 @@ recv_with_retry (int fd, char *buffer, int bytes) {
}
}
static
void
static
void
mock_handshake
(
in
t
fd
)
{
mock_handshake
(
raw_socke
t
fd
)
{
const
uint8_t
zmtp_greeting
[
33
]
=
{
0xff
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0x7f
,
3
,
0
,
'N'
,
'U'
,
'L'
,
'L'
,
0
};
const
uint8_t
zmtp_greeting
[
33
]
=
{
0xff
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0
,
0x7f
,
3
,
0
,
'N'
,
'U'
,
'L'
,
'L'
,
0
};
char
buffer
[
128
];
char
buffer
[
128
];
memset
(
buffer
,
0
,
sizeof
(
buffer
));
memset
(
buffer
,
0
,
sizeof
(
buffer
));
memcpy
(
buffer
,
zmtp_greeting
,
sizeof
(
zmtp_greeting
));
memcpy
(
buffer
,
zmtp_greeting
,
sizeof
(
zmtp_greeting
));
int
rc
=
send
(
fd
,
buffer
,
64
,
0
);
int
rc
=
send
(
fd
,
buffer
,
64
,
0
);
assert
(
rc
==
64
);
assert
(
rc
==
64
);
recv_with_retry
(
fd
,
buffer
,
64
);
recv_with_retry
(
fd
,
buffer
,
64
);
const
uint8_t
zmtp_ready
[
43
]
=
{
const
uint8_t
zmtp_ready
[
43
]
=
{
4
,
41
,
5
,
'R'
,
'E'
,
'A'
,
'D'
,
'Y'
,
11
,
'S'
,
'o'
,
'c'
,
'k'
,
'e'
,
't'
,
'-'
,
'T'
,
'y'
,
'p'
,
'e'
,
4
,
41
,
5
,
'R'
,
'E'
,
'A'
,
'D'
,
'Y'
,
11
,
'S'
,
'o'
,
'c'
,
'k'
,
'e'
,
't'
,
'-'
,
'T'
,
'y'
,
'p'
,
'e'
,
0
,
0
,
0
,
6
,
'D'
,
'E'
,
'A'
,
'L'
,
'E'
,
'R'
,
8
,
'I'
,
'd'
,
'e'
,
'n'
,
't'
,
'i'
,
't'
,
'y'
,
0
,
0
,
0
,
6
,
'D'
,
'E'
,
'A'
,
'L'
,
'E'
,
'R'
,
8
,
'I'
,
'd'
,
'e'
,
'n'
,
't'
,
'i'
,
't'
,
'y'
,
0
,
0
,
0
,
0
0
,
0
,
0
,
0
...
@@ -92,38 +96,13 @@ mock_handshake (int fd) {
...
@@ -92,38 +96,13 @@ mock_handshake (int fd) {
memset
(
buffer
,
0
,
sizeof
(
buffer
));
memset
(
buffer
,
0
,
sizeof
(
buffer
));
memcpy
(
buffer
,
zmtp_ready
,
43
);
memcpy
(
buffer
,
zmtp_ready
,
43
);
rc
=
send
(
fd
,
buffer
,
43
,
0
);
rc
=
send
(
fd
,
buffer
,
43
,
0
);
assert
(
rc
==
43
);
assert
(
rc
==
43
);
recv_with_retry
(
fd
,
buffer
,
43
);
recv_with_retry
(
fd
,
buffer
,
43
);
}
}
static
void
static
void
setup_curve
(
void
*
socket
,
int
is_server
)
{
prep_server_socket
(
void
*
ctx
,
int
set_heartbeats
,
void
**
server_out
,
void
**
mon_out
)
const
char
*
secret_key
;
const
char
*
public_key
;
const
char
*
server_key
;
if
(
is_server
)
{
secret_key
=
"JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6"
;
public_key
=
"rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7"
;
server_key
=
NULL
;
}
else
{
secret_key
=
"D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs"
;
public_key
=
"Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID"
;
server_key
=
"rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7"
;
}
zmq_setsockopt
(
socket
,
ZMQ_CURVE_SECRETKEY
,
secret_key
,
strlen
(
secret_key
));
zmq_setsockopt
(
socket
,
ZMQ_CURVE_PUBLICKEY
,
public_key
,
strlen
(
public_key
));
if
(
is_server
)
zmq_setsockopt
(
socket
,
ZMQ_CURVE_SERVER
,
&
is_server
,
sizeof
(
is_server
));
else
zmq_setsockopt
(
socket
,
ZMQ_CURVE_SERVERKEY
,
server_key
,
strlen
(
server_key
));
}
static
void
prep_server_socket
(
void
*
ctx
,
int
set_heartbeats
,
int
is_curve
,
void
**
server_out
,
void
**
mon_out
)
{
{
int
rc
;
int
rc
;
// We'll be using this socket in raw mode
// We'll be using this socket in raw mode
...
@@ -134,15 +113,12 @@ prep_server_socket(void * ctx, int set_heartbeats, int is_curve, void ** server_
...
@@ -134,15 +113,12 @@ prep_server_socket(void * ctx, int set_heartbeats, int is_curve, void ** server_
rc
=
zmq_setsockopt
(
server
,
ZMQ_LINGER
,
&
value
,
sizeof
(
value
));
rc
=
zmq_setsockopt
(
server
,
ZMQ_LINGER
,
&
value
,
sizeof
(
value
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
if
(
set_heartbeats
)
{
if
(
set_heartbeats
)
{
value
=
50
;
value
=
50
;
rc
=
zmq_setsockopt
(
server
,
ZMQ_HEARTBEAT_IVL
,
&
value
,
sizeof
(
value
));
rc
=
zmq_setsockopt
(
server
,
ZMQ_HEARTBEAT_IVL
,
&
value
,
sizeof
(
value
));
assert
(
rc
==
0
);
assert
(
rc
==
0
);
}
}
if
(
is_curve
)
setup_curve
(
server
,
1
);
rc
=
zmq_bind
(
server
,
"tcp://127.0.0.1:5556"
);
rc
=
zmq_bind
(
server
,
"tcp://127.0.0.1:5556"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -175,10 +151,10 @@ test_heartbeat_timeout (void)
...
@@ -175,10 +151,10 @@ test_heartbeat_timeout (void)
assert
(
ctx
);
assert
(
ctx
);
void
*
server
,
*
server_mon
;
void
*
server
,
*
server_mon
;
prep_server_socket
(
ctx
,
1
,
0
,
&
server
,
&
server_mon
);
prep_server_socket
(
ctx
,
1
,
&
server
,
&
server_mon
);
struct
sockaddr_in
ip4addr
;
struct
sockaddr_in
ip4addr
;
in
t
s
;
raw_socke
t
s
;
ip4addr
.
sin_family
=
AF_INET
;
ip4addr
.
sin_family
=
AF_INET
;
ip4addr
.
sin_port
=
htons
(
5556
);
ip4addr
.
sin_port
=
htons
(
5556
);
...
@@ -193,15 +169,15 @@ test_heartbeat_timeout (void)
...
@@ -193,15 +169,15 @@ test_heartbeat_timeout (void)
assert
(
rc
>
-
1
);
assert
(
rc
>
-
1
);
// Mock a ZMTP 3 client so we can forcibly time out a connection
// Mock a ZMTP 3 client so we can forcibly time out a connection
mock_handshake
(
s
);
mock_handshake
(
s
);
// By now everything should report as connected
// By now everything should report as connected
rc
=
get_monitor_event
(
server_mon
);
rc
=
get_monitor_event
(
server_mon
);
assert
(
rc
==
ZMQ_EVENT_ACCEPTED
);
assert
(
rc
==
ZMQ_EVENT_ACCEPTED
);
// We should have been disconnected
// We should have been disconnected
rc
=
get_monitor_event
(
server_mon
);
rc
=
get_monitor_event
(
server_mon
);
assert
(
rc
==
ZMQ_EVENT_DISCONNECTED
);
assert
(
rc
==
ZMQ_EVENT_DISCONNECTED
);
close
(
s
);
close
(
s
);
...
@@ -230,32 +206,34 @@ test_heartbeat_ttl (void)
...
@@ -230,32 +206,34 @@ test_heartbeat_ttl (void)
assert
(
ctx
);
assert
(
ctx
);
void
*
server
,
*
server_mon
,
*
client
;
void
*
server
,
*
server_mon
,
*
client
;
prep_server_socket
(
ctx
,
0
,
0
,
&
server
,
&
server_mon
);
prep_server_socket
(
ctx
,
0
,
&
server
,
&
server_mon
);
client
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
client
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
assert
(
client
!=
NULL
);
assert
(
client
!=
NULL
);
// Set the heartbeat TTL to 0.1 seconds
// Set the heartbeat TTL to 0.1 seconds
value
=
100
;
value
=
100
;
zmq_setsockopt
(
client
,
ZMQ_HEARTBEAT_TTL
,
&
value
,
sizeof
(
value
));
rc
=
zmq_setsockopt
(
client
,
ZMQ_HEARTBEAT_TTL
,
&
value
,
sizeof
(
value
));
assert
(
rc
==
0
);
// Set the heartbeat interval to much longer than the TTL so that
// Set the heartbeat interval to much longer than the TTL so that
// the socket times out oon the remote side.
// the socket times out oon the remote side.
value
=
250
;
value
=
250
;
zmq_setsockopt
(
client
,
ZMQ_HEARTBEAT_IVL
,
&
value
,
sizeof
(
value
));
rc
=
zmq_setsockopt
(
client
,
ZMQ_HEARTBEAT_IVL
,
&
value
,
sizeof
(
value
));
assert
(
rc
==
0
);
rc
=
zmq_connect
(
client
,
"tcp://localhost:5556"
);
rc
=
zmq_connect
(
client
,
"tcp://localhost:5556"
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
// By now everything should report as connected
// By now everything should report as connected
rc
=
get_monitor_event
(
server_mon
);
rc
=
get_monitor_event
(
server_mon
);
assert
(
rc
==
ZMQ_EVENT_ACCEPTED
);
assert
(
rc
==
ZMQ_EVENT_ACCEPTED
);
msleep
(
SETTLE_TIME
);
msleep
(
SETTLE_TIME
);
// We should have been disconnected
// We should have been disconnected
rc
=
get_monitor_event
(
server_mon
);
rc
=
get_monitor_event
(
server_mon
);
assert
(
rc
==
ZMQ_EVENT_DISCONNECTED
);
assert
(
rc
==
ZMQ_EVENT_DISCONNECTED
);
rc
=
zmq_close
(
server
);
rc
=
zmq_close
(
server
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -274,7 +252,7 @@ test_heartbeat_ttl (void)
...
@@ -274,7 +252,7 @@ test_heartbeat_ttl (void)
// exchanged normally. There should be an accepted event on the server,
// exchanged normally. There should be an accepted event on the server,
// and then no event afterwards.
// and then no event afterwards.
static
void
static
void
test_heartbeat_notimeout
(
int
is_curve
)
test_heartbeat_notimeout
(
void
)
{
{
int
rc
;
int
rc
;
...
@@ -283,23 +261,21 @@ test_heartbeat_notimeout (int is_curve)
...
@@ -283,23 +261,21 @@ test_heartbeat_notimeout (int is_curve)
assert
(
ctx
);
assert
(
ctx
);
void
*
server
,
*
server_mon
;
void
*
server
,
*
server_mon
;
prep_server_socket
(
ctx
,
1
,
is_curve
,
&
server
,
&
server_mon
);
prep_server_socket
(
ctx
,
1
,
&
server
,
&
server_mon
);
void
*
client
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
void
*
client
=
zmq_socket
(
ctx
,
ZMQ_DEALER
);
if
(
is_curve
)
rc
=
zmq_connect
(
client
,
"tcp://127.0.0.1:5556"
);
setup_curve
(
client
,
0
);
rc
=
zmq_connect
(
client
,
"tcp://127.0.0.1:5556"
);
// Give it a sec to connect and handshake
// Give it a sec to connect and handshake
msleep
(
SETTLE_TIME
);
msleep
(
SETTLE_TIME
);
// By now everything should report as connected
// By now everything should report as connected
rc
=
get_monitor_event
(
server_mon
);
rc
=
get_monitor_event
(
server_mon
);
assert
(
rc
==
ZMQ_EVENT_ACCEPTED
);
assert
(
rc
==
ZMQ_EVENT_ACCEPTED
);
// We should still be connected because pings and pongs are happenin'
// We should still be connected because pings and pongs are happenin'
rc
=
get_monitor_event
(
server_mon
);
rc
=
get_monitor_event
(
server_mon
);
assert
(
rc
==
-
1
);
assert
(
rc
==
-
1
);
rc
=
zmq_close
(
client
);
rc
=
zmq_close
(
client
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
@@ -316,11 +292,8 @@ test_heartbeat_notimeout (int is_curve)
...
@@ -316,11 +292,8 @@ test_heartbeat_notimeout (int is_curve)
int
main
(
void
)
int
main
(
void
)
{
{
setup_test_environment
();
setup_test_environment
();
test_heartbeat_timeout
();
test_heartbeat_timeout
();
test_heartbeat_ttl
();
test_heartbeat_ttl
();
// Run this test without curve
test_heartbeat_notimeout
();
test_heartbeat_notimeout
(
0
);
// Then rerun it with curve
test_heartbeat_notimeout
(
1
);
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment