Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
1dd9cac2
Commit
1dd9cac2
authored
7 years ago
by
evoskuil
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Problem: xpub_t.xattach_pipe no handle msg.copy or pipe_t.write fails.
parent
6436bc51
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
33 additions
and
32 deletions
+33
-32
xpub.cpp
src/xpub.cpp
+33
-32
No files found.
src/xpub.cpp
View file @
1dd9cac2
...
...
@@ -41,18 +41,18 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
verbose_unsubs
(
false
),
more
(
false
),
lossy
(
true
),
manual
(
false
),
manual
(
false
),
pending_pipes
(),
welcome_msg
()
{
last_pipe
=
NULL
;
options
.
type
=
ZMQ_XPUB
;
welcome_msg
.
init
();
welcome_msg
.
init
();
}
zmq
::
xpub_t
::~
xpub_t
()
{
welcome_msg
.
close
();
welcome_msg
.
close
();
}
void
zmq
::
xpub_t
::
xattach_pipe
(
pipe_t
*
pipe_
,
bool
subscribe_to_all_
)
...
...
@@ -65,15 +65,16 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
if
(
subscribe_to_all_
)
subscriptions
.
add
(
NULL
,
0
,
pipe_
);
// if welcome message exist
if
(
welcome_msg
.
size
()
>
0
)
// if welcome message exist
s, send a copy of it
if
(
welcome_msg
.
size
()
>
0
)
{
msg_t
copy
;
copy
.
init
();
copy
.
copy
(
welcome_msg
);
pipe_
->
write
(
&
copy
);
pipe_
->
flush
();
copy
.
init
();
int
rc
=
copy
.
copy
(
welcome_msg
);
errno_assert
(
rc
==
0
);
bool
ok
=
pipe_
->
write
(
&
copy
);
zmq_assert
(
ok
);
pipe_
->
flush
();
}
// The pipe is active when attached. Let's read the subscriptions from
...
...
@@ -95,35 +96,35 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
// Store manual subscription to use on termination
if
(
*
data
==
0
)
manual_subscriptions
.
rm
(
data
+
1
,
size
-
1
,
pipe_
);
manual_subscriptions
.
rm
(
data
+
1
,
size
-
1
,
pipe_
);
else
manual_subscriptions
.
add
(
data
+
1
,
size
-
1
,
pipe_
);
manual_subscriptions
.
add
(
data
+
1
,
size
-
1
,
pipe_
);
pending_pipes
.
push_back
(
pipe_
);
pending_data
.
push_back
(
blob_t
(
data
,
size
));
pending_pipes
.
push_back
(
pipe_
);
pending_data
.
push_back
(
blob_t
(
data
,
size
));
if
(
metadata
)
metadata
->
add_ref
();
pending_metadata
.
push_back
(
metadata
);
pending_flags
.
push_back
(
0
);
metadata
->
add_ref
();
pending_metadata
.
push_back
(
metadata
);
pending_flags
.
push_back
(
0
);
}
else
{
bool
unique
;
if
(
*
data
==
0
)
unique
=
subscriptions
.
rm
(
data
+
1
,
size
-
1
,
pipe_
);
unique
=
subscriptions
.
rm
(
data
+
1
,
size
-
1
,
pipe_
);
else
unique
=
subscriptions
.
add
(
data
+
1
,
size
-
1
,
pipe_
);
unique
=
subscriptions
.
add
(
data
+
1
,
size
-
1
,
pipe_
);
// If the (un)subscription is not a duplicate store it so that it can be
// passed to the user on next recv call unless verbose mode is enabled
// which makes to pass always these messages.
if
(
options
.
type
==
ZMQ_XPUB
&&
(
unique
||
(
*
data
==
1
&&
verbose_subs
)
||
(
*
data
==
0
&&
verbose_unsubs
&&
verbose_subs
)))
{
pending_data
.
push_back
(
blob_t
(
data
,
size
));
pending_data
.
push_back
(
blob_t
(
data
,
size
));
if
(
metadata
)
metadata
->
add_ref
();
pending_metadata
.
push_back
(
metadata
);
pending_flags
.
push_back
(
0
);
metadata
->
add_ref
();
pending_metadata
.
push_back
(
metadata
);
pending_flags
.
push_back
(
0
);
}
}
}
...
...
@@ -131,7 +132,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
// Process user message coming upstream from xsub socket
pending_data
.
push_back
(
blob_t
(
data
,
size
));
if
(
metadata
)
metadata
->
add_ref
();
metadata
->
add_ref
();
pending_metadata
.
push_back
(
metadata
);
pending_flags
.
push_back
(
sub
.
flags
());
}
...
...
@@ -151,7 +152,7 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
||
option_
==
ZMQ_XPUB_VERBOSER
||
option_
==
ZMQ_XPUB_NODROP
||
option_
==
ZMQ_XPUB_MANUAL
)
{
if
(
optvallen_
!=
sizeof
(
int
)
||
*
static_cast
<
const
int
*>
(
optval_
)
<
0
)
{
if
(
optvallen_
!=
sizeof
(
int
)
||
*
static_cast
<
const
int
*>
(
optval_
)
<
0
)
{
errno
=
EINVAL
;
return
-
1
;
}
...
...
@@ -174,26 +175,26 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
else
if
(
option_
==
ZMQ_SUBSCRIBE
&&
manual
)
{
if
(
last_pipe
!=
NULL
)
subscriptions
.
add
((
unsigned
char
*
)
optval_
,
optvallen_
,
last_pipe
);
subscriptions
.
add
((
unsigned
char
*
)
optval_
,
optvallen_
,
last_pipe
);
}
else
if
(
option_
==
ZMQ_UNSUBSCRIBE
&&
manual
)
{
if
(
last_pipe
!=
NULL
)
subscriptions
.
rm
((
unsigned
char
*
)
optval_
,
optvallen_
,
last_pipe
);
subscriptions
.
rm
((
unsigned
char
*
)
optval_
,
optvallen_
,
last_pipe
);
}
else
if
(
option_
==
ZMQ_XPUB_WELCOME_MSG
)
{
welcome_msg
.
close
();
welcome_msg
.
close
();
if
(
optvallen_
>
0
)
{
int
rc
=
welcome_msg
.
init_size
(
optvallen_
);
int
rc
=
welcome_msg
.
init_size
(
optvallen_
);
errno_assert
(
rc
==
0
);
unsigned
char
*
data
=
(
unsigned
char
*
)
welcome_msg
.
data
();
memcpy
(
data
,
optval_
,
optvallen_
);
unsigned
char
*
data
=
(
unsigned
char
*
)
welcome_msg
.
data
();
memcpy
(
data
,
optval_
,
optvallen_
);
}
else
welcome_msg
.
init
();
welcome_msg
.
init
();
}
else
{
errno
=
EINVAL
;
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment