Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
c41daca3
Commit
c41daca3
authored
Nov 21, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
race condition in inproc transport shutdown fixed
parent
64634605
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
29 additions
and
12 deletions
+29
-12
command.hpp
src/command.hpp
+3
-0
object.cpp
src/object.cpp
+6
-3
object.hpp
src/object.hpp
+3
-3
session.cpp
src/session.cpp
+3
-1
socket_base.cpp
src/socket_base.cpp
+12
-4
socket_base.hpp
src/socket_base.hpp
+2
-1
No files found.
src/command.hpp
View file @
c41daca3
...
@@ -69,9 +69,12 @@ namespace zmq
...
@@ -69,9 +69,12 @@ namespace zmq
}
attach
;
}
attach
;
// Sent from session to socket to establish pipe(s) between them.
// Sent from session to socket to establish pipe(s) between them.
// If adjust_seqnum is true, caller have used inc_seqnum beforehand
// and thus the callee should take care of catching up.
struct
{
struct
{
class
reader_t
*
in_pipe
;
class
reader_t
*
in_pipe
;
class
writer_t
*
out_pipe
;
class
writer_t
*
out_pipe
;
bool
adjust_seqnum
;
}
bind
;
}
bind
;
// Sent by pipe writer to inform dormant pipe reader that there
// Sent by pipe writer to inform dormant pipe reader that there
...
...
src/object.cpp
View file @
c41daca3
...
@@ -83,7 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_)
...
@@ -83,7 +83,8 @@ void zmq::object_t::process_command (command_t &cmd_)
return
;
return
;
case
command_t
:
:
bind
:
case
command_t
:
:
bind
:
process_bind
(
cmd_
.
args
.
bind
.
in_pipe
,
cmd_
.
args
.
bind
.
out_pipe
);
process_bind
(
cmd_
.
args
.
bind
.
in_pipe
,
cmd_
.
args
.
bind
.
out_pipe
,
cmd_
.
args
.
bind
.
adjust_seqnum
);
return
;
return
;
case
command_t
:
:
pipe_term
:
case
command_t
:
:
pipe_term
:
...
@@ -183,13 +184,14 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)
...
@@ -183,13 +184,14 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_)
}
}
void
zmq
::
object_t
::
send_bind
(
object_t
*
destination_
,
void
zmq
::
object_t
::
send_bind
(
object_t
*
destination_
,
reader_t
*
in_pipe_
,
writer_t
*
out_pipe_
)
reader_t
*
in_pipe_
,
writer_t
*
out_pipe_
,
bool
adjust_seqnum_
)
{
{
command_t
cmd
;
command_t
cmd
;
cmd
.
destination
=
destination_
;
cmd
.
destination
=
destination_
;
cmd
.
type
=
command_t
::
bind
;
cmd
.
type
=
command_t
::
bind
;
cmd
.
args
.
bind
.
in_pipe
=
in_pipe_
;
cmd
.
args
.
bind
.
in_pipe
=
in_pipe_
;
cmd
.
args
.
bind
.
out_pipe
=
out_pipe_
;
cmd
.
args
.
bind
.
out_pipe
=
out_pipe_
;
cmd
.
args
.
bind
.
adjust_seqnum
=
adjust_seqnum_
;
send_command
(
cmd
);
send_command
(
cmd
);
}
}
...
@@ -263,7 +265,8 @@ void zmq::object_t::process_attach (i_engine *engine_)
...
@@ -263,7 +265,8 @@ void zmq::object_t::process_attach (i_engine *engine_)
zmq_assert
(
false
);
zmq_assert
(
false
);
}
}
void
zmq
::
object_t
::
process_bind
(
reader_t
*
in_pipe_
,
writer_t
*
out_pipe_
)
void
zmq
::
object_t
::
process_bind
(
reader_t
*
in_pipe_
,
writer_t
*
out_pipe_
,
bool
adjust_seqnum_
)
{
{
zmq_assert
(
false
);
zmq_assert
(
false
);
}
}
...
...
src/object.hpp
View file @
c41daca3
...
@@ -68,8 +68,8 @@ namespace zmq
...
@@ -68,8 +68,8 @@ namespace zmq
class
owned_t
*
object_
);
class
owned_t
*
object_
);
void
send_attach
(
class
session_t
*
destination_
,
void
send_attach
(
class
session_t
*
destination_
,
struct
i_engine
*
engine_
);
struct
i_engine
*
engine_
);
void
send_bind
(
object_t
*
destination_
,
void
send_bind
(
object_t
*
destination_
,
class
reader_t
*
in_pipe_
,
class
reader_t
*
in_pipe_
,
class
writer_t
*
out_pipe
_
);
class
writer_t
*
out_pipe_
,
bool
adjust_seqnum
_
);
void
send_revive
(
class
object_t
*
destination_
);
void
send_revive
(
class
object_t
*
destination_
);
void
send_pipe_term
(
class
writer_t
*
destination_
);
void
send_pipe_term
(
class
writer_t
*
destination_
);
void
send_pipe_term_ack
(
class
reader_t
*
destination_
);
void
send_pipe_term_ack
(
class
reader_t
*
destination_
);
...
@@ -85,7 +85,7 @@ namespace zmq
...
@@ -85,7 +85,7 @@ namespace zmq
virtual
void
process_own
(
class
owned_t
*
object_
);
virtual
void
process_own
(
class
owned_t
*
object_
);
virtual
void
process_attach
(
struct
i_engine
*
engine_
);
virtual
void
process_attach
(
struct
i_engine
*
engine_
);
virtual
void
process_bind
(
class
reader_t
*
in_pipe_
,
virtual
void
process_bind
(
class
reader_t
*
in_pipe_
,
class
writer_t
*
out_pipe_
);
class
writer_t
*
out_pipe_
,
bool
adjust_seqnum_
);
virtual
void
process_revive
();
virtual
void
process_revive
();
virtual
void
process_pipe_term
();
virtual
void
process_pipe_term
();
virtual
void
process_pipe_term_ack
();
virtual
void
process_pipe_term_ack
();
...
...
src/session.cpp
View file @
c41daca3
...
@@ -155,8 +155,10 @@ void zmq::session_t::process_plug ()
...
@@ -155,8 +155,10 @@ void zmq::session_t::process_plug ()
out_pipe
->
set_endpoint
(
this
);
out_pipe
->
set_endpoint
(
this
);
}
}
// Note that initial call to inc_seqnum was optimised out. Last
// parameter conveys the fact to the callee.
send_bind
(
owner
,
outbound
?
&
outbound
->
reader
:
NULL
,
send_bind
(
owner
,
outbound
?
&
outbound
->
reader
:
NULL
,
inbound
?
&
inbound
->
writer
:
NULL
);
inbound
?
&
inbound
->
writer
:
NULL
,
false
);
}
}
owned_t
::
process_plug
();
owned_t
::
process_plug
();
...
...
src/socket_base.cpp
View file @
c41daca3
...
@@ -158,10 +158,10 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -158,10 +158,10 @@ int zmq::socket_base_t::connect (const char *addr_)
out_pipe
?
&
out_pipe
->
writer
:
NULL
);
out_pipe
?
&
out_pipe
->
writer
:
NULL
);
// Attach the pipes to the peer socket. Note that peer's seqnum
// Attach the pipes to the peer socket. Note that peer's seqnum
// was incremented in find_endpoint function.
When this comman
d
// was incremented in find_endpoint function.
The callee is notifie
d
//
is delivered, peer will consider the seqnum to be processed
.
//
about the fact via the last parameter
.
send_bind
(
peer
,
out_pipe
?
&
out_pipe
->
reader
:
NULL
,
send_bind
(
peer
,
out_pipe
?
&
out_pipe
->
reader
:
NULL
,
in_pipe
?
&
in_pipe
->
writer
:
NULL
);
in_pipe
?
&
in_pipe
->
writer
:
NULL
,
true
);
return
0
;
return
0
;
}
}
...
@@ -509,8 +509,16 @@ void zmq::socket_base_t::process_own (owned_t *object_)
...
@@ -509,8 +509,16 @@ void zmq::socket_base_t::process_own (owned_t *object_)
io_objects
.
insert
(
object_
);
io_objects
.
insert
(
object_
);
}
}
void
zmq
::
socket_base_t
::
process_bind
(
reader_t
*
in_pipe_
,
writer_t
*
out_pipe_
)
void
zmq
::
socket_base_t
::
process_bind
(
reader_t
*
in_pipe_
,
writer_t
*
out_pipe_
,
bool
adjust_seqnum_
)
{
{
// In case of inproc transport, the seqnum should catch up here.
// For other transports the seqnum modification can be optimised out
// because final handshaking between the socket and the session ensures
// that no 'bind' command will be left unprocessed.
if
(
adjust_seqnum_
)
processed_seqnum
++
;
attach_pipes
(
in_pipe_
,
out_pipe_
);
attach_pipes
(
in_pipe_
,
out_pipe_
);
}
}
...
...
src/socket_base.hpp
View file @
c41daca3
...
@@ -114,7 +114,8 @@ namespace zmq
...
@@ -114,7 +114,8 @@ namespace zmq
// Handlers for incoming commands.
// Handlers for incoming commands.
void
process_own
(
class
owned_t
*
object_
);
void
process_own
(
class
owned_t
*
object_
);
void
process_bind
(
class
reader_t
*
in_pipe_
,
class
writer_t
*
out_pipe_
);
void
process_bind
(
class
reader_t
*
in_pipe_
,
class
writer_t
*
out_pipe_
,
bool
adjust_seqnum_
);
void
process_term_req
(
class
owned_t
*
object_
);
void
process_term_req
(
class
owned_t
*
object_
);
void
process_term_ack
();
void
process_term_ack
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment