Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
42000d2c
Commit
42000d2c
authored
Aug 28, 2010
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
terminology unified: revive->activate
parent
92923cd4
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
34 additions
and
34 deletions
+34
-34
command.hpp
src/command.hpp
+6
-7
fq.cpp
src/fq.cpp
+2
-2
object.cpp
src/object.cpp
+13
-13
object.hpp
src/object.hpp
+4
-4
pipe.cpp
src/pipe.cpp
+4
-4
pipe.hpp
src/pipe.hpp
+2
-2
socket_base.cpp
src/socket_base.cpp
+3
-2
No files found.
src/command.hpp
View file @
42000d2c
...
...
@@ -39,8 +39,8 @@ namespace zmq
own
,
attach
,
bind
,
revive
,
reader_info
,
activate_reader
,
activate_writer
,
pipe_term
,
pipe_term_ack
,
term_req
,
...
...
@@ -83,14 +83,13 @@ namespace zmq
// Sent by pipe writer to inform dormant pipe reader that there
// are messages in the pipe.
struct
{
}
revive
;
}
activate_reader
;
// Sent by pipe reader to inform pipe writer
// about how many messages it has read so far.
// Used to implement the flow control.
// Sent by pipe reader to inform pipe writer about how many
// messages it has read so far.
struct
{
uint64_t
msgs_read
;
}
reader_info
;
}
activate_writer
;
// Sent by pipe reader to pipe writer to ask it to terminate
// its end of the pipe.
...
...
src/fq.cpp
View file @
42000d2c
...
...
@@ -103,8 +103,8 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
// without blocking.
zmq_assert
(
!
(
more
&&
!
fetched
));
// Note that when message is not fetched, current pipe is
killed an
d
// replaced by another active pipe. Thus we don't have to increase
// Note that when message is not fetched, current pipe is
deactivate
d
//
and
replaced by another active pipe. Thus we don't have to increase
// the 'current' pointer.
if
(
fetched
)
{
more
=
msg_
->
flags
&
ZMQ_MSG_MORE
;
...
...
src/object.cpp
View file @
42000d2c
...
...
@@ -57,8 +57,12 @@ void zmq::object_t::process_command (command_t &cmd_)
{
switch
(
cmd_
.
type
)
{
case
command_t
:
:
revive
:
process_revive
();
case
command_t
:
:
activate_reader
:
process_activate_reader
();
break
;
case
command_t
:
:
activate_writer
:
process_activate_writer
(
cmd_
.
args
.
activate_writer
.
msgs_read
);
break
;
case
command_t
:
:
stop
:
...
...
@@ -89,10 +93,6 @@ void zmq::object_t::process_command (command_t &cmd_)
process_seqnum
();
break
;
case
command_t
:
:
reader_info
:
process_reader_info
(
cmd_
.
args
.
reader_info
.
msgs_read
);
break
;
case
command_t
:
:
pipe_term
:
process_pipe_term
();
return
;
...
...
@@ -248,18 +248,18 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,
send_command
(
cmd
);
}
void
zmq
::
object_t
::
send_
revive
(
object
_t
*
destination_
)
void
zmq
::
object_t
::
send_
activate_reader
(
reader
_t
*
destination_
)
{
command_t
cmd
;
#if defined ZMQ_MAKE_VALGRIND_HAPPY
memset
(
&
cmd
,
0
,
sizeof
(
cmd
));
#endif
cmd
.
destination
=
destination_
;
cmd
.
type
=
command_t
::
revive
;
cmd
.
type
=
command_t
::
activate_reader
;
send_command
(
cmd
);
}
void
zmq
::
object_t
::
send_
reader_info
(
writer_t
*
destination_
,
void
zmq
::
object_t
::
send_
activate_writer
(
writer_t
*
destination_
,
uint64_t
msgs_read_
)
{
command_t
cmd
;
...
...
@@ -267,8 +267,8 @@ void zmq::object_t::send_reader_info (writer_t *destination_,
memset
(
&
cmd
,
0
,
sizeof
(
cmd
));
#endif
cmd
.
destination
=
destination_
;
cmd
.
type
=
command_t
::
reader_info
;
cmd
.
args
.
reader_info
.
msgs_read
=
msgs_read_
;
cmd
.
type
=
command_t
::
activate_writer
;
cmd
.
args
.
activate_writer
.
msgs_read
=
msgs_read_
;
send_command
(
cmd
);
}
...
...
@@ -356,12 +356,12 @@ void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
zmq_assert
(
false
);
}
void
zmq
::
object_t
::
process_
revive
()
void
zmq
::
object_t
::
process_
activate_reader
()
{
zmq_assert
(
false
);
}
void
zmq
::
object_t
::
process_
reader_info
(
uint64_t
msgs_read_
)
void
zmq
::
object_t
::
process_
activate_writer
(
uint64_t
msgs_read_
)
{
zmq_assert
(
false
);
}
...
...
src/object.hpp
View file @
42000d2c
...
...
@@ -68,8 +68,8 @@ namespace zmq
void
send_bind
(
class
own_t
*
destination_
,
class
reader_t
*
in_pipe_
,
class
writer_t
*
out_pipe_
,
const
blob_t
&
peer_identity_
,
bool
inc_seqnum_
=
true
);
void
send_
revive
(
class
object
_t
*
destination_
);
void
send_
reader_info
(
class
writer_t
*
destination_
,
void
send_
activate_reader
(
class
reader
_t
*
destination_
);
void
send_
activate_writer
(
class
writer_t
*
destination_
,
uint64_t
msgs_read_
);
void
send_pipe_term
(
class
writer_t
*
destination_
);
void
send_pipe_term_ack
(
class
reader_t
*
destination_
);
...
...
@@ -87,8 +87,8 @@ namespace zmq
const
blob_t
&
peer_identity_
);
virtual
void
process_bind
(
class
reader_t
*
in_pipe_
,
class
writer_t
*
out_pipe_
,
const
blob_t
&
peer_identity_
);
virtual
void
process_
revive
();
virtual
void
process_
reader_info
(
uint64_t
msgs_read_
);
virtual
void
process_
activate_reader
();
virtual
void
process_
activate_writer
(
uint64_t
msgs_read_
);
virtual
void
process_pipe_term
();
virtual
void
process_pipe_term_ack
();
virtual
void
process_term_req
(
class
own_t
*
object_
);
...
...
src/pipe.cpp
View file @
42000d2c
...
...
@@ -112,7 +112,7 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
msgs_read
++
;
if
(
lwm
>
0
&&
msgs_read
%
lwm
==
0
)
send_
reader_info
(
writer
,
msgs_read
);
send_
activate_writer
(
writer
,
msgs_read
);
return
true
;
}
...
...
@@ -127,7 +127,7 @@ void zmq::reader_t::terminate ()
send_pipe_term
(
writer
);
}
void
zmq
::
reader_t
::
process_
revive
()
void
zmq
::
reader_t
::
process_
activate_reader
()
{
// Forward the event to the sink (either socket or session).
sink
->
activated
(
this
);
...
...
@@ -258,7 +258,7 @@ void zmq::writer_t::rollback ()
void
zmq
::
writer_t
::
flush
()
{
if
(
!
pipe
->
flush
())
send_
revive
(
reader
);
send_
activate_reader
(
reader
);
}
void
zmq
::
writer_t
::
terminate
()
...
...
@@ -288,7 +288,7 @@ void zmq::writer_t::write_delimiter ()
flush
();
}
void
zmq
::
writer_t
::
process_
reader_info
(
uint64_t
msgs_read_
)
void
zmq
::
writer_t
::
process_
activate_writer
(
uint64_t
msgs_read_
)
{
zmq_msg_t
msg
;
...
...
src/pipe.hpp
View file @
42000d2c
...
...
@@ -81,7 +81,7 @@ namespace zmq
void
set_writer
(
class
writer_t
*
writer_
);
// Command handlers.
void
process_
revive
();
void
process_
activate_reader
();
void
process_pipe_term_ack
();
// Returns true if the message is delimiter; false otherwise.
...
...
@@ -150,7 +150,7 @@ namespace zmq
uint64_t
hwm_
,
int64_t
swap_size_
);
~
writer_t
();
void
process_
reader_info
(
uint64_t
msgs_read_
);
void
process_
activate_writer
(
uint64_t
msgs_read_
);
// Command handlers.
void
process_pipe_term
();
...
...
src/socket_base.cpp
View file @
42000d2c
...
...
@@ -495,8 +495,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
errno
=
err
;
// If the message cannot be fetched immediately, there are two scenarios.
// For non-blocking recv, commands are processed in case there's a revive
// command already waiting int a command pipe. If it's not, return EAGAIN.
// For non-blocking recv, commands are processed in case there's an
// activate_reader command already waiting int a command pipe.
// If it's not, return EAGAIN.
if
(
flags_
&
ZMQ_NOBLOCK
)
{
if
(
errno
!=
EAGAIN
)
return
-
1
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment