Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
ed291b02
Commit
ed291b02
authored
Mar 27, 2010
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
multi-part messages work with PUB/SUB
parent
0b9897b1
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
69 additions
and
12 deletions
+69
-12
fq.cpp
src/fq.cpp
+25
-6
fq.hpp
src/fq.hpp
+4
-0
pipe.cpp
src/pipe.cpp
+8
-2
pub.cpp
src/pub.cpp
+2
-1
sub.cpp
src/sub.cpp
+25
-2
sub.hpp
src/sub.hpp
+4
-0
ypipe.hpp
src/ypipe.hpp
+1
-1
No files found.
src/fq.cpp
View file @
ed291b02
...
@@ -25,7 +25,8 @@
...
@@ -25,7 +25,8 @@
zmq
::
fq_t
::
fq_t
()
:
zmq
::
fq_t
::
fq_t
()
:
active
(
0
),
active
(
0
),
current
(
0
)
current
(
0
),
tbc
(
false
)
{
{
}
}
...
@@ -44,6 +45,8 @@ void zmq::fq_t::attach (reader_t *pipe_)
...
@@ -44,6 +45,8 @@ void zmq::fq_t::attach (reader_t *pipe_)
void
zmq
::
fq_t
::
detach
(
reader_t
*
pipe_
)
void
zmq
::
fq_t
::
detach
(
reader_t
*
pipe_
)
{
{
zmq_assert
(
!
tbc
||
pipes
[
current
]
!=
pipe_
);
// Remove the pipe from the list; adjust number of active pipes
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
// accordingly.
if
(
pipes
.
index
(
pipe_
)
<
active
)
{
if
(
pipes
.
index
(
pipe_
)
<
active
)
{
...
@@ -75,14 +78,26 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
...
@@ -75,14 +78,26 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
// Deallocate old content of the message.
// Deallocate old content of the message.
zmq_msg_close
(
msg_
);
zmq_msg_close
(
msg_
);
// Round-robin over the pipes to get next message.
// Round-robin over the pipes to get
the
next message.
for
(
int
count
=
active
;
count
!=
0
;
count
--
)
{
for
(
int
count
=
active
;
count
!=
0
;
count
--
)
{
// Try to fetch new message. If we've already read part of the message
// subsequent part should be immediately available.
bool
fetched
=
pipes
[
current
]
->
read
(
msg_
);
bool
fetched
=
pipes
[
current
]
->
read
(
msg_
);
current
++
;
zmq_assert
(
!
(
tbc
&&
!
fetched
));
if
(
current
>=
active
)
current
=
0
;
// Note that when message is not fetched, current pipe is killed and
if
(
fetched
)
// replaced by another active pipe. Thus we don't have to increase
// the 'current' pointer.
if
(
fetched
)
{
tbc
=
msg_
->
flags
&
ZMQ_MSG_TBC
;
if
(
!
tbc
)
{
current
++
;
if
(
current
>=
active
)
current
=
0
;
}
return
0
;
return
0
;
}
}
}
// No message is available. Initialise the output parameter
// No message is available. Initialise the output parameter
...
@@ -94,6 +109,10 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
...
@@ -94,6 +109,10 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
bool
zmq
::
fq_t
::
has_in
()
bool
zmq
::
fq_t
::
has_in
()
{
{
// There are subsequent parts of the partly-read message available.
if
(
tbc
)
return
true
;
// Note that messing with current doesn't break the fairness of fair
// Note that messing with current doesn't break the fairness of fair
// queueing algorithm. If there are no messages available current will
// queueing algorithm. If there are no messages available current will
// get back to its original value. Otherwise it'll point to the first
// get back to its original value. Otherwise it'll point to the first
...
...
src/fq.hpp
View file @
ed291b02
...
@@ -55,6 +55,10 @@ namespace zmq
...
@@ -55,6 +55,10 @@ namespace zmq
// Index of the next bound pipe to read a message from.
// Index of the next bound pipe to read a message from.
pipes_t
::
size_type
current
;
pipes_t
::
size_type
current
;
// If true, part of a multipart message was already received, but
// there are following parts still waiting in the current pipe.
bool
tbc
;
fq_t
(
const
fq_t
&
);
fq_t
(
const
fq_t
&
);
void
operator
=
(
const
fq_t
&
);
void
operator
=
(
const
fq_t
&
);
};
};
...
...
src/pipe.cpp
View file @
ed291b02
...
@@ -77,7 +77,9 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
...
@@ -77,7 +77,9 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
return
false
;
return
false
;
}
}
msgs_read
++
;
if
(
!
(
msg_
->
flags
&
ZMQ_MSG_TBC
))
msgs_read
++
;
if
(
lwm
>
0
&&
msgs_read
%
lwm
==
0
)
if
(
lwm
>
0
&&
msgs_read
%
lwm
==
0
)
send_reader_info
(
peer
,
msgs_read
);
send_reader_info
(
peer
,
msgs_read
);
...
@@ -161,7 +163,8 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
...
@@ -161,7 +163,8 @@ bool zmq::writer_t::write (zmq_msg_t *msg_)
}
}
pipe
->
write
(
*
msg_
);
pipe
->
write
(
*
msg_
);
msgs_written
++
;
if
(
!
(
msg_
->
flags
&
ZMQ_MSG_TBC
))
msgs_written
++
;
return
true
;
return
true
;
}
}
...
@@ -194,6 +197,9 @@ void zmq::writer_t::term ()
...
@@ -194,6 +197,9 @@ void zmq::writer_t::term ()
{
{
endpoint
=
NULL
;
endpoint
=
NULL
;
// Rollback any unfinished messages.
rollback
();
// Push delimiter into the pipe.
// Push delimiter into the pipe.
// Trick the compiler to belive that the tag is a valid pointer.
// Trick the compiler to belive that the tag is a valid pointer.
zmq_msg_t
msg
;
zmq_msg_t
msg
;
...
...
src/pub.cpp
View file @
ed291b02
...
@@ -170,7 +170,8 @@ bool zmq::pub_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
...
@@ -170,7 +170,8 @@ bool zmq::pub_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
pipes
.
swap
(
pipes
.
index
(
pipe_
),
active
);
pipes
.
swap
(
pipes
.
index
(
pipe_
),
active
);
return
false
;
return
false
;
}
}
pipe_
->
flush
();
if
(
!
(
msg_
->
flags
&
ZMQ_MSG_TBC
))
pipe_
->
flush
();
return
true
;
return
true
;
}
}
src/sub.cpp
View file @
ed291b02
...
@@ -26,7 +26,8 @@
...
@@ -26,7 +26,8 @@
zmq
::
sub_t
::
sub_t
(
class
app_thread_t
*
parent_
)
:
zmq
::
sub_t
::
sub_t
(
class
app_thread_t
*
parent_
)
:
socket_base_t
(
parent_
),
socket_base_t
(
parent_
),
has_message
(
false
)
has_message
(
false
),
tbc
(
false
)
{
{
options
.
requires_in
=
true
;
options
.
requires_in
=
true
;
options
.
requires_out
=
false
;
options
.
requires_out
=
false
;
...
@@ -105,6 +106,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
...
@@ -105,6 +106,7 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
if
(
has_message
)
{
if
(
has_message
)
{
zmq_msg_move
(
msg_
,
&
message
);
zmq_msg_move
(
msg_
,
&
message
);
has_message
=
false
;
has_message
=
false
;
tbc
=
msg_
->
flags
&
ZMQ_MSG_TBC
;
return
0
;
return
0
;
}
}
...
@@ -122,13 +124,27 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
...
@@ -122,13 +124,27 @@ int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
return
-
1
;
return
-
1
;
// Check whether the message matches at least one subscription.
// Check whether the message matches at least one subscription.
if
(
match
(
msg_
))
// Non-initial parts of the message are passed
if
(
tbc
||
match
(
msg_
))
{
tbc
=
msg_
->
flags
&
ZMQ_MSG_TBC
;
return
0
;
return
0
;
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while
(
msg_
->
flags
&
ZMQ_MSG_TBC
)
{
rc
=
fq
.
recv
(
msg_
,
ZMQ_NOBLOCK
);
zmq_assert
(
rc
==
0
);
}
}
}
}
}
bool
zmq
::
sub_t
::
xhas_in
()
bool
zmq
::
sub_t
::
xhas_in
()
{
{
// There are subsequent parts of the partly-read message available.
if
(
tbc
)
return
true
;
// If there's already a message prepared by a previous call to zmq_poll,
// If there's already a message prepared by a previous call to zmq_poll,
// return straight ahead.
// return straight ahead.
if
(
has_message
)
if
(
has_message
)
...
@@ -153,6 +169,13 @@ bool zmq::sub_t::xhas_in ()
...
@@ -153,6 +169,13 @@ bool zmq::sub_t::xhas_in ()
has_message
=
true
;
has_message
=
true
;
return
true
;
return
true
;
}
}
// Message doesn't match. Pop any remaining parts of the message
// from the pipe.
while
(
message
.
flags
&
ZMQ_MSG_TBC
)
{
rc
=
fq
.
recv
(
&
message
,
ZMQ_NOBLOCK
);
zmq_assert
(
rc
==
0
);
}
}
}
}
}
...
...
src/sub.hpp
View file @
ed291b02
...
@@ -68,6 +68,10 @@ namespace zmq
...
@@ -68,6 +68,10 @@ namespace zmq
bool
has_message
;
bool
has_message
;
zmq_msg_t
message
;
zmq_msg_t
message
;
// If true, part of a multipart message was already received, but
// there are following parts still waiting.
bool
tbc
;
sub_t
(
const
sub_t
&
);
sub_t
(
const
sub_t
&
);
void
operator
=
(
const
sub_t
&
);
void
operator
=
(
const
sub_t
&
);
};
};
...
...
src/ypipe.hpp
View file @
ed291b02
...
@@ -84,8 +84,8 @@ namespace zmq
...
@@ -84,8 +84,8 @@ namespace zmq
{
{
if
(
w
==
&
queue
.
back
())
if
(
w
==
&
queue
.
back
())
return
false
;
return
false
;
*
value_
=
queue
.
back
();
queue
.
unpush
();
queue
.
unpush
();
*
value_
=
queue
.
back
();
return
true
;
return
true
;
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment