Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
c573f6b8
Commit
c573f6b8
authored
Nov 07, 2013
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Fix for issue 574
parent
6ed1f476
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
89 additions
and
86 deletions
+89
-86
signaler.cpp
src/signaler.cpp
+45
-44
signaler.hpp
src/signaler.hpp
+2
-2
socket_base.cpp
src/socket_base.cpp
+42
-40
No files found.
src/signaler.cpp
View file @
c573f6b8
...
@@ -80,13 +80,10 @@
...
@@ -80,13 +80,10 @@
zmq
::
signaler_t
::
signaler_t
()
zmq
::
signaler_t
::
signaler_t
()
{
{
// Create the socketpair for signaling.
// Create the socketpair for signaling.
int
rc
=
make_fdpair
(
&
r
,
&
w
);
if
(
make_fdpair
(
&
r
,
&
w
)
==
0
)
{
errno_assert
(
rc
==
0
);
unblock_socket
(
w
);
unblock_socket
(
r
);
// Set both fds to non-blocking mode.
}
unblock_socket
(
w
);
unblock_socket
(
r
);
#ifdef HAVE_FORK
#ifdef HAVE_FORK
pid
=
getpid
();
pid
=
getpid
();
#endif
#endif
...
@@ -273,7 +270,7 @@ void zmq::signaler_t::forked()
...
@@ -273,7 +270,7 @@ void zmq::signaler_t::forked()
// replace the file descriptors created in the parent with new
// replace the file descriptors created in the parent with new
// ones, and close the inherited ones
// ones, and close the inherited ones
make_fdpair
(
&
r
,
&
w
);
make_fdpair
(
&
r
,
&
w
);
#if defined ZMQ_HAVE_EVENTFD
#if defined ZMQ_HAVE_EVENTFD
int
rc
=
close
(
oldr
);
int
rc
=
close
(
oldr
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
...
@@ -286,22 +283,23 @@ void zmq::signaler_t::forked()
...
@@ -286,22 +283,23 @@ void zmq::signaler_t::forked()
}
}
#endif
#endif
// Returns -1 if we could not make the socket pair successfully
int
zmq
::
signaler_t
::
make_fdpair
(
fd_t
*
r_
,
fd_t
*
w_
)
int
zmq
::
signaler_t
::
make_fdpair
(
fd_t
*
r_
,
fd_t
*
w_
)
{
{
#if defined ZMQ_HAVE_EVENTFD
#if defined ZMQ_HAVE_EVENTFD
// Create eventfd object.
fd_t
fd
=
eventfd
(
0
,
0
);
fd_t
fd
=
eventfd
(
0
,
0
);
errno_assert
(
fd
!=
-
1
);
if
(
fd
==
-
1
)
{
*
w_
=
fd
;
errno_assert
(
errno
==
ENFILE
||
errno
==
EMFILE
);
*
r_
=
fd
;
*
w_
=
*
r_
=
-
1
;
return
0
;
return
-
1
;
}
else
{
*
w_
=
*
r_
=
fd
;
return
0
;
}
#elif defined ZMQ_HAVE_WINDOWS
#elif defined ZMQ_HAVE_WINDOWS
#if !defined _WIN32_WCE
#
if !defined _WIN32_WCE
// Windows CE does not manage security attributes
// Windows CE does not manage security attributes
SECURITY_DESCRIPTOR
sd
;
SECURITY_DESCRIPTOR
sd
;
SECURITY_ATTRIBUTES
sa
;
SECURITY_ATTRIBUTES
sa
;
...
@@ -313,7 +311,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
...
@@ -313,7 +311,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
sa
.
nLength
=
sizeof
(
SECURITY_ATTRIBUTES
);
sa
.
nLength
=
sizeof
(
SECURITY_ATTRIBUTES
);
sa
.
lpSecurityDescriptor
=
&
sd
;
sa
.
lpSecurityDescriptor
=
&
sd
;
#endif
#
endif
// This function has to be in a system-wide critical section so that
// This function has to be in a system-wide critical section so that
// two instances of the library don't accidentally create signaler
// two instances of the library don't accidentally create signaler
...
@@ -322,13 +320,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
...
@@ -322,13 +320,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Note that if the event object already exists, the CreateEvent requests
// Note that if the event object already exists, the CreateEvent requests
// EVENT_ALL_ACCESS access right. If this fails, we try to open
// EVENT_ALL_ACCESS access right. If this fails, we try to open
// the event object asking for SYNCHRONIZE access only.
// the event object asking for SYNCHRONIZE access only.
#if !defined _WIN32_WCE
#
if !defined _WIN32_WCE
HANDLE
sync
=
CreateEvent
(
&
sa
,
FALSE
,
TRUE
,
TEXT
(
"Global
\\
zmq-signaler-port-sync"
));
HANDLE
sync
=
CreateEvent
(
&
sa
,
FALSE
,
TRUE
,
TEXT
(
"Global
\\
zmq-signaler-port-sync"
));
#else
#
else
HANDLE
sync
=
CreateEvent
(
NULL
,
FALSE
,
TRUE
,
TEXT
(
"Global
\\
zmq-signaler-port-sync"
));
HANDLE
sync
=
CreateEvent
(
NULL
,
FALSE
,
TRUE
,
TEXT
(
"Global
\\
zmq-signaler-port-sync"
));
#endif
#
endif
if
(
sync
==
NULL
&&
GetLastError
()
==
ERROR_ACCESS_DENIED
)
if
(
sync
==
NULL
&&
GetLastError
()
==
ERROR_ACCESS_DENIED
)
sync
=
OpenEvent
(
SYNCHRONIZE
|
EVENT_MODIFY_STATE
,
FALSE
,
TEXT
(
"Global
\\
zmq-signaler-port-sync"
));
sync
=
OpenEvent
(
SYNCHRONIZE
|
EVENT_MODIFY_STATE
,
FALSE
,
TEXT
(
"Global
\\
zmq-signaler-port-sync"
));
win_assert
(
sync
!=
NULL
);
win_assert
(
sync
!=
NULL
);
...
@@ -373,13 +372,13 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
...
@@ -373,13 +372,13 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
*
w_
=
WSASocket
(
AF_INET
,
SOCK_STREAM
,
0
,
NULL
,
0
,
0
);
*
w_
=
WSASocket
(
AF_INET
,
SOCK_STREAM
,
0
,
NULL
,
0
,
0
);
wsa_assert
(
*
w_
!=
INVALID_SOCKET
);
wsa_assert
(
*
w_
!=
INVALID_SOCKET
);
#if !defined _WIN32_WCE
#
if !defined _WIN32_WCE
// On Windows, preventing sockets to be inherited by child processes.
// On Windows, preventing sockets to be inherited by child processes.
BOOL
brc
=
SetHandleInformation
((
HANDLE
)
*
w_
,
HANDLE_FLAG_INHERIT
,
0
);
BOOL
brc
=
SetHandleInformation
((
HANDLE
)
*
w_
,
HANDLE_FLAG_INHERIT
,
0
);
win_assert
(
brc
);
win_assert
(
brc
);
#else
#
else
BOOL
brc
;
BOOL
brc
;
#endif
#
endif
// Set TCP_NODELAY on writer socket.
// Set TCP_NODELAY on writer socket.
rc
=
setsockopt
(
*
w_
,
IPPROTO_TCP
,
TCP_NODELAY
,
rc
=
setsockopt
(
*
w_
,
IPPROTO_TCP
,
TCP_NODELAY
,
...
@@ -391,17 +390,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
...
@@ -391,17 +390,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Save errno if connection fails
// Save errno if connection fails
int
conn_errno
=
0
;
int
conn_errno
=
0
;
if
(
rc
==
SOCKET_ERROR
)
{
if
(
rc
==
SOCKET_ERROR
)
conn_errno
=
WSAGetLastError
();
conn_errno
=
WSAGetLastError
();
}
else
{
else
{
// Accept connection from writer.
// Accept connection from writer.
*
r_
=
accept
(
listener
,
NULL
,
NULL
);
*
r_
=
accept
(
listener
,
NULL
,
NULL
);
if
(
*
r_
==
INVALID_SOCKET
)
if
(
*
r_
==
INVALID_SOCKET
)
{
conn_errno
=
WSAGetLastError
();
conn_errno
=
WSAGetLastError
();
}
}
}
// We don't need the listening socket anymore. Close it.
// We don't need the listening socket anymore. Close it.
rc
=
closesocket
(
listener
);
rc
=
closesocket
(
listener
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
...
@@ -415,13 +411,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
...
@@ -415,13 +411,14 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
win_assert
(
brc
!=
0
);
win_assert
(
brc
!=
0
);
if
(
*
r_
!=
INVALID_SOCKET
)
{
if
(
*
r_
!=
INVALID_SOCKET
)
{
#if !defined _WIN32_WCE
#
if !defined _WIN32_WCE
// On Windows, preventing sockets to be inherited by child processes.
// On Windows, preventing sockets to be inherited by child processes.
brc
=
SetHandleInformation
((
HANDLE
)
*
r_
,
HANDLE_FLAG_INHERIT
,
0
);
brc
=
SetHandleInformation
((
HANDLE
)
*
r_
,
HANDLE_FLAG_INHERIT
,
0
);
win_assert
(
brc
);
win_assert
(
brc
);
#endif
#
endif
return
0
;
return
0
;
}
else
{
}
else
{
// Cleanup writer if connection failed
// Cleanup writer if connection failed
rc
=
closesocket
(
*
w_
);
rc
=
closesocket
(
*
w_
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
...
@@ -435,7 +432,6 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
...
@@ -435,7 +432,6 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// Unfortunately, it uses errno_assert() which gives "Unknown error"
// Unfortunately, it uses errno_assert() which gives "Unknown error"
// We might as well assert here and print the actual error message
// We might as well assert here and print the actual error message
wsa_assert_no
(
conn_errno
);
wsa_assert_no
(
conn_errno
);
return
-
1
;
return
-
1
;
}
}
...
@@ -463,7 +459,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
...
@@ -463,7 +459,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
rc
=
setsockopt
(
listener
,
IPPROTO_TCP
,
TCP_NODELACK
,
&
on
,
sizeof
(
on
));
rc
=
setsockopt
(
listener
,
IPPROTO_TCP
,
TCP_NODELACK
,
&
on
,
sizeof
(
on
));
errno_assert
(
rc
!=
-
1
);
errno_assert
(
rc
!=
-
1
);
rc
=
bind
(
listener
,
(
struct
sockaddr
*
)
&
lcladdr
,
sizeof
(
lcladdr
));
rc
=
bind
(
listener
,
(
struct
sockaddr
*
)
&
lcladdr
,
sizeof
(
lcladdr
));
errno_assert
(
rc
!=
-
1
);
errno_assert
(
rc
!=
-
1
);
socklen_t
lcladdr_len
=
sizeof
(
lcladdr
);
socklen_t
lcladdr_len
=
sizeof
(
lcladdr
);
...
@@ -493,15 +489,20 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
...
@@ -493,15 +489,20 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
return
0
;
return
0
;
#else
// All other implementations support socketpair()
#else
// All other implementations support socketpair()
int
sv
[
2
];
int
sv
[
2
];
int
rc
=
socketpair
(
AF_UNIX
,
SOCK_STREAM
,
0
,
sv
);
int
rc
=
socketpair
(
AF_UNIX
,
SOCK_STREAM
,
0
,
sv
);
errno_assert
(
rc
==
0
);
if
(
rc
==
-
1
)
{
*
w_
=
sv
[
0
];
errno_assert
(
errno
==
ENFILE
||
errno
==
EMFILE
);
*
r_
=
sv
[
1
];
sv
[
0
]
=
sv
[
1
]
=
-
1
;
return
0
;
return
-
1
;
}
else
{
*
w_
=
sv
[
0
];
*
r_
=
sv
[
1
];
return
0
;
}
#endif
#endif
}
}
...
...
src/signaler.hpp
View file @
c573f6b8
...
@@ -58,7 +58,8 @@ namespace zmq
...
@@ -58,7 +58,8 @@ namespace zmq
// to pass the signals.
// to pass the signals.
static
int
make_fdpair
(
fd_t
*
r_
,
fd_t
*
w_
);
static
int
make_fdpair
(
fd_t
*
r_
,
fd_t
*
w_
);
// Underlying write & read file descriptor.
// Underlying write & read file descriptor
// Will be -1 if we exceeded number of available handles
fd_t
w
;
fd_t
w
;
fd_t
r
;
fd_t
r
;
...
@@ -74,7 +75,6 @@ namespace zmq
...
@@ -74,7 +75,6 @@ namespace zmq
void
close_internal
();
void
close_internal
();
#endif
#endif
};
};
}
}
#endif
#endif
src/socket_base.cpp
View file @
c573f6b8
...
@@ -81,47 +81,49 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
...
@@ -81,47 +81,49 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
{
{
socket_base_t
*
s
=
NULL
;
socket_base_t
*
s
=
NULL
;
switch
(
type_
)
{
switch
(
type_
)
{
case
ZMQ_PAIR
:
case
ZMQ_PAIR
:
s
=
new
(
std
::
nothrow
)
pair_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
pair_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
case
ZMQ_PUB
:
case
ZMQ_PUB
:
s
=
new
(
std
::
nothrow
)
pub_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
pub_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
case
ZMQ_SUB
:
case
ZMQ_SUB
:
s
=
new
(
std
::
nothrow
)
sub_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
sub_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
case
ZMQ_REQ
:
case
ZMQ_REQ
:
s
=
new
(
std
::
nothrow
)
req_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
req_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
case
ZMQ_REP
:
case
ZMQ_REP
:
s
=
new
(
std
::
nothrow
)
rep_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
rep_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
case
ZMQ_DEALER
:
case
ZMQ_DEALER
:
s
=
new
(
std
::
nothrow
)
dealer_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
dealer_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
case
ZMQ_ROUTER
:
case
ZMQ_ROUTER
:
s
=
new
(
std
::
nothrow
)
router_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
router_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
case
ZMQ_PULL
:
case
ZMQ_PULL
:
s
=
new
(
std
::
nothrow
)
pull_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
pull_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
case
ZMQ_PUSH
:
case
ZMQ_PUSH
:
s
=
new
(
std
::
nothrow
)
push_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
push_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
case
ZMQ_XPUB
:
case
ZMQ_XPUB
:
s
=
new
(
std
::
nothrow
)
xpub_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
xpub_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
case
ZMQ_XSUB
:
case
ZMQ_XSUB
:
s
=
new
(
std
::
nothrow
)
xsub_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
xsub_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
case
ZMQ_STREAM
:
case
ZMQ_STREAM
:
s
=
new
(
std
::
nothrow
)
stream_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
stream_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
default
:
default
:
errno
=
EINVAL
;
errno
=
EINVAL
;
return
NULL
;
return
NULL
;
}
}
if
(
s
->
mailbox
.
get_fd
()
==
-
1
)
return
NULL
;
alloc_assert
(
s
);
alloc_assert
(
s
);
return
s
;
return
s
;
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment