Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
fc36c7d2
Unverified
Commit
fc36c7d2
authored
Dec 30, 2018
by
Simon Giesecke
Committed by
GitHub
Dec 30, 2018
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #3343 from bluca/pull_atomic_drop
Problem: race condition in PUSH might break atomicity
parents
b65b523a
9bd9cbbe
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
176 additions
and
3 deletions
+176
-3
lb.cpp
src/lb.cpp
+17
-2
socket_base.cpp
src/socket_base.cpp
+12
-0
test_spec_pushpull.cpp
tests/test_spec_pushpull.cpp
+147
-1
No files found.
src/lb.cpp
View file @
fc36c7d2
/*
/*
Copyright (c) 2007-201
6
Contributors as noted in the AUTHORS file
Copyright (c) 2007-201
8
Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
This file is part of libzmq, the ZeroMQ core engine in C++.
...
@@ -107,9 +107,24 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
...
@@ -107,9 +107,24 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
// Application should handle this as suitable
// Application should handle this as suitable
if
(
_more
)
{
if
(
_more
)
{
_pipes
[
_current
]
->
rollback
();
_pipes
[
_current
]
->
rollback
();
// At this point the pipe is already being deallocated
// and the first N frames are unreachable (_outpipe is
// most likely already NULL so rollback won't actually do
// anything and they can't be un-written to deliver later).
// Return -2 with errno set to EAGAIN to socket_base caller to drop current message
// and any other subsequent frames to avoid them being
// "stuck" and received when a new client reconnects, which
// would break atomicity of multi-part messages (in blocking mode
// socket_base just tries again and again to send the same message)
// Note that, given that dropping mode returns 0, the user will
// never know that the message could not be delivered, but this
// can't really be fixed without breaking backward compatibility.
// -2/EAGAIN will make sure socket_base caller does not re-enter
// immediately or after a short sleep in blocking mode.
_dropping
=
(
msg_
->
flags
()
&
msg_t
::
more
)
!=
0
;
_more
=
false
;
_more
=
false
;
errno
=
EAGAIN
;
errno
=
EAGAIN
;
return
-
1
;
return
-
2
;
}
}
_active
--
;
_active
--
;
...
...
src/socket_base.cpp
View file @
fc36c7d2
...
@@ -1113,6 +1113,18 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
...
@@ -1113,6 +1113,18 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if
(
rc
==
0
)
{
if
(
rc
==
0
)
{
return
0
;
return
0
;
}
}
// Special case for ZMQ_PUSH: -2 means pipe is dead while a
// multi-part send is in progress and can't be recovered, so drop
// silently when in blocking mode to keep backward compatibility.
if
(
unlikely
(
rc
==
-
2
))
{
if
(
!
((
flags_
&
ZMQ_DONTWAIT
)
||
options
.
sndtimeo
==
0
))
{
rc
=
msg_
->
close
();
errno_assert
(
rc
==
0
);
rc
=
msg_
->
init
();
errno_assert
(
rc
==
0
);
return
0
;
}
}
if
(
unlikely
(
errno
!=
EAGAIN
))
{
if
(
unlikely
(
errno
!=
EAGAIN
))
{
return
-
1
;
return
-
1
;
}
}
...
...
tests/test_spec_pushpull.cpp
View file @
fc36c7d2
/*
/*
Copyright (c) 2007-201
7
Contributors as noted in the AUTHORS file
Copyright (c) 2007-201
8
Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
This file is part of libzmq, the ZeroMQ core engine in C++.
...
@@ -240,6 +240,140 @@ void test_destroy_queue_on_disconnect (const char *bind_address_)
...
@@ -240,6 +240,140 @@ void test_destroy_queue_on_disconnect (const char *bind_address_)
test_context_socket_close_zero_linger
(
b
);
test_context_socket_close_zero_linger
(
b
);
}
}
// PUSH and PULL: SHALL either receive or drop multipart messages atomically.
void
test_push_multipart_atomic_drop
(
const
char
*
bind_address_
,
const
bool
block
)
{
int
linger
=
0
;
int
hwm
=
1
;
void
*
push
=
test_context_socket
(
ZMQ_PUSH
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
push
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
push
,
ZMQ_SNDHWM
,
&
hwm
,
sizeof
(
hwm
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
push
,
bind_address_
));
size_t
addr_len
=
MAX_SOCKET_STRING
;
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_getsockopt
(
push
,
ZMQ_LAST_ENDPOINT
,
connect_address
,
&
addr_len
));
void
*
pull
=
test_context_socket
(
ZMQ_PULL
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pull
,
ZMQ_LINGER
,
&
linger
,
sizeof
(
linger
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pull
,
ZMQ_RCVHWM
,
&
hwm
,
sizeof
(
hwm
)));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
pull
,
connect_address
));
// Wait for connections.
msleep
(
SETTLE_TIME
);
int
rc
;
zmq_msg_t
msg_data
;
// A large message is needed to overrun the TCP buffers
const
size_t
len
=
16
*
1024
*
1024
;
size_t
zmq_events_size
=
sizeof
(
int
);
int
zmq_events
;
// Normal case - exercise the queues
send_string_expect_success
(
push
,
"0"
,
ZMQ_SNDMORE
);
send_string_expect_success
(
push
,
"0"
,
ZMQ_SNDMORE
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init_size
(
&
msg_data
,
len
));
memset
(
zmq_msg_data
(
&
msg_data
),
'a'
,
len
);
TEST_ASSERT_EQUAL_INT
(
len
,
zmq_msg_send
(
&
msg_data
,
push
,
0
));
recv_string_expect_success
(
pull
,
"0"
,
0
);
recv_string_expect_success
(
pull
,
"0"
,
0
);
zmq_msg_init
(
&
msg_data
);
TEST_ASSERT_EQUAL_INT
(
len
,
zmq_msg_recv
(
&
msg_data
,
pull
,
0
));
zmq_msg_close
(
&
msg_data
);
// Fill the HWMs of sender and receiver, one message each
send_string_expect_success
(
push
,
"1"
,
0
);
send_string_expect_success
(
push
,
"2"
,
ZMQ_SNDMORE
);
send_string_expect_success
(
push
,
"2"
,
ZMQ_SNDMORE
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init_size
(
&
msg_data
,
len
));
memset
(
zmq_msg_data
(
&
msg_data
),
'b'
,
len
);
TEST_ASSERT_EQUAL_INT
(
len
,
zmq_msg_send
(
&
msg_data
,
push
,
0
));
// Disconnect and simulate a poll (doesn't work on Windows) to
// let the commands run and let the pipes start to be deallocated
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_disconnect
(
pull
,
connect_address
));
zmq_getsockopt
(
push
,
ZMQ_EVENTS
,
&
zmq_events
,
&
zmq_events_size
);
zmq_getsockopt
(
pull
,
ZMQ_EVENTS
,
&
zmq_events
,
&
zmq_events_size
);
msleep
(
SETTLE_TIME
);
zmq_getsockopt
(
push
,
ZMQ_EVENTS
,
&
zmq_events
,
&
zmq_events_size
);
zmq_getsockopt
(
pull
,
ZMQ_EVENTS
,
&
zmq_events
,
&
zmq_events_size
);
// Reconnect and immediately push a large message into the pipe,
// if the problem is reproduced the pipe is in the process of being
// terminated but still exists (state term_ack_sent) and had already
// accepted the frame, so with the first frames already gone and
// unreachable only the last is left, and is stuck in the lb.
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
pull
,
connect_address
));
send_string_expect_success
(
push
,
"3"
,
ZMQ_SNDMORE
);
send_string_expect_success
(
push
,
"3"
,
ZMQ_SNDMORE
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init_size
(
&
msg_data
,
len
));
memset
(
zmq_msg_data
(
&
msg_data
),
'c'
,
len
);
if
(
block
)
{
TEST_ASSERT_EQUAL_INT
(
len
,
zmq_msg_send
(
&
msg_data
,
push
,
ZMQ_SNDMORE
));
}
else
{
rc
=
zmq_msg_send
(
&
msg_data
,
push
,
ZMQ_SNDMORE
|
ZMQ_DONTWAIT
);
// inproc won't fail, much faster to connect/disconnect pipes than TCP
if
(
rc
==
-
1
)
{
// at this point the new pipe is there and it works
send_string_expect_success
(
push
,
"3"
,
ZMQ_SNDMORE
);
send_string_expect_success
(
push
,
"3"
,
ZMQ_SNDMORE
);
TEST_ASSERT_EQUAL_INT
(
len
,
zmq_msg_send
(
&
msg_data
,
push
,
ZMQ_SNDMORE
));
}
}
send_string_expect_success
(
push
,
"3b"
,
0
);
zmq_getsockopt
(
push
,
ZMQ_EVENTS
,
&
zmq_events
,
&
zmq_events_size
);
zmq_getsockopt
(
pull
,
ZMQ_EVENTS
,
&
zmq_events
,
&
zmq_events_size
);
msleep
(
SETTLE_TIME
);
zmq_getsockopt
(
push
,
ZMQ_EVENTS
,
&
zmq_events
,
&
zmq_events_size
);
zmq_getsockopt
(
pull
,
ZMQ_EVENTS
,
&
zmq_events
,
&
zmq_events_size
);
send_string_expect_success
(
push
,
"5"
,
ZMQ_SNDMORE
);
send_string_expect_success
(
push
,
"5"
,
ZMQ_SNDMORE
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_msg_init_size
(
&
msg_data
,
len
));
memset
(
zmq_msg_data
(
&
msg_data
),
'd'
,
len
);
TEST_ASSERT_EQUAL_INT
(
len
,
zmq_msg_send
(
&
msg_data
,
push
,
0
));
// On very slow machines the message will not be lost, as it will
// be sent when the new pipe is already in place, so avoid failing
// and simply carry on as it would be very noisy otherwise.
// Receive both to avoid leaking metadata.
// If only the "5" message is received, the problem is reproduced, and
// without the fix the first message received would be the last large
// frame of "3".
char
buffer
[
2
];
rc
=
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_recv
(
pull
,
buffer
,
sizeof
(
buffer
),
0
));
TEST_ASSERT_EQUAL_INT
(
1
,
rc
);
TEST_ASSERT_TRUE
(
buffer
[
0
]
==
'3'
||
buffer
[
0
]
==
'5'
);
if
(
buffer
[
0
]
==
'3'
)
{
recv_string_expect_success
(
pull
,
"3"
,
0
);
zmq_msg_init
(
&
msg_data
);
TEST_ASSERT_EQUAL_INT
(
len
,
zmq_msg_recv
(
&
msg_data
,
pull
,
0
));
zmq_msg_close
(
&
msg_data
);
recv_string_expect_success
(
pull
,
"3b"
,
0
);
recv_string_expect_success
(
pull
,
"5"
,
0
);
}
recv_string_expect_success
(
pull
,
"5"
,
0
);
zmq_msg_init
(
&
msg_data
);
TEST_ASSERT_EQUAL_INT
(
len
,
zmq_msg_recv
(
&
msg_data
,
pull
,
0
));
zmq_msg_close
(
&
msg_data
);
test_context_socket_close_zero_linger
(
pull
);
test_context_socket_close_zero_linger
(
push
);
}
#define def_test_spec_pushpull(name, bind_address_) \
#define def_test_spec_pushpull(name, bind_address_) \
void test_spec_pushpull_##name##_push_round_robin_out () \
void test_spec_pushpull_##name##_push_round_robin_out () \
{ \
{ \
...
@@ -256,6 +390,14 @@ void test_destroy_queue_on_disconnect (const char *bind_address_)
...
@@ -256,6 +390,14 @@ void test_destroy_queue_on_disconnect (const char *bind_address_)
void test_spec_pushpull_##name##_destroy_queue_on_disconnect () \
void test_spec_pushpull_##name##_destroy_queue_on_disconnect () \
{ \
{ \
test_destroy_queue_on_disconnect (bind_address_); \
test_destroy_queue_on_disconnect (bind_address_); \
} \
void test_spec_pushpull_##name##_push_multipart_atomic_drop_block () \
{ \
test_push_multipart_atomic_drop (bind_address_, true); \
} \
void test_spec_pushpull_##name##_push_multipart_atomic_drop_non_block () \
{ \
test_push_multipart_atomic_drop (bind_address_, false); \
}
}
def_test_spec_pushpull
(
inproc
,
"inproc://a"
)
def_test_spec_pushpull
(
inproc
,
"inproc://a"
)
...
@@ -276,5 +418,9 @@ def_test_spec_pushpull (inproc, "inproc://a")
...
@@ -276,5 +418,9 @@ def_test_spec_pushpull (inproc, "inproc://a")
// TODO Tests disabled until libzmq does this properly
// TODO Tests disabled until libzmq does this properly
//RUN_TEST (test_spec_pushpull_inproc_destroy_queue_on_disconnect);
//RUN_TEST (test_spec_pushpull_inproc_destroy_queue_on_disconnect);
//RUN_TEST (test_spec_pushpull_tcp_destroy_queue_on_disconnect);
//RUN_TEST (test_spec_pushpull_tcp_destroy_queue_on_disconnect);
RUN_TEST
(
test_spec_pushpull_inproc_push_multipart_atomic_drop_block
);
RUN_TEST
(
test_spec_pushpull_inproc_push_multipart_atomic_drop_non_block
);
RUN_TEST
(
test_spec_pushpull_tcp_push_multipart_atomic_drop_block
);
RUN_TEST
(
test_spec_pushpull_tcp_push_multipart_atomic_drop_non_block
);
return
UNITY_END
();
return
UNITY_END
();
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment