Commit 0bc669b1 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #1000 from daveab/feature/divide_by_zero_investigation

Feature/divide by zero investigation
parents 2db7cdc6 2a84d259
...@@ -220,6 +220,12 @@ ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property); ...@@ -220,6 +220,12 @@ ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property);
ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval); ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval);
ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
// DAB - these are millisecond sleeps to bias data races
extern ZMQ_EXPORT int zmq_lb_race_window_1_size;
extern ZMQ_EXPORT int zmq_lb_race_window_2_size;
extern ZMQ_EXPORT int zmq_fq_race_window_1_size;
extern ZMQ_EXPORT int zmq_fq_race_window_2_size;
/******************************************************************************/ /******************************************************************************/
/* 0MQ socket definition. */ /* 0MQ socket definition. */
......
...@@ -16,18 +16,36 @@ ...@@ -16,18 +16,36 @@
You should have received a copy of the GNU Lesser General Public License You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <stdio.h>
#include "fq.hpp" #include "fq.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "msg.hpp" #include "msg.hpp"
#ifdef ZMQ_HAVE_WINDOWS
# define msleep(milliseconds) {if(milliseconds) Sleep (milliseconds);}
#else
# include <unistd.h>
# define msleep(milliseconds) {if(milliseconds) usleep (static_cast <useconds_t> (milliseconds) * 1000);}
#endif
#define DB_TRACE(tag) int my_seq = ++seq; \
pthread_t self = pthread_self(); \
fprintf(stderr, "=> %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \
#define DB_TRACE_EXIT(tag) fprintf(stderr, "<= %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \
int zmq_fq_race_window_1_size = 0 ;
int zmq_fq_race_window_2_size = 0 ;
static int seq = 0 ;
zmq::fq_t::fq_t () : zmq::fq_t::fq_t () :
active (0), active (0),
last_in (NULL), last_in (NULL),
current (0), current (0),
more (false) more (false)
{ {
DB_TRACE("fq_cons");
} }
zmq::fq_t::~fq_t () zmq::fq_t::~fq_t ()
...@@ -44,6 +62,7 @@ void zmq::fq_t::attach (pipe_t *pipe_) ...@@ -44,6 +62,7 @@ void zmq::fq_t::attach (pipe_t *pipe_)
void zmq::fq_t::pipe_terminated (pipe_t *pipe_) void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
{ {
DB_TRACE("fq_term") ;
const pipes_t::size_type index = pipes.index (pipe_); const pipes_t::size_type index = pipes.index (pipe_);
// Remove the pipe from the list; adjust number of active pipes // Remove the pipe from the list; adjust number of active pipes
...@@ -60,6 +79,7 @@ void zmq::fq_t::pipe_terminated (pipe_t *pipe_) ...@@ -60,6 +79,7 @@ void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
saved_credential = last_in->get_credential (); saved_credential = last_in->get_credential ();
last_in = NULL; last_in = NULL;
} }
DB_TRACE_EXIT("fq_term") ;
} }
void zmq::fq_t::activated (pipe_t *pipe_) void zmq::fq_t::activated (pipe_t *pipe_)
...@@ -76,17 +96,22 @@ int zmq::fq_t::recv (msg_t *msg_) ...@@ -76,17 +96,22 @@ int zmq::fq_t::recv (msg_t *msg_)
int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_) int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
{ {
DB_TRACE("fq_recvpipe");
// Deallocate old content of the message. // Deallocate old content of the message.
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
// Round-robin over the pipes to get the next message. // Round-robin over the pipes to get the next message.
while (active > 0) { while (active > 0) {
// DAB - bias the race to provoke problems with read
msleep(zmq_fq_race_window_1_size) ;
// Try to fetch new message. If we've already read part of the message // Try to fetch new message. If we've already read part of the message
// subsequent part should be immediately available. // subsequent part should be immediately available.
bool fetched = pipes [current]->read (msg_); bool fetched = pipes [current]->read (msg_);
// DAB - bias the race to provoke problems with %
msleep(zmq_fq_race_window_2_size) ;
// Note that when message is not fetched, current pipe is deactivated // Note that when message is not fetched, current pipe is deactivated
// and replaced by another active pipe. Thus we don't have to increase // and replaced by another active pipe. Thus we don't have to increase
// the 'current' pointer. // the 'current' pointer.
...@@ -98,6 +123,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_) ...@@ -98,6 +123,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
last_in = pipes [current]; last_in = pipes [current];
current = (current + 1) % active; current = (current + 1) % active;
} }
DB_TRACE_EXIT("fq_recvpipe");
return 0; return 0;
} }
...@@ -117,6 +143,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_) ...@@ -117,6 +143,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
rc = msg_->init (); rc = msg_->init ();
errno_assert (rc == 0); errno_assert (rc == 0);
errno = EAGAIN; errno = EAGAIN;
DB_TRACE_EXIT("fq_recvpipe");
return -1; return -1;
} }
......
...@@ -16,18 +16,29 @@ ...@@ -16,18 +16,29 @@
You should have received a copy of the GNU Lesser General Public License You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <stdio.h>
#include "lb.hpp" #include "lb.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "msg.hpp" #include "msg.hpp"
#define DB_TRACE(tag) int my_seq = ++seq; \
pthread_t self = pthread_self(); \
fprintf(stderr, "=> %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \
#define DB_TRACE_EXIT(tag) fprintf(stderr, "<= %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \
int zmq_lb_race_window_1_size = 0 ;
int zmq_lb_race_window_2_size = 0 ;
static int seq = 0 ;
zmq::lb_t::lb_t () : zmq::lb_t::lb_t () :
active (0), active (0),
current (0), current (0),
more (false), more (false),
dropping (false) dropping (false)
{ {
DB_TRACE("lb_cons") ;
} }
zmq::lb_t::~lb_t () zmq::lb_t::~lb_t ()
...@@ -43,6 +54,7 @@ void zmq::lb_t::attach (pipe_t *pipe_) ...@@ -43,6 +54,7 @@ void zmq::lb_t::attach (pipe_t *pipe_)
void zmq::lb_t::pipe_terminated (pipe_t *pipe_) void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
{ {
DB_TRACE("lb_term") ;
pipes_t::size_type index = pipes.index (pipe_); pipes_t::size_type index = pipes.index (pipe_);
// If we are in the middle of multipart message and current pipe // If we are in the middle of multipart message and current pipe
...@@ -59,6 +71,7 @@ void zmq::lb_t::pipe_terminated (pipe_t *pipe_) ...@@ -59,6 +71,7 @@ void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
current = 0; current = 0;
} }
pipes.erase (pipe_); pipes.erase (pipe_);
DB_TRACE_EXIT("lb_term") ;
} }
void zmq::lb_t::activated (pipe_t *pipe_) void zmq::lb_t::activated (pipe_t *pipe_)
...@@ -73,8 +86,16 @@ int zmq::lb_t::send (msg_t *msg_) ...@@ -73,8 +86,16 @@ int zmq::lb_t::send (msg_t *msg_)
return sendpipe (msg_, NULL); return sendpipe (msg_, NULL);
} }
#ifdef ZMQ_HAVE_WINDOWS
# define msleep(milliseconds) Sleep (milliseconds);
#else
# include <unistd.h>
# define msleep(milliseconds) usleep (static_cast <useconds_t> (milliseconds) * 1000);
#endif
int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_) int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
{ {
DB_TRACE("lb_sendpipe") ;
// Drop the message if required. If we are at the end of the message // Drop the message if required. If we are at the end of the message
// switch back to non-dropping mode. // switch back to non-dropping mode.
if (dropping) { if (dropping) {
...@@ -90,13 +111,20 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_) ...@@ -90,13 +111,20 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
} }
while (active > 0) { while (active > 0) {
// DAB - bias the race to provoke problems with write
msleep(zmq_lb_race_window_1_size ) ;
if (pipes [current]->write (msg_)) if (pipes [current]->write (msg_))
{ {
if (pipe_) if (pipe_)
*pipe_ = pipes [current]; *pipe_ = pipes [current];
break; break;
} }
if (!(!more))
{
DB_TRACE("lb_assert");
fflush(stderr) ;
}
zmq_assert (!more); zmq_assert (!more);
active--; active--;
if (current < active) if (current < active)
...@@ -108,12 +136,17 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_) ...@@ -108,12 +136,17 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
// If there are no pipes we cannot send the message. // If there are no pipes we cannot send the message.
if (active == 0) { if (active == 0) {
errno = EAGAIN; errno = EAGAIN;
DB_TRACE_EXIT("lb_sendpipe") ;
return -1; return -1;
} }
// If it's final part of the message we can flush it downstream and // If it's final part of the message we can flush it downstream and
// continue round-robining (load balance). // continue round-robining (load balance).
more = msg_->flags () & msg_t::more? true: false; more = msg_->flags () & msg_t::more? true: false;
// DAB - bias the race to provoke problems with %
msleep(zmq_lb_race_window_2_size) ;
if (!more) { if (!more) {
pipes [current]->flush (); pipes [current]->flush ();
current = (current + 1) % active; current = (current + 1) % active;
...@@ -123,6 +156,7 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_) ...@@ -123,6 +156,7 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
int rc = msg_->init (); int rc = msg_->init ();
errno_assert (rc == 0); errno_assert (rc == 0);
DB_TRACE_EXIT("lb_sendpipe") ;
return 0; return 0;
} }
......
...@@ -256,6 +256,8 @@ int main (void) ...@@ -256,6 +256,8 @@ int main (void)
{ {
setup_test_environment(); setup_test_environment();
no_race_tests_please() ;
int count; int count;
// Default values are 1000 on send and 1000 one receive, so 2000 total // Default values are 1000 on send and 1000 one receive, so 2000 total
......
...@@ -222,6 +222,8 @@ int main (void) ...@@ -222,6 +222,8 @@ int main (void)
{ {
setup_test_environment (); setup_test_environment ();
no_race_tests_please() ;
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
// Control socket receives terminate command from main over inproc // Control socket receives terminate command from main over inproc
......
...@@ -263,6 +263,12 @@ void setup_test_environment() ...@@ -263,6 +263,12 @@ void setup_test_environment()
_CrtSetReportFile( _CRT_ASSERT, _CRTDBG_FILE_STDERR ); _CrtSetReportFile( _CRT_ASSERT, _CRTDBG_FILE_STDERR );
# endif # endif
#endif #endif
zmq_lb_race_window_2_size = 1 ;
}
void no_race_tests_please()
{
zmq_lb_race_window_2_size = 0 ;
} }
// Provide portable millisecond sleep // Provide portable millisecond sleep
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment