Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
b8d8c498
Commit
b8d8c498
authored
Jan 11, 2017
by
Constantin Rack
Committed by
GitHub
Jan 11, 2017
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #2302 from laplaceyang/pr_thread_safe_modify_pipe
Problem: Thread-safe solution for modify hwm of pipe
parents
4fc313d1
107f2441
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
43 additions
and
0 deletions
+43
-0
command.hpp
src/command.hpp
+7
-0
object.cpp
src/object.cpp
+19
-0
object.hpp
src/object.hpp
+2
-0
pipe.cpp
src/pipe.cpp
+10
-0
pipe.hpp
src/pipe.hpp
+4
-0
socket_base.cpp
src/socket_base.cpp
+1
-0
No files found.
src/command.hpp
View file @
b8d8c498
...
@@ -64,6 +64,7 @@ namespace zmq
...
@@ -64,6 +64,7 @@ namespace zmq
hiccup
,
hiccup
,
pipe_term
,
pipe_term
,
pipe_term_ack
,
pipe_term_ack
,
pipe_hwm
,
term_req
,
term_req
,
term
,
term
,
term_ack
,
term_ack
,
...
@@ -129,6 +130,12 @@ namespace zmq
...
@@ -129,6 +130,12 @@ namespace zmq
struct
{
struct
{
}
pipe_term_ack
;
}
pipe_term_ack
;
// Sent by one of pipe to another part for modify hwm
struct
{
int
inhwm
;
int
outhwm
;
}
pipe_hwm
;
// Sent by I/O object ot the socket to request the shutdown of
// Sent by I/O object ot the socket to request the shutdown of
// the I/O object.
// the I/O object.
struct
{
struct
{
...
...
src/object.cpp
View file @
b8d8c498
...
@@ -118,6 +118,10 @@ void zmq::object_t::process_command (command_t &cmd_)
...
@@ -118,6 +118,10 @@ void zmq::object_t::process_command (command_t &cmd_)
process_pipe_term_ack
();
process_pipe_term_ack
();
break
;
break
;
case
command_t
:
:
pipe_hwm
:
process_pipe_hwm
(
cmd_
.
args
.
pipe_hwm
.
inhwm
,
cmd_
.
args
.
pipe_hwm
.
outhwm
);
break
;
case
command_t
:
:
term_req
:
case
command_t
:
:
term_req
:
process_term_req
(
cmd_
.
args
.
term_req
.
object
);
process_term_req
(
cmd_
.
args
.
term_req
.
object
);
break
;
break
;
...
@@ -291,6 +295,16 @@ void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
...
@@ -291,6 +295,16 @@ void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
send_command
(
cmd
);
send_command
(
cmd
);
}
}
void
zmq
::
object_t
::
send_pipe_hwm
(
pipe_t
*
destination_
,
int
inhwm_
,
int
outhwm_
)
{
command_t
cmd
;
cmd
.
destination
=
destination_
;
cmd
.
type
=
command_t
::
pipe_hwm
;
cmd
.
args
.
pipe_hwm
.
inhwm
=
inhwm_
;
cmd
.
args
.
pipe_hwm
.
outhwm
=
outhwm_
;
send_command
(
cmd
);
}
void
zmq
::
object_t
::
send_term_req
(
own_t
*
destination_
,
void
zmq
::
object_t
::
send_term_req
(
own_t
*
destination_
,
own_t
*
object_
)
own_t
*
object_
)
{
{
...
@@ -401,6 +415,11 @@ void zmq::object_t::process_pipe_term_ack ()
...
@@ -401,6 +415,11 @@ void zmq::object_t::process_pipe_term_ack ()
zmq_assert
(
false
);
zmq_assert
(
false
);
}
}
void
zmq
::
object_t
::
process_pipe_hwm
(
int
,
int
)
{
zmq_assert
(
false
);
}
void
zmq
::
object_t
::
process_term_req
(
own_t
*
)
void
zmq
::
object_t
::
process_term_req
(
own_t
*
)
{
{
zmq_assert
(
false
);
zmq_assert
(
false
);
...
...
src/object.hpp
View file @
b8d8c498
...
@@ -102,6 +102,7 @@ namespace zmq
...
@@ -102,6 +102,7 @@ namespace zmq
void
send_hiccup
(
zmq
::
pipe_t
*
destination_
,
void
*
pipe_
);
void
send_hiccup
(
zmq
::
pipe_t
*
destination_
,
void
*
pipe_
);
void
send_pipe_term
(
zmq
::
pipe_t
*
destination_
);
void
send_pipe_term
(
zmq
::
pipe_t
*
destination_
);
void
send_pipe_term_ack
(
zmq
::
pipe_t
*
destination_
);
void
send_pipe_term_ack
(
zmq
::
pipe_t
*
destination_
);
void
send_pipe_hwm
(
zmq
::
pipe_t
*
destination_
,
int
inhwm_
,
int
outhwm_
);
void
send_term_req
(
zmq
::
own_t
*
destination_
,
void
send_term_req
(
zmq
::
own_t
*
destination_
,
zmq
::
own_t
*
object_
);
zmq
::
own_t
*
object_
);
void
send_term
(
zmq
::
own_t
*
destination_
,
int
linger_
);
void
send_term
(
zmq
::
own_t
*
destination_
,
int
linger_
);
...
@@ -122,6 +123,7 @@ namespace zmq
...
@@ -122,6 +123,7 @@ namespace zmq
virtual
void
process_hiccup
(
void
*
pipe_
);
virtual
void
process_hiccup
(
void
*
pipe_
);
virtual
void
process_pipe_term
();
virtual
void
process_pipe_term
();
virtual
void
process_pipe_term_ack
();
virtual
void
process_pipe_term_ack
();
virtual
void
process_pipe_hwm
(
int
inhwm_
,
int
outhwm_
);
virtual
void
process_term_req
(
zmq
::
own_t
*
object_
);
virtual
void
process_term_req
(
zmq
::
own_t
*
object_
);
virtual
void
process_term
(
int
linger_
);
virtual
void
process_term
(
int
linger_
);
virtual
void
process_term_ack
();
virtual
void
process_term_ack
();
...
...
src/pipe.cpp
View file @
b8d8c498
...
@@ -377,6 +377,11 @@ void zmq::pipe_t::process_pipe_term_ack ()
...
@@ -377,6 +377,11 @@ void zmq::pipe_t::process_pipe_term_ack ()
delete
this
;
delete
this
;
}
}
void
zmq
::
pipe_t
::
process_pipe_hwm
(
int
inhwm_
,
int
outhwm_
)
{
set_hwms
(
inhwm_
,
outhwm_
);
}
void
zmq
::
pipe_t
::
set_nodelay
()
void
zmq
::
pipe_t
::
set_nodelay
()
{
{
this
->
delay
=
false
;
this
->
delay
=
false
;
...
@@ -533,3 +538,8 @@ bool zmq::pipe_t::check_hwm () const
...
@@ -533,3 +538,8 @@ bool zmq::pipe_t::check_hwm () const
bool
full
=
hwm
>
0
&&
msgs_written
-
peers_msgs_read
>=
uint64_t
(
hwm
);
bool
full
=
hwm
>
0
&&
msgs_written
-
peers_msgs_read
>=
uint64_t
(
hwm
);
return
(
!
full
);
return
(
!
full
);
}
}
void
zmq
::
pipe_t
::
send_hwms_to_peer
(
int
inhwm_
,
int
outhwm_
)
{
send_pipe_hwm
(
peer
,
inhwm_
,
outhwm_
);
}
src/pipe.hpp
View file @
b8d8c498
...
@@ -136,6 +136,9 @@ namespace zmq
...
@@ -136,6 +136,9 @@ namespace zmq
// Set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks
// Set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks
void
set_hwms_boost
(
int
inhwmboost_
,
int
outhwmboost_
);
void
set_hwms_boost
(
int
inhwmboost_
,
int
outhwmboost_
);
// send command to peer for notify the change of hwm
void
send_hwms_to_peer
(
int
inhwm_
,
int
outhwm_
);
// Returns true if HWM is not reached
// Returns true if HWM is not reached
bool
check_hwm
()
const
;
bool
check_hwm
()
const
;
private
:
private
:
...
@@ -149,6 +152,7 @@ namespace zmq
...
@@ -149,6 +152,7 @@ namespace zmq
void
process_hiccup
(
void
*
pipe_
);
void
process_hiccup
(
void
*
pipe_
);
void
process_pipe_term
();
void
process_pipe_term
();
void
process_pipe_term_ack
();
void
process_pipe_term_ack
();
void
process_pipe_hwm
(
int
inhwm_
,
int
outhwm_
);
// Handler for delimiter read from the pipe.
// Handler for delimiter read from the pipe.
void
process_delimiter
();
void
process_delimiter
();
...
...
src/socket_base.cpp
View file @
b8d8c498
...
@@ -1414,6 +1414,7 @@ void zmq::socket_base_t::update_pipe_options(int option_)
...
@@ -1414,6 +1414,7 @@ void zmq::socket_base_t::update_pipe_options(int option_)
for
(
pipes_t
::
size_type
i
=
0
;
i
!=
pipes
.
size
();
++
i
)
for
(
pipes_t
::
size_type
i
=
0
;
i
!=
pipes
.
size
();
++
i
)
{
{
pipes
[
i
]
->
set_hwms
(
options
.
rcvhwm
,
options
.
sndhwm
);
pipes
[
i
]
->
set_hwms
(
options
.
rcvhwm
,
options
.
sndhwm
);
pipes
[
i
]
->
send_hwms_to_peer
(
options
.
sndhwm
,
options
.
rcvhwm
);
}
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment