Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
51bf2aff
Unverified
Commit
51bf2aff
authored
Dec 04, 2019
by
Luca Boccassi
Committed by
GitHub
Dec 04, 2019
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #3746 from drolevar/master
Change XSUB -> XPUB message processing.
parents
85df7558
5a854780
Show whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
107 additions
and
24 deletions
+107
-24
zmq_setsockopt.txt
doc/zmq_setsockopt.txt
+16
-0
zmq.h
include/zmq.h
+1
-0
xpub.cpp
src/xpub.cpp
+12
-3
xpub.hpp
src/xpub.hpp
+9
-0
xsub.cpp
src/xsub.cpp
+28
-3
xsub.hpp
src/xsub.hpp
+10
-0
zmq_draft.h
src/zmq_draft.h
+2
-0
test_xpub_manual.cpp
tests/test_xpub_manual.cpp
+29
-18
No files found.
doc/zmq_setsockopt.txt
View file @
51bf2aff
...
@@ -1154,6 +1154,22 @@ Default value:: NULL
...
@@ -1154,6 +1154,22 @@ Default value:: NULL
Applicable socket types:: ZMQ_XPUB
Applicable socket types:: ZMQ_XPUB
ZMQ_ONLY_FIRST_SUBSCRIBE: Process only fist subscribe/unsubscribe in a multipart message
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If set, only the first part of the multipart message is processed as
a subscribe/unsubscribe message. The rest are forwarded as user data
regardless of message contents.
It not set (default), subscribe/unsubscribe messages in a multipart message
are processed as such regardless of their number and order.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: ZMQ_XSUB, ZMQ_XPUB
ZMQ_ZAP_DOMAIN: Set RFC 27 authentication domain
ZMQ_ZAP_DOMAIN: Set RFC 27 authentication domain
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the domain for ZAP (ZMQ RFC 27) authentication. A ZAP domain must be
Sets the domain for ZAP (ZMQ RFC 27) authentication. A ZAP domain must be
...
...
include/zmq.h
View file @
51bf2aff
...
@@ -675,6 +675,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
...
@@ -675,6 +675,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_WSS_TRUST_PEM 105
#define ZMQ_WSS_TRUST_PEM 105
#define ZMQ_WSS_HOSTNAME 106
#define ZMQ_WSS_HOSTNAME 106
#define ZMQ_WSS_TRUST_SYSTEM 107
#define ZMQ_WSS_TRUST_SYSTEM 107
#define ZMQ_ONLY_FIRST_SUBSCRIBE 108
/* DRAFT Context options */
/* DRAFT Context options */
...
...
src/xpub.cpp
View file @
51bf2aff
...
@@ -43,6 +43,8 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
...
@@ -43,6 +43,8 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
_verbose_unsubs
(
false
),
_verbose_unsubs
(
false
),
_more_send
(
false
),
_more_send
(
false
),
_more_recv
(
false
),
_more_recv
(
false
),
_process_subscribe
(
false
),
_only_first_subscribe
(
false
),
_lossy
(
true
),
_lossy
(
true
),
_manual
(
false
),
_manual
(
false
),
_send_last_pipe
(
false
),
_send_last_pipe
(
false
),
...
@@ -101,7 +103,10 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
...
@@ -101,7 +103,10 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
bool
subscribe
=
false
;
bool
subscribe
=
false
;
bool
is_subscribe_or_cancel
=
false
;
bool
is_subscribe_or_cancel
=
false
;
if
(
!
_more_recv
)
{
bool
first_part
=
!
_more_recv
;
_more_recv
=
(
msg
.
flags
()
&
msg_t
::
more
)
!=
0
;
if
(
first_part
||
_process_subscribe
)
{
// Apply the subscription to the trie
// Apply the subscription to the trie
if
(
msg
.
is_subscribe
()
||
msg
.
is_cancel
())
{
if
(
msg
.
is_subscribe
()
||
msg
.
is_cancel
())
{
data
=
static_cast
<
unsigned
char
*>
(
msg
.
command_body
());
data
=
static_cast
<
unsigned
char
*>
(
msg
.
command_body
());
...
@@ -116,7 +121,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
...
@@ -116,7 +121,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
}
}
}
}
_more_recv
=
(
msg
.
flags
()
&
msg_t
::
more
)
!=
0
;
if
(
first_part
)
_process_subscribe
=
!
_only_first_subscribe
||
is_subscribe_or_cancel
;
if
(
!
is_subscribe_or_cancel
)
{
if
(
!
is_subscribe_or_cancel
)
{
// Process user message coming upstream from xsub socket
// Process user message coming upstream from xsub socket
...
@@ -199,7 +206,7 @@ int zmq::xpub_t::xsetsockopt (int option_,
...
@@ -199,7 +206,7 @@ int zmq::xpub_t::xsetsockopt (int option_,
{
{
if
(
option_
==
ZMQ_XPUB_VERBOSE
||
option_
==
ZMQ_XPUB_VERBOSER
if
(
option_
==
ZMQ_XPUB_VERBOSE
||
option_
==
ZMQ_XPUB_VERBOSER
||
option_
==
ZMQ_XPUB_MANUAL_LAST_VALUE
||
option_
==
ZMQ_XPUB_NODROP
||
option_
==
ZMQ_XPUB_MANUAL_LAST_VALUE
||
option_
==
ZMQ_XPUB_NODROP
||
option_
==
ZMQ_XPUB_MANUAL
)
{
||
option_
==
ZMQ_XPUB_MANUAL
||
option_
==
ZMQ_ONLY_FIRST_SUBSCRIBE
)
{
if
(
optvallen_
!=
sizeof
(
int
)
if
(
optvallen_
!=
sizeof
(
int
)
||
*
static_cast
<
const
int
*>
(
optval_
)
<
0
)
{
||
*
static_cast
<
const
int
*>
(
optval_
)
<
0
)
{
errno
=
EINVAL
;
errno
=
EINVAL
;
...
@@ -218,6 +225,8 @@ int zmq::xpub_t::xsetsockopt (int option_,
...
@@ -218,6 +225,8 @@ int zmq::xpub_t::xsetsockopt (int option_,
_lossy
=
(
*
static_cast
<
const
int
*>
(
optval_
)
==
0
);
_lossy
=
(
*
static_cast
<
const
int
*>
(
optval_
)
==
0
);
else
if
(
option_
==
ZMQ_XPUB_MANUAL
)
else
if
(
option_
==
ZMQ_XPUB_MANUAL
)
_manual
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
_manual
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
else
if
(
option_
==
ZMQ_ONLY_FIRST_SUBSCRIBE
)
_only_first_subscribe
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
}
else
if
(
option_
==
ZMQ_SUBSCRIBE
&&
_manual
)
{
}
else
if
(
option_
==
ZMQ_SUBSCRIBE
&&
_manual
)
{
if
(
_last_pipe
!=
NULL
)
if
(
_last_pipe
!=
NULL
)
_subscriptions
.
add
((
unsigned
char
*
)
optval_
,
optvallen_
,
_subscriptions
.
add
((
unsigned
char
*
)
optval_
,
optvallen_
,
...
...
src/xpub.hpp
View file @
51bf2aff
...
@@ -96,6 +96,15 @@ class xpub_t : public socket_base_t
...
@@ -96,6 +96,15 @@ class xpub_t : public socket_base_t
// True if we are in the middle of receiving a multi-part message.
// True if we are in the middle of receiving a multi-part message.
bool
_more_recv
;
bool
_more_recv
;
// If true, subscribe and cancel messages are processed for the rest
// of multipart message.
bool
_process_subscribe
;
// This option is enabled with ZMQ_ONLY_FIRST_SUBSCRIBE.
// If true, messages following subscribe/unsubscribe in a multipart
// message are treated as user data regardless of the first byte.
bool
_only_first_subscribe
;
// Drop messages if HWM reached, otherwise return with EAGAIN
// Drop messages if HWM reached, otherwise return with EAGAIN
bool
_lossy
;
bool
_lossy
;
...
...
src/xsub.cpp
View file @
51bf2aff
...
@@ -38,7 +38,9 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
...
@@ -38,7 +38,9 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t
(
parent_
,
tid_
,
sid_
),
socket_base_t
(
parent_
,
tid_
,
sid_
),
_has_message
(
false
),
_has_message
(
false
),
_more_send
(
false
),
_more_send
(
false
),
_more_recv
(
false
)
_more_recv
(
false
),
_process_subscribe
(
false
),
_only_first_subscribe
(
false
)
{
{
options
.
type
=
ZMQ_XSUB
;
options
.
type
=
ZMQ_XSUB
;
...
@@ -95,17 +97,38 @@ void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
...
@@ -95,17 +97,38 @@ void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
pipe_
->
flush
();
pipe_
->
flush
();
}
}
int
zmq
::
xsub_t
::
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
)
{
if
(
option_
==
ZMQ_ONLY_FIRST_SUBSCRIBE
)
{
if
(
optvallen_
!=
sizeof
(
int
)
||
*
static_cast
<
const
int
*>
(
optval_
)
<
0
)
{
errno
=
EINVAL
;
return
-
1
;
}
_only_first_subscribe
=
(
*
static_cast
<
const
int
*>
(
optval_
)
!=
0
);
return
0
;
}
else
{
errno
=
EINVAL
;
return
-
1
;
}
}
int
zmq
::
xsub_t
::
xsend
(
msg_t
*
msg_
)
int
zmq
::
xsub_t
::
xsend
(
msg_t
*
msg_
)
{
{
size_t
size
=
msg_
->
size
();
size_t
size
=
msg_
->
size
();
unsigned
char
*
data
=
static_cast
<
unsigned
char
*>
(
msg_
->
data
());
unsigned
char
*
data
=
static_cast
<
unsigned
char
*>
(
msg_
->
data
());
bool
send_more
=
_more_send
;
bool
first_part
=
!
_more_send
;
_more_send
=
(
msg_
->
flags
()
&
msg_t
::
more
)
!=
0
;
_more_send
=
(
msg_
->
flags
()
&
msg_t
::
more
)
!=
0
;
if
(
send_more
)
if
(
first_part
)
{
_process_subscribe
=
!
_only_first_subscribe
;
}
else
if
(
!
_process_subscribe
)
{
// User message sent upstream to XPUB socket
// User message sent upstream to XPUB socket
return
_dist
.
send_to_all
(
msg_
);
return
_dist
.
send_to_all
(
msg_
);
}
if
(
msg_
->
is_subscribe
()
||
(
size
>
0
&&
*
data
==
1
))
{
if
(
msg_
->
is_subscribe
()
||
(
size
>
0
&&
*
data
==
1
))
{
// Process subscribe message
// Process subscribe message
...
@@ -121,6 +144,7 @@ int zmq::xsub_t::xsend (msg_t *msg_)
...
@@ -121,6 +144,7 @@ int zmq::xsub_t::xsend (msg_t *msg_)
size
=
size
-
1
;
size
=
size
-
1
;
}
}
_subscriptions
.
add
(
data
,
size
);
_subscriptions
.
add
(
data
,
size
);
_process_subscribe
=
true
;
return
_dist
.
send_to_all
(
msg_
);
return
_dist
.
send_to_all
(
msg_
);
}
}
if
(
msg_
->
is_cancel
()
||
(
size
>
0
&&
*
data
==
0
))
{
if
(
msg_
->
is_cancel
()
||
(
size
>
0
&&
*
data
==
0
))
{
...
@@ -132,6 +156,7 @@ int zmq::xsub_t::xsend (msg_t *msg_)
...
@@ -132,6 +156,7 @@ int zmq::xsub_t::xsend (msg_t *msg_)
data
=
data
+
1
;
data
=
data
+
1
;
size
=
size
-
1
;
size
=
size
-
1
;
}
}
_process_subscribe
=
true
;
if
(
_subscriptions
.
rm
(
data
,
size
))
if
(
_subscriptions
.
rm
(
data
,
size
))
return
_dist
.
send_to_all
(
msg_
);
return
_dist
.
send_to_all
(
msg_
);
}
else
}
else
...
...
src/xsub.hpp
100755 → 100644
View file @
51bf2aff
...
@@ -57,6 +57,7 @@ class xsub_t : public socket_base_t
...
@@ -57,6 +57,7 @@ class xsub_t : public socket_base_t
void
xattach_pipe
(
zmq
::
pipe_t
*
pipe_
,
void
xattach_pipe
(
zmq
::
pipe_t
*
pipe_
,
bool
subscribe_to_all_
,
bool
subscribe_to_all_
,
bool
locally_initiated_
);
bool
locally_initiated_
);
int
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
);
int
xsend
(
zmq
::
msg_t
*
msg_
);
int
xsend
(
zmq
::
msg_t
*
msg_
);
bool
xhas_out
();
bool
xhas_out
();
int
xrecv
(
zmq
::
msg_t
*
msg_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
);
...
@@ -101,6 +102,15 @@ class xsub_t : public socket_base_t
...
@@ -101,6 +102,15 @@ class xsub_t : public socket_base_t
// there are following parts still waiting.
// there are following parts still waiting.
bool
_more_recv
;
bool
_more_recv
;
// If true, subscribe and cancel messages are processed for the rest
// of multipart message.
bool
_process_subscribe
;
// This option is enabled with ZMQ_ONLY_FIRST_SUBSCRIBE.
// If true, messages following subscribe/unsubscribe in a multipart
// message are treated as user data regardless of the first byte.
bool
_only_first_subscribe
;
xsub_t
(
const
xsub_t
&
);
xsub_t
(
const
xsub_t
&
);
const
xsub_t
&
operator
=
(
const
xsub_t
&
);
const
xsub_t
&
operator
=
(
const
xsub_t
&
);
};
};
...
...
src/zmq_draft.h
View file @
51bf2aff
...
@@ -57,6 +57,8 @@
...
@@ -57,6 +57,8 @@
#define ZMQ_SOCKS_PASSWORD 100
#define ZMQ_SOCKS_PASSWORD 100
#define ZMQ_IN_BATCH_SIZE 101
#define ZMQ_IN_BATCH_SIZE 101
#define ZMQ_OUT_BATCH_SIZE 102
#define ZMQ_OUT_BATCH_SIZE 102
#define ZMQ_ONLY_FIRST_SUBSCRIBE 108
/* DRAFT Context options */
/* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10
#define ZMQ_ZERO_COPY_RECV 10
...
...
tests/test_xpub_manual.cpp
View file @
51bf2aff
...
@@ -456,57 +456,66 @@ void test_user_message ()
...
@@ -456,57 +456,66 @@ void test_user_message ()
test_context_socket_close
(
sub
);
test_context_socket_close
(
sub
);
}
}
#ifdef ZMQ_ONLY_FIRST_SUBSCRIBE
void
test_user_message_multi
()
void
test_user_message_multi
()
{
{
const
int
only_first_subscribe
=
1
;
// Create a publisher
// Create a publisher
void
*
pub
=
test_context_socket
(
ZMQ_XPUB
);
void
*
pub
=
test_context_socket
(
ZMQ_XPUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
pub
,
"inproc://soname"
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_bind
(
pub
,
"inproc://soname"
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
pub
,
ZMQ_ONLY_FIRST_SUBSCRIBE
,
&
only_first_subscribe
,
sizeof
(
only_first_subscribe
)));
// Create a subscriber
// Create a subscriber
void
*
sub
=
test_context_socket
(
ZMQ_XSUB
);
void
*
sub
=
test_context_socket
(
ZMQ_XSUB
);
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sub
,
"inproc://soname"
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_connect
(
sub
,
"inproc://soname"
));
TEST_ASSERT_SUCCESS_ERRNO
(
zmq_setsockopt
(
sub
,
ZMQ_ONLY_FIRST_SUBSCRIBE
,
&
only_first_subscribe
,
sizeof
(
only_first_subscribe
)));
// Send some data that is neither sub nor unsub
// Send some data that is neither sub nor unsub
const
uint8_t
msg_common
[]
=
{
'A'
,
'B'
,
'C'
};
const
uint8_t
msg_common
[]
=
{
'A'
,
'B'
,
'C'
};
// Message starts with 0 but should still treated as user
// Message starts with 0 but should still treated as user
const
uint8_t
msg_0
[]
=
{
0
,
'B'
,
'C'
};
const
uint8_t
msg_0a
[]
=
{
0
,
'B'
,
'C'
};
const
uint8_t
msg_0b
[]
=
{
0
,
'C'
,
'D'
};
// Message starts with 1 but should still treated as user
// Message starts with 1 but should still treated as user
const
uint8_t
msg_1
[]
=
{
1
,
'B'
,
'C'
};
const
uint8_t
msg_1a
[]
=
{
1
,
'B'
,
'C'
};
const
uint8_t
msg_1b
[]
=
{
1
,
'C'
,
'D'
};
// Test second message starting with 0
// Test second message starting with 0
send_array_expect_success
(
sub
,
msg_common
,
ZMQ_SNDMORE
);
send_array_expect_success
(
sub
,
msg_common
,
ZMQ_SNDMORE
);
send_array_expect_success
(
sub
,
msg_0
,
0
);
send_array_expect_success
(
sub
,
msg_0
a
,
0
);
// Receive messages from subscriber
// Receive messages from subscriber
recv_array_expect_success
(
pub
,
msg_common
,
0
);
recv_array_expect_success
(
pub
,
msg_common
,
0
);
recv_array_expect_success
(
pub
,
msg_0
,
0
);
recv_array_expect_success
(
pub
,
msg_0
a
,
0
);
// Test second message starting with 1
// Test second message starting with 1
send_array_expect_success
(
sub
,
msg_common
,
ZMQ_SNDMORE
);
send_array_expect_success
(
sub
,
msg_common
,
ZMQ_SNDMORE
);
send_array_expect_success
(
sub
,
msg_1
,
0
);
send_array_expect_success
(
sub
,
msg_1
a
,
0
);
// Receive messages from subscriber
// Receive messages from subscriber
recv_array_expect_success
(
pub
,
msg_common
,
0
);
recv_array_expect_success
(
pub
,
msg_common
,
0
);
recv_array_expect_success
(
pub
,
msg_1
,
0
);
recv_array_expect_success
(
pub
,
msg_1a
,
0
);
char
buffer
[
255
];
// Test first message starting with 0
send_array_expect_success
(
sub
,
msg_0
,
0
);
// wait
msleep
(
SETTLE_TIME
);
int
rc
=
zmq_recv
(
pub
,
buffer
,
sizeof
(
buffer
),
ZMQ_DONTWAIT
);
TEST_ASSERT_EQUAL_INT
(
-
1
,
rc
);
// Test first message starting with 1
// Test first message starting with 1
send_array_expect_success
(
sub
,
msg_1
,
0
);
send_array_expect_success
(
sub
,
msg_1a
,
ZMQ_SNDMORE
);
recv_array_expect_success
(
pub
,
msg_1
,
0
);
send_array_expect_success
(
sub
,
msg_1b
,
0
);
recv_array_expect_success
(
pub
,
msg_1a
,
0
);
recv_array_expect_success
(
pub
,
msg_1b
,
0
);
send_array_expect_success
(
sub
,
msg_0a
,
ZMQ_SNDMORE
);
send_array_expect_success
(
sub
,
msg_0b
,
0
);
recv_array_expect_success
(
pub
,
msg_0a
,
0
);
recv_array_expect_success
(
pub
,
msg_0b
,
0
);
// Clean up.
// Clean up.
test_context_socket_close
(
pub
);
test_context_socket_close
(
pub
);
test_context_socket_close
(
sub
);
test_context_socket_close
(
sub
);
}
}
#endif
int
main
()
int
main
()
{
{
...
@@ -519,7 +528,9 @@ int main ()
...
@@ -519,7 +528,9 @@ int main ()
RUN_TEST
(
test_missing_subscriptions
);
RUN_TEST
(
test_missing_subscriptions
);
RUN_TEST
(
test_unsubscribe_cleanup
);
RUN_TEST
(
test_unsubscribe_cleanup
);
RUN_TEST
(
test_user_message
);
RUN_TEST
(
test_user_message
);
#ifdef ZMQ_ONLY_FIRST_SUBSCRIBE
RUN_TEST
(
test_user_message_multi
);
RUN_TEST
(
test_user_message_multi
);
#endif
return
UNITY_END
();
return
UNITY_END
();
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment