Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
ded00177
Commit
ded00177
authored
May 02, 2014
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Revert "Add code to investigate data race"
This reverts commit
5e0facda
.
parent
d190325e
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
3 additions
and
70 deletions
+3
-70
zmq.h
include/zmq.h
+0
-6
fq.cpp
src/fq.cpp
+1
-28
lb.cpp
src/lb.cpp
+2
-36
No files found.
include/zmq.h
View file @
ded00177
...
@@ -220,12 +220,6 @@ ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property);
...
@@ -220,12 +220,6 @@ ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property);
ZMQ_EXPORT
int
zmq_msg_set
(
zmq_msg_t
*
msg
,
int
property
,
int
optval
);
ZMQ_EXPORT
int
zmq_msg_set
(
zmq_msg_t
*
msg
,
int
property
,
int
optval
);
ZMQ_EXPORT
const
char
*
zmq_msg_gets
(
zmq_msg_t
*
msg
,
const
char
*
property
);
ZMQ_EXPORT
const
char
*
zmq_msg_gets
(
zmq_msg_t
*
msg
,
const
char
*
property
);
// DAB - these are millisecond sleeps to bias data races
extern
ZMQ_EXPORT
int
zmq_lb_race_window_1_size
;
extern
ZMQ_EXPORT
int
zmq_lb_race_window_2_size
;
extern
ZMQ_EXPORT
int
zmq_fq_race_window_1_size
;
extern
ZMQ_EXPORT
int
zmq_fq_race_window_2_size
;
/******************************************************************************/
/******************************************************************************/
/* 0MQ socket definition. */
/* 0MQ socket definition. */
...
...
src/fq.cpp
View file @
ded00177
...
@@ -16,36 +16,18 @@
...
@@ -16,36 +16,18 @@
You should have received a copy of the GNU Lesser General Public License
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <stdio.h>
#include "fq.hpp"
#include "fq.hpp"
#include "pipe.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "msg.hpp"
#ifdef ZMQ_HAVE_WINDOWS
# define msleep(milliseconds) {if(milliseconds) Sleep (milliseconds);}
#else
# include <unistd.h>
# define msleep(milliseconds) {if(milliseconds) usleep (static_cast <useconds_t> (milliseconds) * 1000);}
#endif
#define DB_TRACE(tag) int my_seq = ++seq; \
pthread_t self = pthread_self(); \
fprintf(stderr, "=> %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \
#define DB_TRACE_EXIT(tag) fprintf(stderr, "<= %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \
int
zmq_fq_race_window_1_size
=
0
;
int
zmq_fq_race_window_2_size
=
0
;
static
int
seq
=
0
;
zmq
::
fq_t
::
fq_t
()
:
zmq
::
fq_t
::
fq_t
()
:
active
(
0
),
active
(
0
),
last_in
(
NULL
),
last_in
(
NULL
),
current
(
0
),
current
(
0
),
more
(
false
)
more
(
false
)
{
{
DB_TRACE
(
"fq_cons"
);
}
}
zmq
::
fq_t
::~
fq_t
()
zmq
::
fq_t
::~
fq_t
()
...
@@ -62,7 +44,6 @@ void zmq::fq_t::attach (pipe_t *pipe_)
...
@@ -62,7 +44,6 @@ void zmq::fq_t::attach (pipe_t *pipe_)
void
zmq
::
fq_t
::
pipe_terminated
(
pipe_t
*
pipe_
)
void
zmq
::
fq_t
::
pipe_terminated
(
pipe_t
*
pipe_
)
{
{
DB_TRACE
(
"fq_term"
)
;
const
pipes_t
::
size_type
index
=
pipes
.
index
(
pipe_
);
const
pipes_t
::
size_type
index
=
pipes
.
index
(
pipe_
);
// Remove the pipe from the list; adjust number of active pipes
// Remove the pipe from the list; adjust number of active pipes
...
@@ -79,7 +60,6 @@ void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
...
@@ -79,7 +60,6 @@ void zmq::fq_t::pipe_terminated (pipe_t *pipe_)
saved_credential
=
last_in
->
get_credential
();
saved_credential
=
last_in
->
get_credential
();
last_in
=
NULL
;
last_in
=
NULL
;
}
}
DB_TRACE_EXIT
(
"fq_term"
)
;
}
}
void
zmq
::
fq_t
::
activated
(
pipe_t
*
pipe_
)
void
zmq
::
fq_t
::
activated
(
pipe_t
*
pipe_
)
...
@@ -96,22 +76,17 @@ int zmq::fq_t::recv (msg_t *msg_)
...
@@ -96,22 +76,17 @@ int zmq::fq_t::recv (msg_t *msg_)
int
zmq
::
fq_t
::
recvpipe
(
msg_t
*
msg_
,
pipe_t
**
pipe_
)
int
zmq
::
fq_t
::
recvpipe
(
msg_t
*
msg_
,
pipe_t
**
pipe_
)
{
{
DB_TRACE
(
"fq_recvpipe"
);
// Deallocate old content of the message.
// Deallocate old content of the message.
int
rc
=
msg_
->
close
();
int
rc
=
msg_
->
close
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
// Round-robin over the pipes to get the next message.
// Round-robin over the pipes to get the next message.
while
(
active
>
0
)
{
while
(
active
>
0
)
{
// DAB - bias the race to provoke problems with read
msleep
(
zmq_fq_race_window_1_size
)
;
// Try to fetch new message. If we've already read part of the message
// Try to fetch new message. If we've already read part of the message
// subsequent part should be immediately available.
// subsequent part should be immediately available.
bool
fetched
=
pipes
[
current
]
->
read
(
msg_
);
bool
fetched
=
pipes
[
current
]
->
read
(
msg_
);
// DAB - bias the race to provoke problems with %
msleep
(
zmq_fq_race_window_2_size
)
;
// Note that when message is not fetched, current pipe is deactivated
// Note that when message is not fetched, current pipe is deactivated
// and replaced by another active pipe. Thus we don't have to increase
// and replaced by another active pipe. Thus we don't have to increase
// the 'current' pointer.
// the 'current' pointer.
...
@@ -123,7 +98,6 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
...
@@ -123,7 +98,6 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
last_in
=
pipes
[
current
];
last_in
=
pipes
[
current
];
current
=
(
current
+
1
)
%
active
;
current
=
(
current
+
1
)
%
active
;
}
}
DB_TRACE_EXIT
(
"fq_recvpipe"
);
return
0
;
return
0
;
}
}
...
@@ -143,7 +117,6 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
...
@@ -143,7 +117,6 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
rc
=
msg_
->
init
();
rc
=
msg_
->
init
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
errno
=
EAGAIN
;
errno
=
EAGAIN
;
DB_TRACE_EXIT
(
"fq_recvpipe"
);
return
-
1
;
return
-
1
;
}
}
...
...
src/lb.cpp
View file @
ded00177
...
@@ -16,29 +16,18 @@
...
@@ -16,29 +16,18 @@
You should have received a copy of the GNU Lesser General Public License
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include <stdio.h>
#include "lb.hpp"
#include "lb.hpp"
#include "pipe.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "msg.hpp"
#define DB_TRACE(tag) int my_seq = ++seq; \
pthread_t self = pthread_self(); \
fprintf(stderr, "=> %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \
#define DB_TRACE_EXIT(tag) fprintf(stderr, "<= %12.12s thread=%lu this=%p seq=%d active=%lu\n", tag, self, (void *)this, my_seq, active) ; \
int
zmq_lb_race_window_1_size
=
0
;
int
zmq_lb_race_window_2_size
=
0
;
static
int
seq
=
0
;
zmq
::
lb_t
::
lb_t
()
:
zmq
::
lb_t
::
lb_t
()
:
active
(
0
),
active
(
0
),
current
(
0
),
current
(
0
),
more
(
false
),
more
(
false
),
dropping
(
false
)
dropping
(
false
)
{
{
DB_TRACE
(
"lb_cons"
)
;
}
}
zmq
::
lb_t
::~
lb_t
()
zmq
::
lb_t
::~
lb_t
()
...
@@ -54,7 +43,6 @@ void zmq::lb_t::attach (pipe_t *pipe_)
...
@@ -54,7 +43,6 @@ void zmq::lb_t::attach (pipe_t *pipe_)
void
zmq
::
lb_t
::
pipe_terminated
(
pipe_t
*
pipe_
)
void
zmq
::
lb_t
::
pipe_terminated
(
pipe_t
*
pipe_
)
{
{
DB_TRACE
(
"lb_term"
)
;
pipes_t
::
size_type
index
=
pipes
.
index
(
pipe_
);
pipes_t
::
size_type
index
=
pipes
.
index
(
pipe_
);
// If we are in the middle of multipart message and current pipe
// If we are in the middle of multipart message and current pipe
...
@@ -71,7 +59,6 @@ void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
...
@@ -71,7 +59,6 @@ void zmq::lb_t::pipe_terminated (pipe_t *pipe_)
current
=
0
;
current
=
0
;
}
}
pipes
.
erase
(
pipe_
);
pipes
.
erase
(
pipe_
);
DB_TRACE_EXIT
(
"lb_term"
)
;
}
}
void
zmq
::
lb_t
::
activated
(
pipe_t
*
pipe_
)
void
zmq
::
lb_t
::
activated
(
pipe_t
*
pipe_
)
...
@@ -86,16 +73,8 @@ int zmq::lb_t::send (msg_t *msg_)
...
@@ -86,16 +73,8 @@ int zmq::lb_t::send (msg_t *msg_)
return
sendpipe
(
msg_
,
NULL
);
return
sendpipe
(
msg_
,
NULL
);
}
}
#ifdef ZMQ_HAVE_WINDOWS
# define msleep(milliseconds) Sleep (milliseconds);
#else
# include <unistd.h>
# define msleep(milliseconds) usleep (static_cast <useconds_t> (milliseconds) * 1000);
#endif
int
zmq
::
lb_t
::
sendpipe
(
msg_t
*
msg_
,
pipe_t
**
pipe_
)
int
zmq
::
lb_t
::
sendpipe
(
msg_t
*
msg_
,
pipe_t
**
pipe_
)
{
{
DB_TRACE
(
"lb_sendpipe"
)
;
// Drop the message if required. If we are at the end of the message
// Drop the message if required. If we are at the end of the message
// switch back to non-dropping mode.
// switch back to non-dropping mode.
if
(
dropping
)
{
if
(
dropping
)
{
...
@@ -111,20 +90,13 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
...
@@ -111,20 +90,13 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
}
}
while
(
active
>
0
)
{
while
(
active
>
0
)
{
// DAB - bias the race to provoke problems with write
msleep
(
zmq_lb_race_window_1_size
)
;
if
(
pipes
[
current
]
->
write
(
msg_
))
if
(
pipes
[
current
]
->
write
(
msg_
))
{
{
if
(
pipe_
)
if
(
pipe_
)
*
pipe_
=
pipes
[
current
];
*
pipe_
=
pipes
[
current
];
break
;
break
;
}
}
if
(
!
(
!
more
))
{
DB_TRACE
(
"lb_assert"
);
fflush
(
stderr
)
;
}
zmq_assert
(
!
more
);
zmq_assert
(
!
more
);
active
--
;
active
--
;
if
(
current
<
active
)
if
(
current
<
active
)
...
@@ -136,17 +108,12 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
...
@@ -136,17 +108,12 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
// If there are no pipes we cannot send the message.
// If there are no pipes we cannot send the message.
if
(
active
==
0
)
{
if
(
active
==
0
)
{
errno
=
EAGAIN
;
errno
=
EAGAIN
;
DB_TRACE_EXIT
(
"lb_sendpipe"
)
;
return
-
1
;
return
-
1
;
}
}
// If it's final part of the message we can flush it downstream and
// If it's final part of the message we can flush it downstream and
// continue round-robining (load balance).
// continue round-robining (load balance).
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
more
=
msg_
->
flags
()
&
msg_t
::
more
?
true
:
false
;
// DAB - bias the race to provoke problems with %
msleep
(
zmq_lb_race_window_2_size
)
;
if
(
!
more
)
{
if
(
!
more
)
{
pipes
[
current
]
->
flush
();
pipes
[
current
]
->
flush
();
current
=
(
current
+
1
)
%
active
;
current
=
(
current
+
1
)
%
active
;
...
@@ -156,7 +123,6 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
...
@@ -156,7 +123,6 @@ int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
int
rc
=
msg_
->
init
();
int
rc
=
msg_
->
init
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
DB_TRACE_EXIT
(
"lb_sendpipe"
)
;
return
0
;
return
0
;
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment