Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
a24a7c15
Commit
a24a7c15
authored
May 31, 2011
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Session termination induced by socket fixed
Signed-off-by:
Martin Sustrik
<
sustrik@250bpm.com
>
parent
0b59866a
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
23 additions
and
25 deletions
+23
-25
pipe.cpp
src/pipe.cpp
+12
-6
pipe.hpp
src/pipe.hpp
+3
-2
session.cpp
src/session.cpp
+6
-15
socket_base.cpp
src/socket_base.cpp
+2
-2
No files found.
src/pipe.cpp
View file @
a24a7c15
...
@@ -301,8 +301,11 @@ void zmq::pipe_t::process_pipe_term_ack ()
...
@@ -301,8 +301,11 @@ void zmq::pipe_t::process_pipe_term_ack ()
delete
this
;
delete
this
;
}
}
void
zmq
::
pipe_t
::
terminate
()
void
zmq
::
pipe_t
::
terminate
(
bool
delay_
)
{
{
// Overload the value specified at pipe creation.
delay
=
delay_
;
// If terminate was already called, we can ignore the duplicit invocation.
// If terminate was already called, we can ignore the duplicit invocation.
if
(
state
==
terminated
||
state
==
double_terminated
)
if
(
state
==
terminated
||
state
==
double_terminated
)
return
;
return
;
...
@@ -321,9 +324,13 @@ void zmq::pipe_t::terminate ()
...
@@ -321,9 +324,13 @@ void zmq::pipe_t::terminate ()
// There are still pending messages available, but the user calls
// There are still pending messages available, but the user calls
// 'terminate'. We can act as if all the pending messages were read.
// 'terminate'. We can act as if all the pending messages were read.
else
if
(
state
==
pending
)
{
else
if
(
state
==
pending
&&
!
delay
)
{
send_pipe_term_ack
(
peer
);
send_pipe_term_ack
(
peer
);
state
=
terminating
;
state
=
terminating
;
}
// If there are pending messages still availabe, do nothing.
else
if
(
state
==
pending
&&
delay
)
{
}
}
// We've already got delimiter, but not term command yet. We can ignore
// We've already got delimiter, but not term command yet. We can ignore
...
@@ -338,8 +345,7 @@ void zmq::pipe_t::terminate ()
...
@@ -338,8 +345,7 @@ void zmq::pipe_t::terminate ()
else
else
zmq_assert
(
false
);
zmq_assert
(
false
);
// Stop inbound and outbound flow of messages.
// Stop outbound flow of messages.
in_active
=
false
;
out_active
=
false
;
out_active
=
false
;
// Rollback any unfinished outbound messages.
// Rollback any unfinished outbound messages.
...
...
src/pipe.hpp
View file @
a24a7c15
...
@@ -94,8 +94,9 @@ namespace zmq
...
@@ -94,8 +94,9 @@ namespace zmq
// Ask pipe to terminate. The termination will happen asynchronously
// Ask pipe to terminate. The termination will happen asynchronously
// and user will be notified about actual deallocation by 'terminated'
// and user will be notified about actual deallocation by 'terminated'
// event.
// event. If delay is true, the pending messages will be processed
void
terminate
();
// before actual shutdown.
void
terminate
(
bool
delay_
);
private
:
private
:
...
...
src/session.cpp
View file @
a24a7c15
...
@@ -228,13 +228,6 @@ void zmq::session_t::process_term (int linger_)
...
@@ -228,13 +228,6 @@ void zmq::session_t::process_term (int linger_)
return
;
return
;
}
}
// If linger is set to zero, we can ask pipe to terminate without
// waiting for pending messages to be read.
if
(
linger_
==
0
)
{
proceed_with_term
();
return
;
}
pending
=
true
;
pending
=
true
;
// If there's finite linger value, delay the termination.
// If there's finite linger value, delay the termination.
...
@@ -246,6 +239,11 @@ void zmq::session_t::process_term (int linger_)
...
@@ -246,6 +239,11 @@ void zmq::session_t::process_term (int linger_)
has_linger_timer
=
true
;
has_linger_timer
=
true
;
}
}
// Start pipe termination process. Delay the termination till all messages
// are processed in case the linger time is non-zero.
pipe
->
terminate
(
linger_
!=
0
);
// TODO: Should this go into pipe_t::terminate ?
// In case there's no engine and there's only delimiter in the
// In case there's no engine and there's only delimiter in the
// pipe it wouldn't be ever read. Thus we check for it explicitly.
// pipe it wouldn't be ever read. Thus we check for it explicitly.
pipe
->
check_read
();
pipe
->
check_read
();
...
@@ -256,13 +254,6 @@ void zmq::session_t::proceed_with_term ()
...
@@ -256,13 +254,6 @@ void zmq::session_t::proceed_with_term ()
// The pending phase have just ended.
// The pending phase have just ended.
pending
=
false
;
pending
=
false
;
// If there's pipe attached to the session, we have to wait till it
// terminates.
if
(
pipe
)
{
register_term_acks
(
1
);
pipe
->
terminate
();
}
// Continue with standard termination.
// Continue with standard termination.
own_t
::
process_term
(
0
);
own_t
::
process_term
(
0
);
}
}
...
@@ -276,7 +267,7 @@ void zmq::session_t::timer_event (int id_)
...
@@ -276,7 +267,7 @@ void zmq::session_t::timer_event (int id_)
// Ask pipe to terminate even though there may be pending messages in it.
// Ask pipe to terminate even though there may be pending messages in it.
zmq_assert
(
pipe
);
zmq_assert
(
pipe
);
p
roceed_with_term
(
);
p
ipe
->
terminate
(
false
);
}
}
bool
zmq
::
session_t
::
has_engine
()
bool
zmq
::
session_t
::
has_engine
()
...
...
src/socket_base.cpp
View file @
a24a7c15
...
@@ -233,7 +233,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
...
@@ -233,7 +233,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
// straight away.
// straight away.
if
(
is_terminating
())
{
if
(
is_terminating
())
{
register_term_acks
(
1
);
register_term_acks
(
1
);
pipe_
->
terminate
();
pipe_
->
terminate
(
false
);
}
}
}
}
...
@@ -740,7 +740,7 @@ void zmq::socket_base_t::process_term (int linger_)
...
@@ -740,7 +740,7 @@ void zmq::socket_base_t::process_term (int linger_)
// Ask all attached pipes to terminate.
// Ask all attached pipes to terminate.
for
(
pipes_t
::
size_type
i
=
0
;
i
!=
pipes
.
size
();
++
i
)
for
(
pipes_t
::
size_type
i
=
0
;
i
!=
pipes
.
size
();
++
i
)
pipes
[
i
]
->
terminate
();
pipes
[
i
]
->
terminate
(
false
);
register_term_acks
(
pipes
.
size
());
register_term_acks
(
pipes
.
size
());
// Continue the termination process immediately.
// Continue the termination process immediately.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment