Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
ee7313b4
Commit
ee7313b4
authored
May 31, 2011
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Subscriptions are processed immediately in XPUB socket
Signed-off-by:
Martin Sustrik
<
sustrik@250bpm.com
>
parent
a24a7c15
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
42 additions
and
93 deletions
+42
-93
xpub.cpp
src/xpub.cpp
+42
-85
xpub.hpp
src/xpub.hpp
+0
-8
No files found.
src/xpub.cpp
View file @
ee7313b4
...
...
@@ -39,12 +39,41 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
{
zmq_assert
(
pipe_
);
dist
.
attach
(
pipe_
);
fq
.
attach
(
pipe_
);
// The pipe is active when attached. Let's read the subscriptions from
// it, if any.
xread_activated
(
pipe_
);
}
void
zmq
::
xpub_t
::
xread_activated
(
pipe_t
*
pipe_
)
{
fq
.
activated
(
pipe_
);
// There are some subscriptions waiting. Let's process them.
msg_t
sub
;
sub
.
init
();
while
(
true
)
{
// Grab next subscription.
if
(
!
pipe_
->
read
(
&
sub
))
{
sub
.
close
();
return
;
}
// Apply the subscription to the trie.
unsigned
char
*
data
=
(
unsigned
char
*
)
sub
.
data
();
size_t
size
=
sub
.
size
();
zmq_assert
(
size
>
0
&&
(
*
data
==
0
||
*
data
==
1
));
bool
unique
;
if
(
*
data
==
0
)
unique
=
subscriptions
.
rm
(
data
+
1
,
size
-
1
,
pipe_
);
else
unique
=
subscriptions
.
add
(
data
+
1
,
size
-
1
,
pipe_
);
// If the subscription is not a duplicate store it so that it can be
// passed to used on next recv call.
if
(
unique
&&
options
.
type
!=
ZMQ_PUB
)
pending
.
push_back
(
blob_t
((
unsigned
char
*
)
sub
.
data
(),
sub
.
size
()));
}
}
void
zmq
::
xpub_t
::
xwrite_activated
(
pipe_t
*
pipe_
)
...
...
@@ -60,31 +89,10 @@ void zmq::xpub_t::xterminated (pipe_t *pipe_)
subscriptions
.
rm
(
pipe_
,
send_unsubscription
,
this
);
dist
.
terminated
(
pipe_
);
fq
.
terminated
(
pipe_
);
}
int
zmq
::
xpub_t
::
xsend
(
msg_t
*
msg_
,
int
flags_
)
{
// First, process any (un)subscriptions from downstream.
msg_t
sub
;
sub
.
init
();
while
(
true
)
{
// Grab next subscription.
pipe_t
*
pipe
;
int
rc
=
fq
.
recvpipe
(
&
sub
,
0
,
&
pipe
);
if
(
rc
!=
0
&&
errno
==
EAGAIN
)
break
;
errno_assert
(
rc
==
0
);
// Apply the subscription to the trie. If it's not a duplicate,
// store it so that it can be passed to used on next recv call.
if
(
apply_subscription
(
&
sub
,
pipe
)
&&
options
.
type
!=
ZMQ_PUB
)
pending
.
push_back
(
blob_t
((
unsigned
char
*
)
sub
.
data
(),
sub
.
size
()));
}
sub
.
close
();
{
return
dist
.
send
(
msg_
,
flags_
);
}
...
...
@@ -96,75 +104,24 @@ bool zmq::xpub_t::xhas_out ()
int
zmq
::
xpub_t
::
xrecv
(
msg_t
*
msg_
,
int
flags_
)
{
// If there is at least one
if
(
!
pending
.
empty
())
{
int
rc
=
msg_
->
close
();
errno_assert
(
rc
==
0
);
rc
=
msg_
->
init_size
(
pending
.
front
().
size
());
errno_assert
(
rc
==
0
);
memcpy
(
msg_
->
data
(),
pending
.
front
().
data
(),
pending
.
front
().
size
());
pending
.
pop_front
();
return
0
;
}
// Grab and apply next subscription.
pipe_t
*
pipe
;
int
rc
=
fq
.
recvpipe
(
msg_
,
0
,
&
pipe
);
if
(
rc
!=
0
)
return
-
1
;
if
(
!
apply_subscription
(
msg_
,
pipe
))
{
// TODO: This should be a loop rather!
msg_
->
close
();
msg_
->
init
();
if
(
pending
.
empty
())
{
errno
=
EAGAIN
;
return
-
1
;
}
int
rc
=
msg_
->
close
();
errno_assert
(
rc
==
0
);
rc
=
msg_
->
init_size
(
pending
.
front
().
size
());
errno_assert
(
rc
==
0
);
memcpy
(
msg_
->
data
(),
pending
.
front
().
data
(),
pending
.
front
().
size
());
pending
.
pop_front
();
return
0
;
}
bool
zmq
::
xpub_t
::
xhas_in
()
{
if
(
!
pending
.
empty
())
return
true
;
// Even if there are subscriptions in the fair-queuer they may be
// duplicates. Thus, we have to check by hand wheter there is any
// subscription available to pass upstream.
// First, process any (un)subscriptions from downstream.
msg_t
sub
;
sub
.
init
();
while
(
true
)
{
// Grab next subscription.
pipe_t
*
pipe
;
int
rc
=
fq
.
recvpipe
(
&
sub
,
0
,
&
pipe
);
if
(
rc
!=
0
&&
errno
==
EAGAIN
)
{
sub
.
close
();
return
false
;
}
errno_assert
(
rc
==
0
);
// Apply the subscription to the trie. If it's not a duplicate store
// it so that it can be passed to used on next recv call.
if
(
apply_subscription
(
&
sub
,
pipe
)
&&
options
.
type
!=
ZMQ_PUB
)
{
pending
.
push_back
(
blob_t
((
unsigned
char
*
)
sub
.
data
(),
sub
.
size
()));
sub
.
close
();
return
true
;
}
}
}
bool
zmq
::
xpub_t
::
apply_subscription
(
msg_t
*
sub_
,
pipe_t
*
pipe_
)
{
unsigned
char
*
data
=
(
unsigned
char
*
)
sub_
->
data
();
size_t
size
=
sub_
->
size
();
zmq_assert
(
size
>
0
&&
(
*
data
==
0
||
*
data
==
1
));
if
(
*
data
==
0
)
return
subscriptions
.
rm
(
data
+
1
,
size
-
1
,
pipe_
);
else
return
subscriptions
.
add
(
data
+
1
,
size
-
1
,
pipe_
);
return
!
pending
.
empty
();
}
void
zmq
::
xpub_t
::
send_unsubscription
(
unsigned
char
*
data_
,
size_t
size_
,
...
...
src/xpub.hpp
View file @
ee7313b4
...
...
@@ -28,7 +28,6 @@
#include "array.hpp"
#include "blob.hpp"
#include "dist.hpp"
#include "fq.hpp"
namespace
zmq
{
...
...
@@ -53,10 +52,6 @@ namespace zmq
private
:
// Applies the subscription to the trie. Return false if it is a
// duplicate.
bool
apply_subscription
(
class
msg_t
*
sub_
,
class
pipe_t
*
pipe_
);
// Function to be applied to the trie to send all the subsciptions
// upstream.
static
void
send_unsubscription
(
unsigned
char
*
data_
,
size_t
size_
,
...
...
@@ -68,9 +63,6 @@ namespace zmq
// Distributor of messages holding the list of outbound pipes.
dist_t
dist
;
// Object to fair-queue the subscription requests.
fq_t
fq
;
// List of pending (un)subscriptions, ie. those that were already
// applied to the trie, but not yet received by the user.
typedef
std
::
deque
<
blob_t
>
pending_t
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment