Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
d90b4071
Commit
d90b4071
authored
Aug 28, 2010
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
refactoring of pipe/swap interaction
parent
42000d2c
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
120 additions
and
113 deletions
+120
-113
pipe.cpp
src/pipe.cpp
+100
-94
pipe.hpp
src/pipe.hpp
+20
-19
No files found.
src/pipe.cpp
View file @
d90b4071
...
...
@@ -27,6 +27,7 @@
zmq
::
reader_t
::
reader_t
(
object_t
*
parent_
,
pipe_t
*
pipe_
,
uint64_t
lwm_
)
:
object_t
(
parent_
),
active
(
true
),
pipe
(
pipe_
),
writer
(
NULL
),
lwm
(
lwm_
),
...
...
@@ -76,12 +77,14 @@ bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_)
bool
zmq
::
reader_t
::
check_read
()
{
if
(
unlikely
(
terminating
)
)
if
(
!
active
)
return
false
;
// Check if there's an item in the pipe.
if
(
!
pipe
->
check_read
())
if
(
!
pipe
->
check_read
())
{
active
=
false
;
return
false
;
}
// If the next item in the pipe is message delimiter,
// initiate its termination.
...
...
@@ -95,11 +98,13 @@ bool zmq::reader_t::check_read ()
bool
zmq
::
reader_t
::
read
(
zmq_msg_t
*
msg_
)
{
if
(
unlikely
(
terminating
)
)
if
(
!
active
)
return
false
;
if
(
!
pipe
->
read
(
msg_
))
if
(
!
pipe
->
read
(
msg_
))
{
active
=
false
;
return
false
;
}
// If delimiter was read, start termination process of the pipe.
unsigned
char
*
offset
=
0
;
...
...
@@ -123,6 +128,7 @@ void zmq::reader_t::terminate ()
if
(
terminating
)
return
;
active
=
false
;
terminating
=
true
;
send_pipe_term
(
writer
);
}
...
...
@@ -130,6 +136,7 @@ void zmq::reader_t::terminate ()
void
zmq
::
reader_t
::
process_activate_reader
()
{
// Forward the event to the sink (either socket or session).
active
=
true
;
sink
->
activated
(
this
);
}
...
...
@@ -150,38 +157,34 @@ void zmq::reader_t::process_pipe_term_ack ()
zmq
::
writer_t
::
writer_t
(
object_t
*
parent_
,
pipe_t
*
pipe_
,
reader_t
*
reader_
,
uint64_t
hwm_
,
int64_t
swap_size_
)
:
object_t
(
parent_
),
active
(
true
),
pipe
(
pipe_
),
reader
(
reader_
),
hwm
(
hwm_
),
msgs_read
(
0
),
msgs_written
(
0
),
msg_store
(
NULL
),
extra_msg_flag
(
false
),
stalled
(
false
),
swap
(
NULL
),
sink
(
NULL
),
terminating
(
false
),
pending_close
(
false
)
swapping
(
false
),
pending_delimiter
(
false
),
terminating
(
false
)
{
// Inform reader about the writer.
reader
->
set_writer
(
this
);
// Open the swap file, if required.
if
(
swap_size_
>
0
)
{
msg_store
=
new
(
std
::
nothrow
)
msg_store_t
(
swap_size_
);
if
(
msg_store
!=
NULL
)
{
if
(
msg_store
->
init
()
<
0
)
{
delete
msg_store
;
msg_store
=
NULL
;
}
}
swap
=
new
(
std
::
nothrow
)
msg_store_t
(
swap_size_
);
zmq_assert
(
swap
);
int
rc
=
swap
->
init
();
zmq_assert
(
rc
==
0
);
}
}
zmq
::
writer_t
::~
writer_t
()
{
if
(
extra_msg_flag
)
zmq_msg_close
(
&
extra_msg
);
delete
msg_store
;
if
(
swap
)
delete
swap
;
}
void
zmq
::
writer_t
::
set_event_sink
(
i_writer_events
*
sink_
)
...
...
@@ -192,13 +195,26 @@ void zmq::writer_t::set_event_sink (i_writer_events *sink_)
bool
zmq
::
writer_t
::
check_write
()
{
if
(
terminating
)
return
false
;
if
(
pipe_full
()
&&
(
msg_store
==
NULL
||
msg_store
->
full
()
||
extra_msg_flag
))
{
stalled
=
true
;
// We've already checked and there's no space free for the new message.
// There's no point in checking once again.
if
(
unlikely
(
!
active
))
return
false
;
if
(
unlikely
(
swapping
))
{
if
(
unlikely
(
swap
->
full
()))
{
active
=
false
;
return
false
;
}
}
else
{
if
(
unlikely
(
pipe_full
()))
{
if
(
swap
)
swapping
=
true
;
else
{
active
=
false
;
return
false
;
}
}
}
return
true
;
...
...
@@ -206,58 +222,44 @@ bool zmq::writer_t::check_write ()
bool
zmq
::
writer_t
::
write
(
zmq_msg_t
*
msg_
)
{
if
(
terminating
)
return
false
;
if
(
!
check_write
())
if
(
unlikely
(
!
check_write
()))
return
false
;
if
(
pipe_full
())
{
if
(
msg_store
->
store
(
msg_
))
{
if
(
!
(
msg_
->
flags
&
ZMQ_MSG_MORE
))
msg_store
->
commit
();
}
else
{
extra_msg
=
*
msg_
;
extra_msg_flag
=
true
;
}
}
else
{
pipe
->
write
(
*
msg_
,
msg_
->
flags
&
ZMQ_MSG_MORE
);
if
(
unlikely
(
swapping
))
{
bool
stored
=
swap
->
store
(
msg_
);
zmq_assert
(
stored
);
if
(
!
(
msg_
->
flags
&
ZMQ_MSG_MORE
))
msgs_written
++
;
swap
->
commit
();
return
true
;
}
pipe
->
write
(
*
msg_
,
msg_
->
flags
&
ZMQ_MSG_MORE
);
if
(
!
(
msg_
->
flags
&
ZMQ_MSG_MORE
))
msgs_written
++
;
return
true
;
}
void
zmq
::
writer_t
::
rollback
()
{
if
(
extra_msg_flag
&&
extra_msg
.
flags
&
ZMQ_MSG_MORE
)
{
zmq_msg_close
(
&
extra_msg
);
extra_msg_flag
=
false
;
// Remove incomplete message from the swap.
if
(
unlikely
(
swapping
))
{
swap
->
rollback
();
return
;
}
if
(
msg_store
!=
NULL
)
msg_store
->
rollback
();
// Remove incomplete message from the pipe.
zmq_msg_t
msg
;
// Remove all incomplete messages from the pipe.
while
(
pipe
->
unwrite
(
&
msg
))
{
zmq_assert
(
msg
.
flags
&
ZMQ_MSG_MORE
);
zmq_msg_close
(
&
msg
);
msgs_written
--
;
}
if
(
stalled
&&
check_write
())
{
stalled
=
false
;
zmq_assert
(
sink
);
sink
->
activated
(
this
);
}
}
void
zmq
::
writer_t
::
flush
()
{
if
(
!
pipe
->
flush
())
// In the swapping mode, flushing is automatically handled by swap object.
if
(
!
swapping
&&
!
pipe
->
flush
())
send_activate_reader
(
reader
);
}
...
...
@@ -267,19 +269,20 @@ void zmq::writer_t::terminate ()
if
(
terminating
)
return
;
if
(
msg_store
==
NULL
||
(
msg_store
->
empty
()
&&
!
extra_msg_flag
))
write_delimiter
();
else
pending_close
=
true
;
}
// Mark the pipe as not available for writing.
active
=
false
;
void
zmq
::
writer_t
::
write_delimiter
()
{
// Rollback any unfinished messages.
rollback
();
// Push delimiter into the pipe.
// Trick the compiler to belive that the tag is a valid pointer.
if
(
swapping
)
{
pending_delimiter
=
true
;
return
;
}
// Push delimiter into the pipe. Trick the compiler to belive that
// the tag is a valid pointer. Note that watermarks are not checked
// thus the delimiter can be written even though the pipe is full.
zmq_msg_t
msg
;
const
unsigned
char
*
offset
=
0
;
msg
.
content
=
(
void
*
)
(
offset
+
ZMQ_DELIMITER
);
...
...
@@ -290,44 +293,47 @@ void zmq::writer_t::write_delimiter ()
void
zmq
::
writer_t
::
process_activate_writer
(
uint64_t
msgs_read_
)
{
zmq_msg_t
msg
;
// Store the reader's message sequence number.
msgs_read
=
msgs_read_
;
if
(
msg_store
)
{
// Move messages from backing store into pipe.
while
(
!
pipe_full
()
&&
!
msg_store
->
empty
())
{
msg_store
->
fetch
(
&
msg
);
// Write message into the pipe.
// If we are in the swapping mode, we have some messages in the swap.
// Given that pipe is now ready for writing we can move part of the
// swap into the pipe.
if
(
swapping
)
{
zmq_msg_t
msg
;
while
(
!
pipe_full
()
&&
!
swap
->
empty
())
{
swap
->
fetch
(
&
msg
);
pipe
->
write
(
msg
,
msg
.
flags
&
ZMQ_MSG_MORE
);
if
(
!
(
msg
.
flags
&
ZMQ_MSG_MORE
))
msgs_written
++
;
}
if
(
!
pipe
->
flush
())
send_activate_reader
(
reader
);
}
if
(
extra_msg_flag
)
{
if
(
!
pipe_full
())
{
pipe
->
write
(
extra_msg
,
extra_msg
.
flags
&
ZMQ_MSG_MORE
);
if
(
!
(
extra_msg
.
flags
&
ZMQ_MSG_MORE
))
msgs_written
++
;
extra_msg_flag
=
false
;
}
else
if
(
msg_store
->
store
(
&
extra_msg
))
{
if
(
!
(
extra_msg
.
flags
&
ZMQ_MSG_MORE
))
msg_store
->
commit
();
extra_msg_flag
=
false
;
}
}
if
(
pending_close
&&
msg_store
->
empty
()
&&
!
extra_msg_flag
)
{
write_delimiter
();
pending_close
=
false
;
// There are no more messages in the swap. We can switch into
// standard in-memory mode.
if
(
swap
->
empty
())
{
swapping
=
false
;
// Push delimiter into the pipe. Trick the compiler to belive that
// the tag is a valid pointer. Note that watermarks are not checked
// thus the delimiter can be written even though the pipe is full.
if
(
pending_delimiter
)
{
zmq_msg_t
msg
;
const
unsigned
char
*
offset
=
0
;
msg
.
content
=
(
void
*
)
(
offset
+
ZMQ_DELIMITER
);
msg
.
flags
=
0
;
pipe
->
write
(
msg
,
false
);
flush
();
return
;
}
flush
();
}
if
(
stalled
)
{
stalled
=
false
;
// If the writer was non-active before, let's make it active
// (available for writing messages to).
if
(
!
active
)
{
active
=
true
;
zmq_assert
(
sink
);
sink
->
activated
(
this
);
}
...
...
src/pipe.hpp
View file @
d90b4071
...
...
@@ -87,6 +87,9 @@ namespace zmq
// Returns true if the message is delimiter; false otherwise.
static
bool
is_delimiter
(
zmq_msg_t
&
msg_
);
// True, if pipe can be read from.
bool
active
;
// The underlying pipe.
pipe_t
*
pipe
;
...
...
@@ -127,8 +130,8 @@ namespace zmq
void
set_event_sink
(
i_writer_events
*
endpoint_
);
// Checks whether a message can be written to the pipe.
// If writing the message would cause high watermark
to be
// exceeded, the function returns false.
// If writing the message would cause high watermark
and (optionally)
//
swap to be
exceeded, the function returns false.
bool
check_write
();
// Writes a message to the underlying pipe. Returns false if the
...
...
@@ -150,17 +153,17 @@ namespace zmq
uint64_t
hwm_
,
int64_t
swap_size_
);
~
writer_t
();
void
process_activate_writer
(
uint64_t
msgs_read_
);
// Command handlers.
void
process_activate_writer
(
uint64_t
msgs_read_
);
void
process_pipe_term
();
// Tests whether the pipe is already full.
// Tests whether underlying pipe is already full. The swap is not
// taken into account.
bool
pipe_full
();
//
Write special message to the pipe so that the reader
//
can find out we are finished
.
void
write_delimiter
()
;
//
True, if this object can be written to. Undelying ypipe may be full
//
but as long as there's swap space available, this flag is true
.
bool
active
;
// The underlying pipe.
pipe_t
*
pipe
;
...
...
@@ -178,26 +181,24 @@ namespace zmq
// Number of messages we have written so far.
uint64_t
msgs_written
;
// Pointer to
backing store
. If NULL, messages are always
// Pointer to
the message swap
. If NULL, messages are always
// kept in main memory.
msg_store_t
*
msg_store
;
bool
extra_msg_flag
;
zmq_msg_t
extra_msg
;
// True iff the last attempt to write a message has failed.
bool
stalled
;
msg_store_t
*
swap
;
// Sink for the events (either the socket or the session).
i_writer_events
*
sink
;
// If true, swap is active. New messages are to be written to the swap.
bool
swapping
;
// If true, there's a delimiter to be written to the pipe after the
// swap is empied.
bool
pending_delimiter
;
// True is 'terminate' method was called of 'pipe_term' command
// arrived from the reader.
bool
terminating
;
bool
pending_close
;
writer_t
(
const
writer_t
&
);
void
operator
=
(
const
writer_t
&
);
};
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment