Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
428cf025
Commit
428cf025
authored
Jul 09, 2014
by
Richard Newton
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1125 from hurtonm/master
Code cleanup
parents
bf74c0cf
816299f1
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
63 additions
and
64 deletions
+63
-64
ctx.cpp
src/ctx.cpp
+6
-5
mailbox.cpp
src/mailbox.cpp
+3
-3
mailbox.hpp
src/mailbox.hpp
+2
-2
signaler.cpp
src/signaler.cpp
+36
-36
signaler.hpp
src/signaler.hpp
+3
-3
socket_base.cpp
src/socket_base.cpp
+13
-15
No files found.
src/ctx.cpp
View file @
428cf025
...
...
@@ -115,9 +115,9 @@ int zmq::ctx_t::terminate ()
// we are a forked child process. Close all file descriptors
// inherited from the parent.
for
(
sockets_t
::
size_type
i
=
0
;
i
!=
sockets
.
size
();
i
++
)
sockets
[
i
]
->
get_mailbox
()
->
forked
();
sockets
[
i
]
->
get_mailbox
()
->
forked
();
term_mailbox
.
forked
();
term_mailbox
.
forked
();
}
#endif
...
...
@@ -374,8 +374,8 @@ int zmq::ctx_t::register_endpoint (const char *addr_,
{
endpoints_sync
.
lock
();
bool
inserted
=
endpoints
.
insert
(
endpoints_t
::
value_type
(
std
::
string
(
addr_
),
endpoint_
)).
second
;
const
bool
inserted
=
endpoints
.
insert
(
endpoints_t
::
value_type
(
std
::
string
(
addr_
),
endpoint_
)).
second
;
endpoints_sync
.
unlock
();
...
...
@@ -420,6 +420,7 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
}
++
it
;
}
endpoints_sync
.
unlock
();
}
...
...
@@ -484,7 +485,7 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
options_t
&
bind_options
,
const
pending_connection_t
&
pending_connection_
,
side
side_
)
{
bind_socket_
->
inc_seqnum
();
pending_connection_
.
bind_pipe
->
set_tid
(
bind_socket_
->
get_tid
());
pending_connection_
.
bind_pipe
->
set_tid
(
bind_socket_
->
get_tid
());
if
(
!
bind_options
.
recv_identity
)
{
msg_t
msg
;
...
...
src/mailbox.cpp
View file @
428cf025
...
...
@@ -25,7 +25,7 @@ zmq::mailbox_t::mailbox_t ()
// Get the pipe into passive state. That way, if the users starts by
// polling on the associated file descriptor it will get woken up when
// new command is posted.
bool
ok
=
cpipe
.
read
(
NULL
);
const
bool
ok
=
cpipe
.
read
(
NULL
);
zmq_assert
(
!
ok
);
active
=
false
;
}
...
...
@@ -40,7 +40,7 @@ zmq::mailbox_t::~mailbox_t ()
sync
.
unlock
();
}
zmq
::
fd_t
zmq
::
mailbox_t
::
get_fd
()
zmq
::
fd_t
zmq
::
mailbox_t
::
get_fd
()
const
{
return
signaler
.
get_fd
();
}
...
...
@@ -49,7 +49,7 @@ void zmq::mailbox_t::send (const command_t &cmd_)
{
sync
.
lock
();
cpipe
.
write
(
cmd_
,
false
);
bool
ok
=
cpipe
.
flush
();
const
bool
ok
=
cpipe
.
flush
();
sync
.
unlock
();
if
(
!
ok
)
signaler
.
send
();
...
...
src/mailbox.hpp
View file @
428cf025
...
...
@@ -40,7 +40,7 @@ namespace zmq
mailbox_t
();
~
mailbox_t
();
fd_t
get_fd
();
fd_t
get_fd
()
const
;
void
send
(
const
command_t
&
cmd_
);
int
recv
(
command_t
*
cmd_
,
int
timeout_
);
...
...
@@ -48,7 +48,7 @@ namespace zmq
// close the file descriptors in the signaller. This is used in a forked
// child process to close the file descriptors so that they do not interfere
// with the context in the parent process.
void
forked
()
{
signaler
.
forked
();
}
void
forked
()
{
signaler
.
forked
();
}
#endif
private
:
...
...
src/signaler.cpp
View file @
428cf025
...
...
@@ -70,7 +70,7 @@ zmq::signaler_t::signaler_t ()
unblock_socket
(
r
);
}
#ifdef HAVE_FORK
pid
=
getpid
();
pid
=
getpid
();
#endif
}
...
...
@@ -80,9 +80,9 @@ zmq::signaler_t::~signaler_t ()
int
rc
=
close
(
r
);
errno_assert
(
rc
==
0
);
#elif defined ZMQ_HAVE_WINDOWS
struct
linger
so_linger
=
{
1
,
0
};
const
struct
linger
so_linger
=
{
1
,
0
};
int
rc
=
setsockopt
(
w
,
SOL_SOCKET
,
SO_LINGER
,
(
c
har
*
)
&
so_linger
,
sizeof
(
so_linger
)
);
(
c
onst
char
*
)
&
so_linger
,
sizeof
so_linger
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
rc
=
closesocket
(
w
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
...
...
@@ -96,15 +96,15 @@ zmq::signaler_t::~signaler_t ()
#endif
}
zmq
::
fd_t
zmq
::
signaler_t
::
get_fd
()
zmq
::
fd_t
zmq
::
signaler_t
::
get_fd
()
const
{
return
r
;
}
void
zmq
::
signaler_t
::
send
()
{
#if defined
(HAVE_FORK)
if
(
unlikely
(
pid
!=
getpid
()))
{
#if defined
HAVE_FORK
if
(
unlikely
(
pid
!=
getpid
()))
{
//printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
return
;
// do not send anything in forked child context
}
...
...
@@ -125,13 +125,13 @@ void zmq::signaler_t::send ()
if
(
unlikely
(
nbytes
==
-
1
&&
errno
==
EINTR
))
continue
;
#if defined(HAVE_FORK)
if
(
unlikely
(
pid
!=
getpid
()))
{
if
(
unlikely
(
pid
!=
getpid
()))
{
//printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
errno
=
EINTR
;
break
;
}
#endif
zmq_assert
(
nbytes
==
sizeof
(
dummy
)
);
zmq_assert
(
nbytes
==
sizeof
dummy
);
break
;
}
#endif
...
...
@@ -140,8 +140,7 @@ void zmq::signaler_t::send ()
int
zmq
::
signaler_t
::
wait
(
int
timeout_
)
{
#ifdef HAVE_FORK
if
(
unlikely
(
pid
!=
getpid
()))
{
if
(
unlikely
(
pid
!=
getpid
()))
{
// we have forked and the file descriptor is closed. Emulate an interupt
// response.
//printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
...
...
@@ -151,7 +150,6 @@ int zmq::signaler_t::wait (int timeout_)
#endif
#ifdef ZMQ_POLL_BASED_ON_POLL
struct
pollfd
pfd
;
pfd
.
fd
=
r
;
pfd
.
events
=
POLLIN
;
...
...
@@ -166,7 +164,8 @@ int zmq::signaler_t::wait (int timeout_)
return
-
1
;
}
#ifdef HAVE_FORK
if
(
unlikely
(
pid
!=
getpid
()))
{
else
if
(
unlikely
(
pid
!=
getpid
()))
{
// we have forked and the file descriptor is closed. Emulate an interupt
// response.
//printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
...
...
@@ -245,7 +244,7 @@ void zmq::signaler_t::recv ()
}
#ifdef HAVE_FORK
void
zmq
::
signaler_t
::
forked
()
void
zmq
::
signaler_t
::
forked
()
{
// Close file descriptors created in the parent and create new pair
close
(
r
);
...
...
@@ -274,13 +273,13 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Windows CE does not manage security attributes
SECURITY_DESCRIPTOR
sd
;
SECURITY_ATTRIBUTES
sa
;
memset
(
&
sd
,
0
,
sizeof
(
sd
)
);
memset
(
&
sa
,
0
,
sizeof
(
sa
)
);
memset
(
&
sd
,
0
,
sizeof
sd
);
memset
(
&
sa
,
0
,
sizeof
sa
);
InitializeSecurityDescriptor
(
&
sd
,
SECURITY_DESCRIPTOR_REVISION
);
SetSecurityDescriptorDacl
(
&
sd
,
TRUE
,
0
,
FALSE
);
InitializeSecurityDescriptor
(
&
sd
,
SECURITY_DESCRIPTOR_REVISION
);
SetSecurityDescriptorDacl
(
&
sd
,
TRUE
,
0
,
FALSE
);
sa
.
nLength
=
sizeof
(
SECURITY_ATTRIBUTES
);
sa
.
nLength
=
sizeof
(
SECURITY_ATTRIBUTES
);
sa
.
lpSecurityDescriptor
=
&
sd
;
# endif
...
...
@@ -310,9 +309,10 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
win_assert
(
sync
!=
NULL
);
}
else
if
(
signaler_port
!=
0
)
{
wchar_t
mutex_name
[
MAX_PATH
];
swprintf
(
mutex_name
,
MAX_PATH
,
L"Global
\\
zmq-signaler-port-%d"
,
signaler_port
);
else
if
(
signaler_port
!=
0
)
{
wchar_t
mutex_name
[
MAX_PATH
];
swprintf
(
mutex_name
,
MAX_PATH
,
L"Global
\\
zmq-signaler-port-%d"
,
signaler_port
);
# if !defined _WIN32_WCE
sync
=
CreateMutexW
(
&
sa
,
FALSE
,
mutex_name
);
...
...
@@ -338,16 +338,16 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Set SO_REUSEADDR and TCP_NODELAY on listening socket.
BOOL
so_reuseaddr
=
1
;
int
rc
=
setsockopt
(
listener
,
SOL_SOCKET
,
SO_REUSEADDR
,
(
char
*
)
&
so_reuseaddr
,
sizeof
(
so_reuseaddr
)
);
(
char
*
)
&
so_reuseaddr
,
sizeof
so_reuseaddr
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
BOOL
tcp_nodelay
=
1
;
rc
=
setsockopt
(
listener
,
IPPROTO_TCP
,
TCP_NODELAY
,
(
char
*
)
&
tcp_nodelay
,
sizeof
(
tcp_nodelay
)
);
(
char
*
)
&
tcp_nodelay
,
sizeof
tcp_nodelay
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
// Init sockaddr to signaler port.
struct
sockaddr_in
addr
;
memset
(
&
addr
,
0
,
sizeof
(
addr
)
);
memset
(
&
addr
,
0
,
sizeof
addr
);
addr
.
sin_family
=
AF_INET
;
addr
.
sin_addr
.
s_addr
=
htonl
(
INADDR_LOOPBACK
);
addr
.
sin_port
=
htons
(
signaler_port
);
...
...
@@ -358,7 +358,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Set TCP_NODELAY on writer socket.
rc
=
setsockopt
(
*
w_
,
IPPROTO_TCP
,
TCP_NODELAY
,
(
char
*
)
&
tcp_nodelay
,
sizeof
(
tcp_nodelay
)
);
(
char
*
)
&
tcp_nodelay
,
sizeof
tcp_nodelay
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
if
(
sync
!=
NULL
)
{
...
...
@@ -368,11 +368,11 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
}
// Bind listening socket to signaler port.
rc
=
bind
(
listener
,
(
const
struct
sockaddr
*
)
&
addr
,
sizeof
(
addr
)
);
rc
=
bind
(
listener
,
(
const
struct
sockaddr
*
)
&
addr
,
sizeof
addr
);
if
(
rc
!=
SOCKET_ERROR
&&
signaler_port
==
0
)
{
// Retrieve ephemeral port number
int
addrlen
=
sizeof
(
addr
)
;
int
addrlen
=
sizeof
addr
;
rc
=
getsockname
(
listener
,
(
struct
sockaddr
*
)
&
addr
,
&
addrlen
);
}
...
...
@@ -382,7 +382,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Connect writer to the listener.
if
(
rc
!=
SOCKET_ERROR
)
rc
=
connect
(
*
w_
,
(
struct
sockaddr
*
)
&
addr
,
sizeof
(
addr
)
);
rc
=
connect
(
*
w_
,
(
struct
sockaddr
*
)
&
addr
,
sizeof
addr
);
// Accept connection from writer.
if
(
rc
!=
SOCKET_ERROR
)
...
...
@@ -439,7 +439,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
// create the socket pair manually.
struct
sockaddr_in
lcladdr
;
memset
(
&
lcladdr
,
0
,
sizeof
(
lcladdr
)
);
memset
(
&
lcladdr
,
0
,
sizeof
lcladdr
);
lcladdr
.
sin_family
=
AF_INET
;
lcladdr
.
sin_addr
.
s_addr
=
htonl
(
INADDR_LOOPBACK
);
lcladdr
.
sin_port
=
0
;
...
...
@@ -448,16 +448,16 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
errno_assert
(
listener
!=
-
1
);
int
on
=
1
;
int
rc
=
setsockopt
(
listener
,
IPPROTO_TCP
,
TCP_NODELAY
,
&
on
,
sizeof
(
on
)
);
int
rc
=
setsockopt
(
listener
,
IPPROTO_TCP
,
TCP_NODELAY
,
&
on
,
sizeof
on
);
errno_assert
(
rc
!=
-
1
);
rc
=
setsockopt
(
listener
,
IPPROTO_TCP
,
TCP_NODELACK
,
&
on
,
sizeof
(
on
)
);
rc
=
setsockopt
(
listener
,
IPPROTO_TCP
,
TCP_NODELACK
,
&
on
,
sizeof
on
);
errno_assert
(
rc
!=
-
1
);
rc
=
bind
(
listener
,
(
struct
sockaddr
*
)
&
lcladdr
,
sizeof
(
lcladdr
)
);
rc
=
bind
(
listener
,
(
struct
sockaddr
*
)
&
lcladdr
,
sizeof
lcladdr
);
errno_assert
(
rc
!=
-
1
);
socklen_t
lcladdr_len
=
sizeof
(
lcladdr
)
;
socklen_t
lcladdr_len
=
sizeof
lcladdr
;
rc
=
getsockname
(
listener
,
(
struct
sockaddr
*
)
&
lcladdr
,
&
lcladdr_len
);
errno_assert
(
rc
!=
-
1
);
...
...
@@ -468,13 +468,13 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
*
w_
=
open_socket
(
AF_INET
,
SOCK_STREAM
,
0
);
errno_assert
(
*
w_
!=
-
1
);
rc
=
setsockopt
(
*
w_
,
IPPROTO_TCP
,
TCP_NODELAY
,
&
on
,
sizeof
(
on
)
);
rc
=
setsockopt
(
*
w_
,
IPPROTO_TCP
,
TCP_NODELAY
,
&
on
,
sizeof
on
);
errno_assert
(
rc
!=
-
1
);
rc
=
setsockopt
(
*
w_
,
IPPROTO_TCP
,
TCP_NODELACK
,
&
on
,
sizeof
(
on
)
);
rc
=
setsockopt
(
*
w_
,
IPPROTO_TCP
,
TCP_NODELACK
,
&
on
,
sizeof
on
);
errno_assert
(
rc
!=
-
1
);
rc
=
connect
(
*
w_
,
(
struct
sockaddr
*
)
&
lcladdr
,
sizeof
(
lcladdr
)
);
rc
=
connect
(
*
w_
,
(
struct
sockaddr
*
)
&
lcladdr
,
sizeof
lcladdr
);
errno_assert
(
rc
!=
-
1
);
*
r_
=
accept
(
listener
,
NULL
,
NULL
);
...
...
src/signaler.hpp
View file @
428cf025
...
...
@@ -41,7 +41,7 @@ namespace zmq
signaler_t
();
~
signaler_t
();
fd_t
get_fd
();
fd_t
get_fd
()
const
;
void
send
();
int
wait
(
int
timeout_
);
void
recv
();
...
...
@@ -49,7 +49,7 @@ namespace zmq
#ifdef HAVE_FORK
// close the file descriptors in a forked child process so that they
// do not interfere with the context in the parent process.
void
forked
();
void
forked
();
#endif
private
:
...
...
@@ -72,7 +72,7 @@ namespace zmq
pid_t
pid
;
// idempotent close of file descriptors that is safe to use by destructor
// and forked().
void
close_internal
();
void
close_internal
();
#endif
};
}
...
...
src/socket_base.cpp
View file @
428cf025
...
...
@@ -365,9 +365,9 @@ int zmq::socket_base_t::bind (const char *addr_)
if
(
protocol
==
"inproc"
)
{
const
endpoint_t
endpoint
=
{
this
,
options
};
int
rc
=
register_endpoint
(
addr_
,
endpoint
);
const
int
rc
=
register_endpoint
(
addr_
,
endpoint
);
if
(
rc
==
0
)
{
connect_pending
(
addr_
,
this
);
connect_pending
(
addr_
,
this
);
last_endpoint
.
assign
(
addr_
);
}
return
rc
;
...
...
@@ -485,7 +485,8 @@ int zmq::socket_base_t::connect (const char *addr_)
int
rcvhwm
=
0
;
if
(
peer
.
socket
==
NULL
)
rcvhwm
=
options
.
rcvhwm
;
else
if
(
options
.
rcvhwm
!=
0
&&
peer
.
options
.
sndhwm
!=
0
)
else
if
(
options
.
rcvhwm
!=
0
&&
peer
.
options
.
sndhwm
!=
0
)
rcvhwm
=
options
.
rcvhwm
+
peer
.
options
.
sndhwm
;
// Create a bi-directional pipe to connect the peers.
...
...
@@ -524,11 +525,9 @@ int zmq::socket_base_t::connect (const char *addr_)
const
endpoint_t
endpoint
=
{
this
,
options
};
pend_connection
(
std
::
string
(
addr_
),
endpoint
,
new_pipes
);
}
else
{
else
{
// If required, send the identity of the local socket to the peer.
if
(
peer
.
options
.
recv_identity
)
{
msg_t
id
;
rc
=
id
.
init_size
(
options
.
identity_size
);
errno_assert
(
rc
==
0
);
...
...
@@ -561,7 +560,7 @@ int zmq::socket_base_t::connect (const char *addr_)
last_endpoint
.
assign
(
addr_
);
// remember inproc connections for disconnect
inprocs
.
insert
(
inprocs_t
::
value_type
(
std
::
string
(
addr_
),
new_pipes
[
0
]));
inprocs
.
insert
(
inprocs_t
::
value_type
(
std
::
string
(
addr_
),
new_pipes
[
0
]));
return
0
;
}
...
...
@@ -569,7 +568,7 @@ int zmq::socket_base_t::connect (const char *addr_)
options
.
type
==
ZMQ_SUB
||
options
.
type
==
ZMQ_REQ
);
if
(
unlikely
(
is_single_connect
))
{
endpoints_t
::
iterator
it
=
endpoints
.
find
(
addr_
);
const
endpoints_t
::
iterator
it
=
endpoints
.
find
(
addr_
);
if
(
it
!=
endpoints
.
end
())
{
// There is no valid use for multiple connects for SUB-PUB nor
// DEALER-ROUTER nor REQ-REP. Multiple connects produces
...
...
@@ -712,7 +711,7 @@ void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe
{
// Activate the session. Make it a child of this socket.
launch_child
(
endpoint_
);
endpoints
.
insert
(
endpoints_t
::
value_type
(
std
::
string
(
addr_
),
endpoint_pipe_t
(
endpoint_
,
pipe
)));
endpoints
.
insert
(
endpoints_t
::
value_type
(
std
::
string
(
addr_
),
endpoint_pipe_t
(
endpoint_
,
pipe
)));
}
int
zmq
::
socket_base_t
::
term_endpoint
(
const
char
*
addr_
)
...
...
@@ -752,7 +751,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
}
for
(
inprocs_t
::
iterator
it
=
range
.
first
;
it
!=
range
.
second
;
++
it
)
it
->
second
->
terminate
(
true
);
it
->
second
->
terminate
(
true
);
inprocs
.
erase
(
range
.
first
,
range
.
second
);
return
0
;
}
...
...
@@ -767,7 +766,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
for
(
endpoints_t
::
iterator
it
=
range
.
first
;
it
!=
range
.
second
;
++
it
)
{
// If we have an associated pipe, terminate it.
if
(
it
->
second
.
second
!=
NULL
)
it
->
second
.
second
->
terminate
(
false
);
it
->
second
.
second
->
terminate
(
false
);
term_child
(
it
->
second
.
first
);
}
endpoints
.
erase
(
range
.
first
,
range
.
second
);
...
...
@@ -983,7 +982,7 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
// commands recently, so that we can throttle the new commands.
// Get the CPU's tick counter. If 0, the counter is not available.
uint64_t
tsc
=
zmq
::
clock_t
::
rdtsc
();
const
uint64_t
tsc
=
zmq
::
clock_t
::
rdtsc
();
// Optimised version of command processing - it doesn't have to check
// for incoming commands each time. It does so only if certain time
...
...
@@ -1176,12 +1175,11 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
xpipe_terminated
(
pipe_
);
// Remove pipe from inproc pipes
for
(
inprocs_t
::
iterator
it
=
inprocs
.
begin
();
it
!=
inprocs
.
end
();
++
it
)
{
for
(
inprocs_t
::
iterator
it
=
inprocs
.
begin
();
it
!=
inprocs
.
end
();
++
it
)
if
(
it
->
second
==
pipe_
)
{
inprocs
.
erase
(
it
);
inprocs
.
erase
(
it
);
break
;
}
}
// Remove the pipe from the list of attached pipes and confirm its
// termination if we are already shutting down.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment