Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
5a497d7d
Commit
5a497d7d
authored
Sep 11, 2014
by
Martin Hurton
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Code cleanup
parent
41a9968c
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
15 additions
and
27 deletions
+15
-27
session_base.cpp
src/session_base.cpp
+11
-20
session_base.hpp
src/session_base.hpp
+1
-4
xpub.cpp
src/xpub.cpp
+3
-3
No files found.
src/session_base.cpp
View file @
5a497d7d
...
@@ -38,7 +38,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
...
@@ -38,7 +38,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
bool
active_
,
class
socket_base_t
*
socket_
,
const
options_t
&
options_
,
bool
active_
,
class
socket_base_t
*
socket_
,
const
options_t
&
options_
,
address_t
*
addr_
)
address_t
*
addr_
)
{
{
session_base_t
*
s
=
NULL
;
session_base_t
*
s
=
NULL
;
switch
(
options_
.
type
)
{
switch
(
options_
.
type
)
{
case
ZMQ_REQ
:
case
ZMQ_REQ
:
...
@@ -228,14 +227,16 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
...
@@ -228,14 +227,16 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
// If we are waiting for pending messages to be sent, at this point
// If we are waiting for pending messages to be sent, at this point
// we are sure that there will be no more messages and we can proceed
// we are sure that there will be no more messages and we can proceed
// with termination safely.
// with termination safely.
if
(
pending
&&
!
pipe
&&
!
zap_pipe
&&
terminating_pipes
.
empty
())
if
(
pending
&&
!
pipe
&&
!
zap_pipe
&&
terminating_pipes
.
empty
())
{
proceed_with_term
();
pending
=
false
;
own_t
::
process_term
(
0
);
}
}
}
void
zmq
::
session_base_t
::
read_activated
(
pipe_t
*
pipe_
)
void
zmq
::
session_base_t
::
read_activated
(
pipe_t
*
pipe_
)
{
{
// Skip activating if we're detaching this pipe
// Skip activating if we're detaching this pipe
if
(
unlikely
(
pipe_
!=
pipe
&&
pipe_
!=
zap_pipe
))
{
if
(
unlikely
(
pipe_
!=
pipe
&&
pipe_
!=
zap_pipe
))
{
zmq_assert
(
terminating_pipes
.
count
(
pipe_
)
==
1
);
zmq_assert
(
terminating_pipes
.
count
(
pipe_
)
==
1
);
return
;
return
;
}
}
...
@@ -354,9 +355,9 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
...
@@ -354,9 +355,9 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
// Remember the local end of the pipe.
// Remember the local end of the pipe.
zmq_assert
(
!
pipe
);
zmq_assert
(
!
pipe
);
pipe
=
pipes
[
0
];
pipe
=
pipes
[
0
];
// Store engine assoc_fd for li
lnking pipe to fd
// Store engine assoc_fd for li
nking pipe to fd
pipe
->
assoc_fd
=
engine_
->
get_assoc_fd
();
pipe
->
assoc_fd
=
engine_
->
get_assoc_fd
();
pipes
[
1
]
->
assoc_fd
=
pipe
->
assoc_fd
;
pipes
[
1
]
->
assoc_fd
=
pipe
->
assoc_fd
;
// Ask socket to plug into the remote end of the pipe.
// Ask socket to plug into the remote end of the pipe.
send_bind
(
socket
,
pipes
[
1
]);
send_bind
(
socket
,
pipes
[
1
]);
}
}
...
@@ -409,8 +410,8 @@ void zmq::session_base_t::process_term (int linger_)
...
@@ -409,8 +410,8 @@ void zmq::session_base_t::process_term (int linger_)
// If the termination of the pipe happens before the term command is
// If the termination of the pipe happens before the term command is
// delivered there's nothing much to do. We can proceed with the
// delivered there's nothing much to do. We can proceed with the
// standard termination immediately.
// standard termination immediately.
if
(
!
pipe
&&
!
zap_pipe
)
{
if
(
!
pipe
&&
!
zap_pipe
&&
terminating_pipes
.
empty
()
)
{
proceed_with_term
(
);
own_t
::
process_term
(
0
);
return
;
return
;
}
}
...
@@ -440,15 +441,6 @@ void zmq::session_base_t::process_term (int linger_)
...
@@ -440,15 +441,6 @@ void zmq::session_base_t::process_term (int linger_)
zap_pipe
->
terminate
(
false
);
zap_pipe
->
terminate
(
false
);
}
}
void
zmq
::
session_base_t
::
proceed_with_term
()
{
// The pending phase has just ended.
pending
=
false
;
// Continue with standard termination.
own_t
::
process_term
(
0
);
}
void
zmq
::
session_base_t
::
timer_event
(
int
id_
)
void
zmq
::
session_base_t
::
timer_event
(
int
id_
)
{
{
// Linger period expired. We can proceed with termination even though
// Linger period expired. We can proceed with termination even though
...
@@ -580,8 +572,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
...
@@ -580,8 +572,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
#endif
#endif
#ifdef ZMQ_HAVE_NORM
#ifdef ZMQ_HAVE_NORM
if
(
addr
->
protocol
==
"norm"
)
if
(
addr
->
protocol
==
"norm"
)
{
{
// At this point we'll create message pipes to the session straight
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// away. There's no point in delaying it as no concept of 'connect'
// exists with NORM anyway.
// exists with NORM anyway.
...
...
src/session_base.hpp
View file @
5a497d7d
...
@@ -112,12 +112,9 @@ namespace zmq
...
@@ -112,12 +112,9 @@ namespace zmq
// Call this function when engine disconnect to get rid of leftovers.
// Call this function when engine disconnect to get rid of leftovers.
void
clean_pipes
();
void
clean_pipes
();
// Call this function to move on with the delayed process_term.
void
proceed_with_term
();
// If true, this session (re)connects to the peer. Otherwise, it's
// If true, this session (re)connects to the peer. Otherwise, it's
// a transient session created by the listener.
// a transient session created by the listener.
bool
active
;
const
bool
active
;
// Pipe connecting the session to its socket.
// Pipe connecting the session to its socket.
zmq
::
pipe_t
*
pipe
;
zmq
::
pipe_t
*
pipe
;
...
...
src/xpub.cpp
View file @
5a497d7d
...
@@ -26,11 +26,11 @@
...
@@ -26,11 +26,11 @@
zmq
::
xpub_t
::
xpub_t
(
class
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
)
:
zmq
::
xpub_t
::
xpub_t
(
class
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
)
:
socket_base_t
(
parent_
,
tid_
,
sid_
),
socket_base_t
(
parent_
,
tid_
,
sid_
),
verbose
(
false
),
verbose
(
false
),
more
(
false
)
more
(
false
),
lossy
(
true
)
{
{
options
.
type
=
ZMQ_XPUB
;
options
.
type
=
ZMQ_XPUB
;
lossy
=
true
;
}
}
zmq
::
xpub_t
::~
xpub_t
()
zmq
::
xpub_t
::~
xpub_t
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment