Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
235ed3a3
Commit
235ed3a3
authored
May 04, 2010
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
signaler transports commands per se rather than one-bit signals
parent
8b9bd057
Hide whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
174 additions
and
329 deletions
+174
-329
app_thread.cpp
src/app_thread.cpp
+10
-10
config.hpp
src/config.hpp
+0
-5
dispatcher.cpp
src/dispatcher.cpp
+28
-43
dispatcher.hpp
src/dispatcher.hpp
+8
-22
io_thread.cpp
src/io_thread.cpp
+8
-9
object.cpp
src/object.cpp
+2
-4
pipe.hpp
src/pipe.hpp
+1
-1
signaler.cpp
src/signaler.cpp
+85
-139
signaler.hpp
src/signaler.hpp
+13
-32
ypipe.hpp
src/ypipe.hpp
+19
-64
No files found.
src/app_thread.cpp
View file @
235ed3a3
...
...
@@ -82,9 +82,12 @@ zmq::signaler_t *zmq::app_thread_t::get_signaler ()
bool
zmq
::
app_thread_t
::
process_commands
(
bool
block_
,
bool
throttle_
)
{
uint32_t
signal
;
if
(
block_
)
signal
=
signaler
.
poll
();
bool
received
;
command_t
cmd
;
if
(
block_
)
{
received
=
signaler
.
recv
(
&
cmd
,
true
);
zmq_assert
(
received
);
}
else
{
#if defined ZMQ_DELAY_COMMANDS
...
...
@@ -117,15 +120,12 @@ bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
#endif
// Check whether there are any commands pending for this thread.
signal
=
signaler
.
check
(
);
received
=
signaler
.
recv
(
&
cmd
,
false
);
}
// Process all the commands from the signaling source if there is one.
if
(
signal
!=
signaler_t
::
no_signal
)
{
command_t
cmd
;
while
(
get_dispatcher
()
->
read
(
signal
,
get_thread_slot
(),
&
cmd
))
cmd
.
destination
->
process_command
(
cmd
);
}
// Process the command, if any.
if
(
received
)
cmd
.
destination
->
process_command
(
cmd
);
return
!
terminated
;
}
...
...
src/config.hpp
View file @
235ed3a3
...
...
@@ -32,11 +32,6 @@ namespace zmq
// memory allocation by approximately 99.6%
message_pipe_granularity
=
256
,
// Number of new commands in command pipe needed to trigger new memory
// allocation. The number should be kept low to decrease the memory
// footprint of dispatcher.
command_pipe_granularity
=
4
,
// Number of signals that can be read by the signaler
// using a single system call.
signal_buffer_size
=
8
,
...
...
src/dispatcher.cpp
View file @
235ed3a3
...
...
@@ -18,6 +18,7 @@
*/
#include <new>
#include <string.h>
#include "../include/zmq.h"
...
...
@@ -48,35 +49,30 @@ zmq::dispatcher_t::dispatcher_t (uint32_t app_threads_, uint32_t io_threads_) :
HIBYTE
(
wsa_data
.
wVersion
)
==
2
);
#endif
// Create application thread proxies.
for
(
uint32_t
i
=
0
;
i
!=
app_threads_
;
i
++
)
{
app_thread_info_t
info
;
info
.
associated
=
false
;
info
.
app_thread
=
new
(
std
::
nothrow
)
app_thread_t
(
this
,
i
);
zmq_assert
(
info
.
app_thread
);
app_threads
.
push_back
(
info
);
signalers
.
push_back
(
info
.
app_thread
->
get_signaler
());
}
// Initialise the array of signalers.
signalers_count
=
app_threads_
+
io_threads_
;
signalers
=
(
signaler_t
**
)
malloc
(
sizeof
(
signaler_t
*
)
*
signalers_count
);
zmq_assert
(
signalers
);
memset
(
signalers
,
0
,
sizeof
(
signaler_t
*
)
*
signalers_count
);
// Create I/O thread objects.
for
(
uint32_t
i
=
0
;
i
!=
io_threads_
;
i
++
)
{
io_thread_t
*
io_thread
=
new
(
std
::
nothrow
)
io_thread_t
(
this
,
i
+
app_threads_
);
io_thread_t
*
io_thread
=
new
(
std
::
nothrow
)
io_thread_t
(
this
,
i
);
zmq_assert
(
io_thread
);
io_threads
.
push_back
(
io_thread
);
signalers
.
push_back
(
io_thread
->
get_signaler
()
);
signalers
[
i
]
=
io_thread
->
get_signaler
(
);
}
// Create
the administrative thread. Nothing special is needed. NULL
// is used instead of signaler given that as for now, administrative
// thread doesn't receive any commands. The only thing it is used for
// is sending 'stop' command to I/O threads on shutdown.
signalers
.
push_back
(
NULL
);
// Create command pipe matrix.
command_pipes
=
new
(
std
::
nothrow
)
command_pipe_t
[
signalers
.
size
()
*
signalers
.
size
()]
;
zmq_assert
(
command_pipes
);
// Create
application thread proxies.
for
(
uint32_t
i
=
0
;
i
!=
app_threads_
;
i
++
)
{
app_thread_info_t
info
;
info
.
associated
=
false
;
info
.
app_thread
=
new
(
std
::
nothrow
)
app_thread_t
(
this
,
i
+
io_threads_
);
zmq_assert
(
info
.
app_thread
);
app_threads
.
push_back
(
info
);
signalers
[
i
+
io_threads_
]
=
info
.
app_thread
->
get_signaler
()
;
}
// Launch I/O threads.
for
(
uint32_t
i
=
0
;
i
!=
io_threads_
;
i
++
)
...
...
@@ -123,12 +119,11 @@ zmq::dispatcher_t::~dispatcher_t ()
while
(
!
pipes
.
empty
())
delete
*
pipes
.
begin
();
// TODO: Deallocate any commands still in the pipes. Keep in mind that
// simple reading from a pipe and deallocating commands won't do as
// command pipe has template parameter D set to true, meaning that
// read may return false even if there are still commands in the pipe.
delete
[]
command_pipes
;
// Deallocate the array of pointers to signalers. No special work is
// needed as signalers themselves were deallocated with their
// corresponding (app_/io_) thread objects.
free
(
signalers
);
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
int
rc
=
WSACleanup
();
...
...
@@ -136,11 +131,6 @@ zmq::dispatcher_t::~dispatcher_t ()
#endif
}
uint32_t
zmq
::
dispatcher_t
::
thread_slot_count
()
{
return
(
uint32_t
)
signalers
.
size
();
}
zmq
::
socket_base_t
*
zmq
::
dispatcher_t
::
create_socket
(
int
type_
)
{
app_threads_sync
.
lock
();
...
...
@@ -213,21 +203,16 @@ void zmq::dispatcher_t::no_sockets (app_thread_t *thread_)
app_threads_sync
.
unlock
();
}
void
zmq
::
dispatcher_t
::
write
(
uint32_t
source_
,
uint32_t
destination_
,
void
zmq
::
dispatcher_t
::
send_command
(
uint32_t
destination_
,
const
command_t
&
command_
)
{
command_pipe_t
&
pipe
=
command_pipes
[
source_
*
signalers
.
size
()
+
destination_
];
pipe
.
write
(
command_
);
if
(
!
pipe
.
flush
())
signalers
[
destination_
]
->
signal
(
source_
);
signalers
[
destination_
]
->
send
(
command_
);
}
bool
zmq
::
dispatcher_t
::
re
ad
(
uint32_t
source_
,
uint32_t
destination
_
,
command_t
*
command_
)
bool
zmq
::
dispatcher_t
::
re
cv_command
(
uint32_t
thread_slot
_
,
command_t
*
command_
,
bool
block_
)
{
return
command_pipes
[
source_
*
signalers
.
size
()
+
destination_
].
read
(
command_
);
return
signalers
[
thread_slot_
]
->
recv
(
command_
,
block_
);
}
zmq
::
io_thread_t
*
zmq
::
dispatcher_t
::
choose_io_thread
(
uint64_t
affinity_
)
...
...
src/dispatcher.hpp
View file @
235ed3a3
...
...
@@ -27,7 +27,6 @@
#include "signaler.hpp"
#include "ypipe.hpp"
#include "command.hpp"
#include "config.hpp"
#include "mutex.hpp"
#include "stdint.hpp"
...
...
@@ -69,19 +68,12 @@ namespace zmq
// should disassociate the object from the current OS thread.
void
no_sockets
(
class
app_thread_t
*
thread_
);
// Returns number of thread slots in the dispatcher. To be used by
// individual threads to find out how many distinct signals can be
// received.
uint32_t
thread_slot_count
();
// Send command to the destination thread.
void
send_command
(
uint32_t
destination_
,
const
command_t
&
command_
);
// Send command from the source to the destination.
void
write
(
uint32_t
source_
,
uint32_t
destination_
,
const
command_t
&
command_
);
// Receive command from the source. Returns false if there is no
// command available.
bool
read
(
uint32_t
source_
,
uint32_t
destination_
,
command_t
*
command_
);
// Receive command from another thread.
bool
recv_command
(
uint32_t
thread_slot_
,
command_t
*
command_
,
bool
block_
);
// Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all).
...
...
@@ -126,15 +118,9 @@ namespace zmq
typedef
std
::
vector
<
class
io_thread_t
*>
io_threads_t
;
io_threads_t
io_threads
;
// Signalers for both application and I/O threads.
std
::
vector
<
signaler_t
*>
signalers
;
// Pipe to hold the commands.
typedef
ypipe_t
<
command_t
,
true
,
command_pipe_granularity
>
command_pipe_t
;
// NxN matrix of command pipes.
command_pipe_t
*
command_pipes
;
// Array of pointers to signalers for both application and I/O threads.
int
signalers_count
;
signaler_t
**
signalers
;
// As pipes may reside in orphaned state in particular moments
// of the pipe shutdown process, i.e. neither pipe reader nor
...
...
src/io_thread.cpp
View file @
235ed3a3
...
...
@@ -22,10 +22,8 @@
#include "../include/zmq.h"
#include "io_thread.hpp"
#include "command.hpp"
#include "platform.hpp"
#include "err.hpp"
#include "command.hpp"
#include "dispatcher.hpp"
zmq
::
io_thread_t
::
io_thread_t
(
dispatcher_t
*
dispatcher_
,
...
...
@@ -67,17 +65,18 @@ int zmq::io_thread_t::get_load ()
void
zmq
::
io_thread_t
::
in_event
()
{
// TODO: Do we want to limit number of commands I/O thread can
// process in a single go?
while
(
true
)
{
// Get the next
signal
.
uint32_t
signal
=
signaler
.
check
()
;
if
(
signal
==
signaler_t
::
no_signal
)
// Get the next
command. If there is none, exit
.
command_t
cmd
;
if
(
!
signaler
.
recv
(
&
cmd
,
false
)
)
break
;
// Process all the commands from the thread that sent the signal.
command_t
cmd
;
while
(
get_dispatcher
()
->
read
(
signal
,
get_thread_slot
(),
&
cmd
))
cmd
.
destination
->
process_command
(
cmd
);
// Process the command.
cmd
.
destination
->
process_command
(
cmd
);
}
}
...
...
src/object.cpp
View file @
235ed3a3
...
...
@@ -157,11 +157,10 @@ void zmq::object_t::send_stop ()
{
// 'stop' command goes always from administrative thread to
// the current object.
uint32_t
admin_thread_id
=
dispatcher
->
thread_slot_count
()
-
1
;
command_t
cmd
;
cmd
.
destination
=
this
;
cmd
.
type
=
command_t
::
stop
;
dispatcher
->
write
(
admin_thread_id
,
thread_slot
,
cmd
);
dispatcher
->
send_command
(
thread_slot
,
cmd
);
}
void
zmq
::
object_t
::
send_plug
(
owned_t
*
destination_
,
bool
inc_seqnum_
)
...
...
@@ -370,7 +369,6 @@ void zmq::object_t::process_seqnum ()
void
zmq
::
object_t
::
send_command
(
command_t
&
cmd_
)
{
uint32_t
destination_thread_slot
=
cmd_
.
destination
->
get_thread_slot
();
dispatcher
->
write
(
thread_slot
,
destination_thread_slot
,
cmd_
);
dispatcher
->
send_command
(
cmd_
.
destination
->
get_thread_slot
(),
cmd_
);
}
src/pipe.hpp
View file @
235ed3a3
...
...
@@ -145,7 +145,7 @@ namespace zmq
};
// Message pipe.
class
pipe_t
:
public
ypipe_t
<
zmq_msg_t
,
false
,
message_pipe_granularity
>
class
pipe_t
:
public
ypipe_t
<
zmq_msg_t
,
message_pipe_granularity
>
{
public
:
...
...
src/signaler.cpp
View file @
235ed3a3
...
...
@@ -30,52 +30,9 @@
#else
#include <unistd.h>
#include <fcntl.h>
#include <limits.h>
#endif
const
uint32_t
zmq
::
signaler_t
::
no_signal
=
0xffffffff
;
uint32_t
zmq
::
signaler_t
::
poll
()
{
// Return next signal.
if
(
current
!=
count
)
{
uint32_t
result
=
buffer
[
current
];
current
++
;
return
result
;
}
// If there is no signal buffered, poll for new signals.
xpoll
();
// Return first signal.
zmq_assert
(
current
!=
count
);
uint32_t
result
=
buffer
[
current
];
current
++
;
return
result
;
}
uint32_t
zmq
::
signaler_t
::
check
()
{
// Return next signal.
if
(
current
!=
count
)
{
uint32_t
result
=
buffer
[
current
];
current
++
;
return
result
;
}
// If there is no signal buffered, check whether more signals
// can be obtained.
xcheck
();
// Return first signal if any.
if
(
current
!=
count
)
{
uint32_t
result
=
buffer
[
current
];
current
++
;
return
result
;
}
return
no_signal
;
}
zmq
::
fd_t
zmq
::
signaler_t
::
get_fd
()
{
return
r
;
...
...
@@ -84,8 +41,6 @@ zmq::fd_t zmq::signaler_t::get_fd ()
#if defined ZMQ_HAVE_WINDOWS
zmq
::
signaler_t
::
signaler_t
()
:
current
(
0
),
count
(
0
)
{
// Windows have no 'socketpair' function. CreatePipe is no good as pipe
// handles cannot be polled on. Here we create the socketpair by hand.
...
...
@@ -146,51 +101,49 @@ zmq::signaler_t::~signaler_t ()
wsa_assert
(
rc
!=
SOCKET_ERROR
);
}
void
zmq
::
signaler_t
::
s
ignal
(
uint32_t
signal
_
)
void
zmq
::
signaler_t
::
s
end
(
const
command_t
&
cmd
_
)
{
// TODO: Note that send is a blocking operation.
// How should we behave if the signal cannot be written to the signaler?
int
rc
=
send
(
w
,
(
char
*
)
&
signal_
,
sizeof
(
signal_
),
0
);
// Even worse: What if half of a command is written?
int
rc
=
send
(
w
,
(
char
*
)
&
cmd_
,
sizeof
(
command_t
),
0
);
win_assert
(
rc
!=
SOCKET_ERROR
);
zmq_assert
(
rc
==
sizeof
(
signal_
));
zmq_assert
(
rc
==
sizeof
(
command_t
));
}
void
zmq
::
signaler_t
::
xpoll
(
)
bool
zmq
::
signaler_t
::
recv
(
command_t
*
cmd_
,
bool
block_
)
{
// Switch to blocking mode.
unsigned
long
argp
=
0
;
int
rc
=
ioctlsocket
(
r
,
FIONBIO
,
&
argp
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
if
(
block_
)
{
// Get the signals. Given that we are in the blocking mode now,
// there should be at least a single signal returned.
xcheck
();
zmq_assert
(
current
!=
count
);
// Switch to blocking mode.
unsigned
long
argp
=
0
;
int
rc
=
ioctlsocket
(
r
,
FIONBIO
,
&
argp
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
}
// Switch back to non-blocking mode.
argp
=
1
;
rc
=
ioctlsocket
(
r
,
FIONBIO
,
&
argp
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
}
bool
result
;
int
nbytes
=
recv
(
r
,
(
char
*
)
cmd_
,
sizeof
(
command_t
),
0
);
if
(
nbytes
==
-
1
&&
WSAGetLastError
()
==
WSAEWOULDBLOCK
)
{
result
=
false
;
}
else
{
wsa_assert
(
nbytes
!=
-
1
);
void
zmq
::
signaler_t
::
xcheck
()
{
int
nbytes
=
recv
(
r
,
(
char
*
)
buffer
,
sizeof
(
buffer
),
0
);
// Check whether we haven't got half of a signal.
zmq_assert
(
nbytes
%
sizeof
(
uint32_t
)
==
0
);
// No signals are available.
if
(
nbytes
==
-
1
&&
WSAGetLastError
()
==
WSAEWOULDBLOCK
)
{
current
=
0
;
count
=
0
;
return
;
result
=
true
;
}
wsa_assert
(
nbytes
!=
-
1
);
if
(
block_
)
{
// Check whether we haven't got half of a signal.
zmq_assert
(
nbytes
%
sizeof
(
uint32_t
)
==
0
);
// Switch back to non-blocking mode.
unsigned
long
argp
=
1
;
int
rc
=
ioctlsocket
(
r
,
FIONBIO
,
&
argp
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
}
current
=
0
;
count
=
nbytes
/
sizeof
(
uint32_t
);
return
result
;
}
#elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX
...
...
@@ -198,9 +151,7 @@ void zmq::signaler_t::xcheck ()
#include <sys/types.h>
#include <sys/socket.h>
zmq
::
signaler_t
::
signaler_t
()
:
current
(
0
),
count
(
0
)
zmq
::
signaler_t
::
signaler_t
()
{
int
sv
[
2
];
int
rc
=
socketpair
(
AF_UNIX
,
SOCK_STREAM
,
0
,
sv
);
...
...
@@ -222,49 +173,50 @@ zmq::signaler_t::~signaler_t ()
close
(
r
);
}
void
zmq
::
signaler_t
::
s
ignal
(
uint32_t
signal
_
)
void
zmq
::
signaler_t
::
s
end
(
const
command_t
&
cmd
_
)
{
ssize_t
nbytes
=
send
(
w
,
&
signal_
,
sizeof
(
signal_
),
0
);
ssize_t
nbytes
=
send
(
w
,
&
cmd_
,
sizeof
(
command_t
),
0
);
errno_assert
(
nbytes
!=
-
1
);
zmq_assert
(
nbytes
==
sizeof
(
signal_
);
zmq_assert
(
nbytes
==
sizeof
(
command_t
)
);
}
void
zmq
::
signaler_t
::
xpoll
(
)
bool
zmq
::
signaler_t
::
recv
(
command_t
&
cmd_
,
bool
block_
)
{
// Set the reader to blocking mode.
int
flags
=
fcntl
(
r
,
F_GETFL
,
0
);
if
(
flags
==
-
1
)
flags
=
0
;
int
rc
=
fcntl
(
r
,
F_SETFL
,
flags
&
~
O_NONBLOCK
);
errno_assert
(
rc
!=
-
1
);
if
(
block_
)
{
// Set the reader to blocking mode.
int
flags
=
fcntl
(
r
,
F_GETFL
,
0
);
if
(
flags
==
-
1
)
flags
=
0
;
int
rc
=
fcntl
(
r
,
F_SETFL
,
flags
&
~
O_NONBLOCK
);
errno_assert
(
rc
!=
-
1
);
}
// Poll for events.
xcheck
();
zmq_assert
(
current
!=
count
);
bool
result
;
ssize_t
nbytes
=
recv
(
r
,
buffer
,
sizeof
(
command_t
),
0
);
if
(
nbytes
==
-
1
&&
errno
==
EAGAIN
)
{
result
=
false
;
}
else
{
zmq_assert
(
nbytes
!=
-
1
);
// Set the reader to non-blocking mode.
flags
=
fcntl
(
r
,
F_GETFL
,
0
);
if
(
flags
==
-
1
)
flags
=
0
;
rc
=
fcntl
(
r
,
F_SETFL
,
flags
|
O_NONBLOCK
);
errno_assert
(
rc
!=
-
1
);
}
// Check whether we haven't got half of command.
zmq_assert
(
nbytes
==
sizeof
(
command_t
));
void
zmq
::
signaler_t
::
xcheck
()
{
ssize_t
nbytes
=
recv
(
r
,
buffer
,
sizeof
(
buffer
),
0
);
if
(
nbytes
==
-
1
&&
errno
==
EAGAIN
)
{
current
=
0
;
count
=
0
;
return
;
result
=
true
;
}
zmq_assert
(
nbytes
!=
-
1
);
// Check whether we haven't got half of a signal.
zmq_assert
(
nbytes
%
sizeof
(
uint32_t
)
==
0
);
if
(
block_
)
// Set the reader to non-blocking mode.
int
flags
=
fcntl
(
r
,
F_GETFL
,
0
);
if
(
flags
==
-
1
)
flags
=
0
;
int
rc
=
fcntl
(
r
,
F_SETFL
,
flags
|
O_NONBLOCK
);
errno_assert
(
rc
!=
-
1
);
}
current
=
0
;
count
=
nbytes
/
sizeof
(
uint32_t
);
return
result
;
}
#else
...
...
@@ -272,10 +224,13 @@ void zmq::signaler_t::xcheck ()
#include <sys/types.h>
#include <sys/socket.h>
zmq
::
signaler_t
::
signaler_t
()
:
current
(
0
),
count
(
0
)
zmq
::
signaler_t
::
signaler_t
()
{
// Make sure that command can be written to the socket in atomic fashion.
// If this wasn't guaranteed, commands from different threads would be
// interleaved.
zmq_assert
(
sizeof
(
command_t
)
<=
PIPE_BUF
);
int
sv
[
2
];
int
rc
=
socketpair
(
AF_UNIX
,
SOCK_STREAM
,
0
,
sv
);
errno_assert
(
rc
==
0
);
...
...
@@ -289,42 +244,33 @@ zmq::signaler_t::~signaler_t ()
close
(
r
);
}
void
zmq
::
signaler_t
::
s
ignal
(
uint32_t
signal
_
)
void
zmq
::
signaler_t
::
s
end
(
const
command_t
&
cmd
_
)
{
// TODO: Note that send is a blocking operation.
// How should we behave if the
signal
cannot be written to the signaler?
ssize_t
nbytes
=
send
(
w
,
&
signal_
,
sizeof
(
signal_
),
0
);
// How should we behave if the
command
cannot be written to the signaler?
ssize_t
nbytes
=
::
send
(
w
,
&
cmd_
,
sizeof
(
command_t
),
0
);
errno_assert
(
nbytes
!=
-
1
);
zmq_assert
(
nbytes
==
sizeof
(
signal_
));
// This should never happen as we've already checked that command size is
// less than PIPE_BUF.
zmq_assert
(
nbytes
==
sizeof
(
command_t
));
}
void
zmq
::
signaler_t
::
xpoll
(
)
bool
zmq
::
signaler_t
::
recv
(
command_t
*
cmd_
,
bool
block_
)
{
ssize_t
nbytes
=
recv
(
r
,
buffer
,
sizeof
(
buffer
),
0
);
errno_assert
(
nbytes
!=
-
1
);
// Check whether we haven't got half of a signal.
zmq_assert
(
nbytes
%
sizeof
(
uint32_t
)
==
0
);
ssize_t
nbytes
=
::
recv
(
r
,
cmd_
,
sizeof
(
command_t
),
block_
?
0
:
MSG_DONTWAIT
);
current
=
0
;
count
=
nbytes
/
sizeof
(
uint32_t
);
}
// If there's no signal available return false.
if
(
nbytes
==
-
1
&&
errno
==
EAGAIN
)
return
false
;
void
zmq
::
signaler_t
::
xcheck
()
{
ssize_t
nbytes
=
recv
(
r
,
buffer
,
64
,
MSG_DONTWAIT
);
if
(
nbytes
==
-
1
&&
errno
==
EAGAIN
)
{
current
=
0
;
count
=
0
;
return
;
}
errno_assert
(
nbytes
!=
-
1
);
// Check whether we haven't got half of
a signal
.
zmq_assert
(
nbytes
%
sizeof
(
uint32_t
)
==
0
);
// Check whether we haven't got half of
command
.
zmq_assert
(
nbytes
==
sizeof
(
command_t
)
);
current
=
0
;
count
=
nbytes
/
sizeof
(
uint32_t
);
return
true
;
}
#endif
...
...
src/signaler.hpp
View file @
235ed3a3
...
...
@@ -26,15 +26,11 @@
#include "fd.hpp"
#include "stdint.hpp"
#include "config.hpp"
#include "command.hpp"
namespace
zmq
{
// This object can be used to send individual signals from one thread to
// another. The specific of this pipe is that it has associated file
// descriptor and so it can be polled on. Same signal cannot be sent twice
// unless signals are retrieved by the reader side in the meantime.
class
signaler_t
{
public
:
...
...
@@ -42,44 +38,29 @@ namespace zmq
signaler_t
();
~
signaler_t
();
static
const
uint32_t
no_signal
;
void
signal
(
uint32_t
signal_
);
uint32_t
poll
();
uint32_t
check
();
fd_t
get_fd
();
void
send
(
const
command_t
&
cmd_
);
bool
recv
(
command_t
*
cmd_
,
bool
block_
);
private
:
void
xpoll
();
void
xcheck
();
#if defined ZMQ_HAVE_OPENVMS
// Whilst OpenVMS supports socketpair - it maps to AF_INET only.
// Further, it does not set the socket options TCP_NODELAY and
// TCP_NODELACK which can lead to performance problems. We'll
// overload the socketpair function for this class.
//
// The bug will be fixed in V5.6 ECO4 and beyond. In the
// meantime, we'll create the socket pair manually.
static
int
socketpair
(
int
domain_
,
int
type_
,
int
protocol_
,
int
sv_
[
2
]);
// Whilst OpenVMS supports socketpair - it maps to AF_INET only.
// Further, it does not set the socket options TCP_NODELAY and
// TCP_NODELACK which can lead to performance problems. We'll
// overload the socketpair function for this class.
//
// The bug will be fixed in V5.6 ECO4 and beyond. In the
// meantime, we'll create the socket pair manually.
static
int
socketpair
(
int
domain_
,
int
type_
,
int
protocol_
,
int
sv_
[
2
]);
#endif
// Write & read end of the socketpair.
fd_t
w
;
fd_t
r
;
// Signal buffer.
uint32_t
buffer
[
signal_buffer_size
];
// Position of the next signal in the buffer to return to the user.
size_t
current
;
// Number of signals in the signal buffer.
size_t
count
;
// Disable copying of fd_signeler object.
signaler_t
(
const
signaler_t
&
);
void
operator
=
(
const
signaler_t
&
);
...
...
src/ypipe.hpp
View file @
235ed3a3
...
...
@@ -30,31 +30,24 @@ namespace zmq
// Lock-free queue implementation.
// Only a single thread can read from the pipe at any specific moment.
// Only a single thread can write to the pipe at any specific moment.
//
// T is the type of the object in the queue.
// If the template parameter D is set to true, it is quaranteed that
// the pipe will die in a finite time (so that you can swich to some
// other task). If D is set to false, reading from the pipe may result
// in an infinite cycle (if the pipe is continuosly fed by new elements).
// N is granularity of the pipe (how many elements have to be inserted
// till actual memory allocation is required).
template
<
typename
T
,
bool
D
,
int
N
>
class
ypipe_t
// N is granularity of the pipe, i.e. how many messages are needed to
// perform next memory allocation.
template
<
typename
T
,
int
N
>
class
ypipe_t
{
public
:
// Initialises the pipe. In D scenario it is created in dead state.
// Otherwise it's alive.
inline
ypipe_t
()
:
stop
(
false
)
// Initialises the pipe.
inline
ypipe_t
()
{
// Insert terminator element into the queue.
queue
.
push
();
// Let all the pointers to point to the terminator
// Let all the pointers to point to the terminator
.
// (unless pipe is dead, in which case c is set to NULL).
r
=
w
=
&
queue
.
back
();
c
.
set
(
D
?
NULL
:
&
queue
.
back
());
c
.
set
(
&
queue
.
back
());
}
// Following function (write) deliberately copies uninitialised data
...
...
@@ -125,50 +118,17 @@ namespace zmq
return
true
;
// There's no prefetched value, so let us prefetch more values.
// (Note that D is a template parameter. Becaue of that one of
// the following branches will be completely optimised away
// by the compiler.)
if
(
D
)
{
// If one prefetch was already done since last sleeping,
// don't do a new one, rather ask caller to go asleep.
if
(
stop
)
{
stop
=
false
;
return
false
;
}
// Get new items. Perform the operation in atomic fashion.
r
=
c
.
xchg
(
NULL
);
// If there are no elements prefetched, exit and go asleep.
// During pipe's lifetime r should never be NULL, however,
// during pipe shutdown when retrieving messages from it
// to deallocate them, this can happen.
if
(
&
queue
.
front
()
==
r
||
!
r
)
{
stop
=
false
;
return
false
;
}
// We want to do only a single prefetch in D scenario
// before going asleep. Thus, we set stop variable to true
// so that we can return false next time the prefetch is
// attempted.
stop
=
true
;
}
else
{
// Prefetching in non-D scenario is to simply retrieve the
// pointer from c in atomic fashion. If there are no
// items to prefetch, set c to NULL (using compare-and-swap).
r
=
c
.
cas
(
&
queue
.
front
(),
NULL
);
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when messages
// are being deallocated.
if
(
&
queue
.
front
()
==
r
||
!
r
)
return
false
;
}
// Prefetching is to simply retrieve the
// pointer from c in atomic fashion. If there are no
// items to prefetch, set c to NULL (using compare-and-swap).
r
=
c
.
cas
(
&
queue
.
front
(),
NULL
);
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// it can happen during pipe shutdown when messages
// are being deallocated.
if
(
&
queue
.
front
()
==
r
||
!
r
)
return
false
;
// There was at least one value prefetched.
return
true
;
...
...
@@ -211,11 +171,6 @@ namespace zmq
// atomic operations.
atomic_ptr_t
<
T
>
c
;
// Used only if 'D' template parameter is set to true. If true,
// prefetch was already done since last sleeping and the reader
// should go asleep instead of prefetching once more.
bool
stop
;
// Disable copying of ypipe object.
ypipe_t
(
const
ypipe_t
&
);
void
operator
=
(
const
ypipe_t
&
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment