Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
d5f3628a
Commit
d5f3628a
authored
Jul 29, 2011
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Different connecters simplified
Signed-off-by:
Martin Sustrik
<
sustrik@250bpm.com
>
parent
f63db009
Show whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
100 additions
and
332 deletions
+100
-332
ip.cpp
src/ip.cpp
+31
-0
ip.hpp
src/ip.hpp
+4
-4
ipc_connecter.cpp
src/ipc_connecter.cpp
+10
-150
ipc_connecter.hpp
src/ipc_connecter.hpp
+7
-0
signaler.cpp
src/signaler.cpp
+2
-18
tcp_connecter.cpp
src/tcp_connecter.cpp
+39
-138
tcp_connecter.hpp
src/tcp_connecter.hpp
+1
-1
tcp_engine.cpp
src/tcp_engine.cpp
+6
-21
No files found.
src/ip.cpp
View file @
d5f3628a
...
@@ -28,6 +28,18 @@
...
@@ -28,6 +28,18 @@
#include "platform.hpp"
#include "platform.hpp"
#include "stdint.hpp"
#include "stdint.hpp"
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#include <sys/un.h>
#endif
#if !defined ZMQ_HAVE_WINDOWS
#include <fcntl.h>
#endif
#if defined ZMQ_HAVE_OPENVMS
#include <ioctl.h>
#endif
#if defined ZMQ_HAVE_SOLARIS
#if defined ZMQ_HAVE_SOLARIS
#include <sys/sockio.h>
#include <sys/sockio.h>
...
@@ -367,4 +379,23 @@ void zmq::tune_tcp_socket (fd_t s_)
...
@@ -367,4 +379,23 @@ void zmq::tune_tcp_socket (fd_t s_)
#endif
#endif
}
}
void
zmq
::
unblock_socket
(
fd_t
s_
)
{
#ifdef ZMQ_HAVE_WINDOWS
u_long
nonblock
=
1
;
int
rc
=
ioctlsocket
(
s_
,
FIONBIO
,
&
nonblock
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#elif ZMQ_HAVE_OPENVMS
int
nonblock
=
1
;
int
rc
=
ioctl
(
s_
,
FIONBIO
,
&
nonblock
);
errno_assert
(
rc
!=
-
1
);
#else
int
flags
=
fcntl
(
s_
,
F_GETFL
,
0
);
if
(
flags
==
-
1
)
flags
=
0
;
int
rc
=
fcntl
(
s_
,
F_SETFL
,
flags
|
O_NONBLOCK
);
errno_assert
(
rc
!=
-
1
);
#endif
}
src/ip.hpp
View file @
d5f3628a
...
@@ -35,10 +35,6 @@
...
@@ -35,10 +35,6 @@
#include <netdb.h>
#include <netdb.h>
#endif
#endif
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#include <sys/un.h>
#endif
// Some platforms (notably Darwin/OSX and NetBSD) do not define all AI_
// Some platforms (notably Darwin/OSX and NetBSD) do not define all AI_
// flags for getaddrinfo(). This can be worked around safely by defining
// flags for getaddrinfo(). This can be worked around safely by defining
// these to 0.
// these to 0.
...
@@ -68,6 +64,10 @@ namespace zmq
...
@@ -68,6 +64,10 @@ namespace zmq
// Tunes the supplied TCP socket for the best latency.
// Tunes the supplied TCP socket for the best latency.
void
tune_tcp_socket
(
fd_t
s_
);
void
tune_tcp_socket
(
fd_t
s_
);
// Sets the socket into non-blocking mode.
void
unblock_socket
(
fd_t
s_
);
}
}
#endif
#endif
src/ipc_connecter.cpp
View file @
d5f3628a
...
@@ -18,32 +18,24 @@
...
@@ -18,32 +18,24 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "ipc_connecter.hpp"
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#include <new>
#include <new>
#include <string>
#include <string>
#include "ipc_connecter.hpp"
#include "tcp_engine.hpp"
#include "tcp_engine.hpp"
#include "io_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "platform.hpp"
#include "random.hpp"
#include "random.hpp"
#include "ip.hpp"
#include "err.hpp"
#include "err.hpp"
#include "ip.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/un.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
#endif
#endif
zmq
::
ipc_connecter_t
::
ipc_connecter_t
(
class
io_thread_t
*
io_thread_
,
zmq
::
ipc_connecter_t
::
ipc_connecter_t
(
class
io_thread_t
*
io_thread_
,
class
session_t
*
session_
,
const
options_t
&
options_
,
class
session_t
*
session_
,
const
options_t
&
options_
,
...
@@ -175,84 +167,6 @@ int zmq::ipc_connecter_t::get_new_reconnect_ivl ()
...
@@ -175,84 +167,6 @@ int zmq::ipc_connecter_t::get_new_reconnect_ivl ()
return
this_interval
;
return
this_interval
;
}
}
#ifdef ZMQ_HAVE_WINDOWS
int
zmq
::
ipc_connecter_t
::
set_address
(
const
char
*
protocol_
,
const
char
*
addr_
)
{
errno
=
EPROTONOSUPPORT
;
return
-
1
;
}
int
zmq
::
ipc_connecter_t
::
open
()
{
zmq_assert
(
s
==
retired_fd
);
// Create the socket.
s
=
socket
(
addr
.
ss_family
,
SOCK_STREAM
,
IPPROTO_TCP
);
if
(
s
==
INVALID_SOCKET
)
{
wsa_error_to_errno
();
return
-
1
;
}
// Set to non-blocking mode.
unsigned
long
argp
=
1
;
int
rc
=
ioctlsocket
(
s
,
FIONBIO
,
&
argp
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
// Connect to the remote peer.
rc
=
::
connect
(
s
,
(
sockaddr
*
)
&
addr
,
addr_len
);
// Connect was successfull immediately.
if
(
rc
==
0
)
return
0
;
// Asynchronous connect was launched.
if
(
rc
==
SOCKET_ERROR
&&
(
WSAGetLastError
()
==
WSAEINPROGRESS
||
WSAGetLastError
()
==
WSAEWOULDBLOCK
))
{
errno
=
EAGAIN
;
return
-
1
;
}
wsa_error_to_errno
();
return
-
1
;
}
int
zmq
::
ipc_connecter_t
::
close
()
{
zmq_assert
(
s
!=
retired_fd
);
int
rc
=
closesocket
(
s
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
s
=
retired_fd
;
return
0
;
}
zmq
::
fd_t
zmq
::
ipc_connecter_t
::
connect
()
{
// Nonblocking connect have finished. Check whether an error occured.
int
err
=
0
;
socklen_t
len
=
sizeof
err
;
int
rc
=
getsockopt
(
s
,
SOL_SOCKET
,
SO_ERROR
,
(
char
*
)
&
err
,
&
len
);
zmq_assert
(
rc
==
0
);
if
(
err
!=
0
)
{
// Assert that the error was caused by the networking problems
// rather than 0MQ bug.
if
(
err
==
WSAECONNREFUSED
||
err
==
WSAETIMEDOUT
||
err
==
WSAECONNABORTED
||
err
==
WSAEHOSTUNREACH
||
err
==
WSAENETUNREACH
||
err
==
WSAENETDOWN
)
return
retired_fd
;
wsa_assert_no
(
err
);
}
// Return the newly connected socket.
fd_t
result
=
s
;
s
=
retired_fd
;
return
result
;
}
#else
int
zmq
::
ipc_connecter_t
::
set_address
(
const
char
*
addr_
)
int
zmq
::
ipc_connecter_t
::
set_address
(
const
char
*
addr_
)
{
{
return
resolve_local_path
(
&
addr
,
&
addr_len
,
addr_
);
return
resolve_local_path
(
&
addr
,
&
addr_len
,
addr_
);
...
@@ -263,49 +177,6 @@ int zmq::ipc_connecter_t::open ()
...
@@ -263,49 +177,6 @@ int zmq::ipc_connecter_t::open ()
zmq_assert
(
s
==
retired_fd
);
zmq_assert
(
s
==
retired_fd
);
struct
sockaddr
*
sa
=
(
struct
sockaddr
*
)
&
addr
;
struct
sockaddr
*
sa
=
(
struct
sockaddr
*
)
&
addr
;
if
(
AF_UNIX
!=
sa
->
sa_family
)
{
// Create the socket.
s
=
socket
(
sa
->
sa_family
,
SOCK_STREAM
,
IPPROTO_TCP
);
if
(
s
==
-
1
)
return
-
1
;
// Set to non-blocking mode.
#ifdef ZMQ_HAVE_OPENVMS
int
flags
=
1
;
int
rc
=
ioctl
(
s
,
FIONBIO
,
&
flags
);
errno_assert
(
rc
!=
-
1
);
#else
int
flags
=
fcntl
(
s
,
F_GETFL
,
0
);
if
(
flags
==
-
1
)
flags
=
0
;
int
rc
=
fcntl
(
s
,
F_SETFL
,
flags
|
O_NONBLOCK
);
errno_assert
(
rc
!=
-
1
);
#endif
// Connect to the remote peer.
rc
=
::
connect
(
s
,
(
struct
sockaddr
*
)
&
addr
,
addr_len
);
// Connect was successfull immediately.
if
(
rc
==
0
)
return
0
;
// Asynchronous connect was launched.
if
(
rc
==
-
1
&&
errno
==
EINPROGRESS
)
{
errno
=
EAGAIN
;
return
-
1
;
}
// Error occured.
int
err
=
errno
;
close
();
errno
=
err
;
return
-
1
;
}
#ifndef ZMQ_HAVE_OPENVMS
else
{
// Create the socket.
// Create the socket.
zmq_assert
(
AF_UNIX
==
sa
->
sa_family
);
zmq_assert
(
AF_UNIX
==
sa
->
sa_family
);
s
=
socket
(
AF_UNIX
,
SOCK_STREAM
,
0
);
s
=
socket
(
AF_UNIX
,
SOCK_STREAM
,
0
);
...
@@ -313,28 +184,16 @@ int zmq::ipc_connecter_t::open ()
...
@@ -313,28 +184,16 @@ int zmq::ipc_connecter_t::open ()
return
-
1
;
return
-
1
;
// Set the non-blocking flag.
// Set the non-blocking flag.
int
flag
=
fcntl
(
s
,
F_GETFL
,
0
);
unblock_socket
(
s
);
if
(
flag
==
-
1
)
flag
=
0
;
int
rc
=
fcntl
(
s
,
F_SETFL
,
flag
|
O_NONBLOCK
);
errno_assert
(
rc
!=
-
1
);
// Connect to the remote peer.
// Connect to the remote peer.
rc
=
::
connect
(
s
,
(
struct
sockaddr
*
)
&
addr
,
sizeof
(
sockaddr_un
));
int
rc
=
::
connect
(
s
,
(
struct
sockaddr
*
)
&
addr
,
sizeof
(
sockaddr_un
));
// Connect was successfull immediately.
// Connect was successfull immediately.
if
(
rc
==
0
)
if
(
rc
==
0
)
return
0
;
return
0
;
// Error occured.
// Forward the error.
int
err
=
errno
;
close
();
errno
=
err
;
return
-
1
;
}
#endif
zmq_assert
(
false
);
return
-
1
;
return
-
1
;
}
}
...
@@ -379,3 +238,4 @@ zmq::fd_t zmq::ipc_connecter_t::connect ()
...
@@ -379,3 +238,4 @@ zmq::fd_t zmq::ipc_connecter_t::connect ()
}
}
#endif
#endif
src/ipc_connecter.hpp
View file @
d5f3628a
...
@@ -21,6 +21,10 @@
...
@@ -21,6 +21,10 @@
#ifndef __IPC_CONNECTER_HPP_INCLUDED__
#ifndef __IPC_CONNECTER_HPP_INCLUDED__
#define __IPC_CONNECTER_HPP_INCLUDED__
#define __IPC_CONNECTER_HPP_INCLUDED__
#include "platform.hpp"
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#include "fd.hpp"
#include "fd.hpp"
#include "ip.hpp"
#include "ip.hpp"
#include "own.hpp"
#include "own.hpp"
...
@@ -110,3 +114,6 @@ namespace zmq
...
@@ -110,3 +114,6 @@ namespace zmq
}
}
#endif
#endif
#endif
src/signaler.cpp
View file @
d5f3628a
...
@@ -71,8 +71,6 @@
...
@@ -71,8 +71,6 @@
#include "windows.hpp"
#include "windows.hpp"
#else
#else
#include <unistd.h>
#include <unistd.h>
#include <fcntl.h>
#include <limits.h>
#include <netinet/tcp.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/types.h>
...
@@ -86,22 +84,8 @@ zmq::signaler_t::signaler_t ()
...
@@ -86,22 +84,8 @@ zmq::signaler_t::signaler_t ()
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
// Set both fds to non-blocking mode.
// Set both fds to non-blocking mode.
#if defined ZMQ_HAVE_WINDOWS
unblock_socket
(
w
);
unsigned
long
argp
=
1
;
unblock_socket
(
r
);
rc
=
ioctlsocket
(
w
,
FIONBIO
,
&
argp
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
rc
=
ioctlsocket
(
r
,
FIONBIO
,
&
argp
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#else
int
flags
=
fcntl
(
w
,
F_GETFL
,
0
);
errno_assert
(
flags
>=
0
);
rc
=
fcntl
(
w
,
F_SETFL
,
flags
|
O_NONBLOCK
);
errno_assert
(
rc
==
0
);
flags
=
fcntl
(
r
,
F_GETFL
,
0
);
errno_assert
(
flags
>=
0
);
rc
=
fcntl
(
r
,
F_SETFL
,
flags
|
O_NONBLOCK
);
errno_assert
(
rc
==
0
);
#endif
}
}
zmq
::
signaler_t
::~
signaler_t
()
zmq
::
signaler_t
::~
signaler_t
()
...
...
src/tcp_connecter.cpp
View file @
d5f3628a
...
@@ -177,8 +177,6 @@ int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
...
@@ -177,8 +177,6 @@ int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
return
this_interval
;
return
this_interval
;
}
}
#ifdef ZMQ_HAVE_WINDOWS
int
zmq
::
tcp_connecter_t
::
set_address
(
const
char
*
addr_
)
int
zmq
::
tcp_connecter_t
::
set_address
(
const
char
*
addr_
)
{
{
return
resolve_ip_hostname
(
&
addr
,
&
addr_len
,
addr_
);
return
resolve_ip_hostname
(
&
addr
,
&
addr_len
,
addr_
);
...
@@ -190,193 +188,96 @@ int zmq::tcp_connecter_t::open ()
...
@@ -190,193 +188,96 @@ int zmq::tcp_connecter_t::open ()
// Create the socket.
// Create the socket.
s
=
socket
(
addr
.
ss_family
,
SOCK_STREAM
,
IPPROTO_TCP
);
s
=
socket
(
addr
.
ss_family
,
SOCK_STREAM
,
IPPROTO_TCP
);
#ifdef ZMQ_HAVE_WINDOWS
if
(
s
==
INVALID_SOCKET
)
{
if
(
s
==
INVALID_SOCKET
)
{
wsa_error_to_errno
();
wsa_error_to_errno
();
return
-
1
;
return
-
1
;
}
}
#else
if
(
s
==
-
1
)
return
-
1
;
#endif
// Set to non-blocking mode.
// Set the socket to non-blocking mode so that we get async connect().
unsigned
long
argp
=
1
;
unblock_socket
(
s
);
int
rc
=
ioctlsocket
(
s
,
FIONBIO
,
&
argp
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
// Connect to the remote peer.
// Connect to the remote peer.
rc
=
::
connect
(
s
,
(
sockaddr
*
)
&
addr
,
addr_len
);
int
rc
=
::
connect
(
s
,
(
struct
sockaddr
*
)
&
addr
,
addr_len
);
// Connect was successfull immediately.
// Connect was successfull immediately.
if
(
rc
==
0
)
if
(
rc
==
0
)
return
0
;
return
0
;
// Asynchronous connect was launched.
// Asynchronous connect was launched.
#ifdef ZMQ_HAVE_WINDOWS
if
(
rc
==
SOCKET_ERROR
&&
(
WSAGetLastError
()
==
WSAEINPROGRESS
||
if
(
rc
==
SOCKET_ERROR
&&
(
WSAGetLastError
()
==
WSAEINPROGRESS
||
WSAGetLastError
()
==
WSAEWOULDBLOCK
))
{
WSAGetLastError
()
==
WSAEWOULDBLOCK
))
{
errno
=
EAGAIN
;
errno
=
EAGAIN
;
return
-
1
;
return
-
1
;
}
}
wsa_error_to_errno
();
wsa_error_to_errno
();
#else
if
(
rc
==
-
1
&&
errno
==
EINPROGRESS
)
{
errno
=
EAGAIN
;
return
-
1
;
}
#endif
return
-
1
;
return
-
1
;
}
int
zmq
::
tcp_connecter_t
::
close
()
{
zmq_assert
(
s
!=
retired_fd
);
int
rc
=
closesocket
(
s
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
s
=
retired_fd
;
return
0
;
}
}
zmq
::
fd_t
zmq
::
tcp_connecter_t
::
connect
()
zmq
::
fd_t
zmq
::
tcp_connecter_t
::
connect
()
{
{
//
Nonblocking
connect have finished. Check whether an error occured.
//
Async
connect have finished. Check whether an error occured.
int
err
=
0
;
int
err
=
0
;
socklen_t
len
=
sizeof
err
;
#if defined ZMQ_HAVE_HPUX
int
len
=
sizeof
(
err
);
#else
socklen_t
len
=
sizeof
(
err
);
#endif
int
rc
=
getsockopt
(
s
,
SOL_SOCKET
,
SO_ERROR
,
(
char
*
)
&
err
,
&
len
);
int
rc
=
getsockopt
(
s
,
SOL_SOCKET
,
SO_ERROR
,
(
char
*
)
&
err
,
&
len
);
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert
(
rc
==
0
);
zmq_assert
(
rc
==
0
);
if
(
err
!=
0
)
{
if
(
err
!=
0
)
{
// Assert that the error was caused by the networking problems
// rather than 0MQ bug.
if
(
err
==
WSAECONNREFUSED
||
err
==
WSAETIMEDOUT
||
if
(
err
==
WSAECONNREFUSED
||
err
==
WSAETIMEDOUT
||
err
==
WSAECONNABORTED
||
err
==
WSAEHOSTUNREACH
||
err
==
WSAECONNABORTED
||
err
==
WSAEHOSTUNREACH
||
err
==
WSAENETUNREACH
||
err
==
WSAENETDOWN
)
err
==
WSAENETUNREACH
||
err
==
WSAENETDOWN
)
return
retired_fd
;
return
retired_fd
;
wsa_assert_no
(
err
);
wsa_assert_no
(
err
);
}
}
// Return the newly connected socket.
fd_t
result
=
s
;
s
=
retired_fd
;
return
result
;
}
#else
#else
int
zmq
::
tcp_connecter_t
::
set_address
(
const
char
*
addr_
)
{
return
resolve_ip_hostname
(
&
addr
,
&
addr_len
,
addr_
);
}
int
zmq
::
tcp_connecter_t
::
open
()
{
zmq_assert
(
s
==
retired_fd
);
struct
sockaddr
*
sa
=
(
struct
sockaddr
*
)
&
addr
;
if
(
AF_UNIX
!=
sa
->
sa_family
)
{
// Create the socket.
s
=
socket
(
sa
->
sa_family
,
SOCK_STREAM
,
IPPROTO_TCP
);
if
(
s
==
-
1
)
return
-
1
;
// Set to non-blocking mode.
#ifdef ZMQ_HAVE_OPENVMS
int
flags
=
1
;
int
rc
=
ioctl
(
s
,
FIONBIO
,
&
flags
);
errno_assert
(
rc
!=
-
1
);
#else
int
flags
=
fcntl
(
s
,
F_GETFL
,
0
);
if
(
flags
==
-
1
)
flags
=
0
;
int
rc
=
fcntl
(
s
,
F_SETFL
,
flags
|
O_NONBLOCK
);
errno_assert
(
rc
!=
-
1
);
#endif
// Connect to the remote peer.
rc
=
::
connect
(
s
,
(
struct
sockaddr
*
)
&
addr
,
addr_len
);
// Connect was successfull immediately.
if
(
rc
==
0
)
return
0
;
// Asynchronous connect was launched.
if
(
rc
==
-
1
&&
errno
==
EINPROGRESS
)
{
errno
=
EAGAIN
;
return
-
1
;
}
// Error occured.
int
err
=
errno
;
close
();
errno
=
err
;
return
-
1
;
}
#ifndef ZMQ_HAVE_OPENVMS
else
{
// Create the socket.
zmq_assert
(
AF_UNIX
==
sa
->
sa_family
);
s
=
socket
(
AF_UNIX
,
SOCK_STREAM
,
0
);
if
(
s
==
-
1
)
return
-
1
;
// Set the non-blocking flag.
int
flag
=
fcntl
(
s
,
F_GETFL
,
0
);
if
(
flag
==
-
1
)
flag
=
0
;
int
rc
=
fcntl
(
s
,
F_SETFL
,
flag
|
O_NONBLOCK
);
errno_assert
(
rc
!=
-
1
);
// Connect to the remote peer.
rc
=
::
connect
(
s
,
(
struct
sockaddr
*
)
&
addr
,
sizeof
(
sockaddr_un
));
// Connect was successfull immediately.
if
(
rc
==
0
)
return
0
;
// Error occured.
int
err
=
errno
;
close
();
errno
=
err
;
return
-
1
;
}
#endif
zmq_assert
(
false
);
return
-
1
;
}
int
zmq
::
tcp_connecter_t
::
close
()
{
zmq_assert
(
s
!=
retired_fd
);
int
rc
=
::
close
(
s
);
if
(
rc
!=
0
)
return
-
1
;
s
=
retired_fd
;
return
0
;
}
zmq
::
fd_t
zmq
::
tcp_connecter_t
::
connect
()
{
// Following code should handle both Berkeley-derived socket
// Following code should handle both Berkeley-derived socket
// implementations and Solaris.
// implementations and Solaris.
int
err
=
0
;
#if defined ZMQ_HAVE_HPUX
int
len
=
sizeof
(
err
);
#else
socklen_t
len
=
sizeof
(
err
);
#endif
int
rc
=
getsockopt
(
s
,
SOL_SOCKET
,
SO_ERROR
,
(
char
*
)
&
err
,
&
len
);
if
(
rc
==
-
1
)
if
(
rc
==
-
1
)
err
=
errno
;
err
=
errno
;
if
(
err
!=
0
)
{
if
(
err
!=
0
)
{
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
errno
=
err
;
errno
=
err
;
errno_assert
(
errno
==
ECONNREFUSED
||
errno
==
ECONNRESET
||
errno_assert
(
errno
==
ECONNREFUSED
||
errno
==
ECONNRESET
||
errno
==
ETIMEDOUT
||
errno
==
EHOSTUNREACH
||
errno
==
ETIMEDOUT
||
errno
==
EHOSTUNREACH
||
errno
==
ENETUNREACH
||
errno
==
ENETDOWN
);
errno
==
ENETUNREACH
||
errno
==
ENETDOWN
);
return
retired_fd
;
return
retired_fd
;
}
}
#endif
// Return the newly connected socket.
fd_t
result
=
s
;
fd_t
result
=
s
;
s
=
retired_fd
;
s
=
retired_fd
;
return
result
;
return
result
;
}
}
void
zmq
::
tcp_connecter_t
::
close
()
{
zmq_assert
(
s
!=
retired_fd
);
#ifdef ZMQ_HAVE_WINDOWS
int
rc
=
closesocket
(
s
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#else
int
rc
=
::
close
(
s
);
errno_assert
(
rc
==
0
);
#endif
#endif
s
=
retired_fd
;
}
src/tcp_connecter.hpp
View file @
d5f3628a
...
@@ -74,7 +74,7 @@ namespace zmq
...
@@ -74,7 +74,7 @@ namespace zmq
int
open
();
int
open
();
// Close the connecting socket.
// Close the connecting socket.
int
close
();
void
close
();
// Get the file descriptor of newly created connection. Returns
// Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessfull.
// retired_fd if the connection was unsuccessfull.
...
...
src/tcp_engine.cpp
View file @
d5f3628a
...
@@ -39,6 +39,7 @@
...
@@ -39,6 +39,7 @@
#include "session.hpp"
#include "session.hpp"
#include "config.hpp"
#include "config.hpp"
#include "err.hpp"
#include "err.hpp"
#include "ip.hpp"
zmq
::
tcp_engine_t
::
tcp_engine_t
(
fd_t
fd_
,
const
options_t
&
options_
)
:
zmq
::
tcp_engine_t
::
tcp_engine_t
(
fd_t
fd_
,
const
options_t
&
options_
)
:
s
(
fd_
),
s
(
fd_
),
...
@@ -53,28 +54,12 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) :
...
@@ -53,28 +54,12 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) :
options
(
options_
),
options
(
options_
),
plugged
(
false
)
plugged
(
false
)
{
{
int
rc
;
// Get the socket into non-blocking mode.
unblock_socket
(
s
);
// Set the socket to the non-blocking mode.
#ifdef ZMQ_HAVE_WINDOWS
u_long
nonblock
=
1
;
rc
=
ioctlsocket
(
s
,
FIONBIO
,
&
nonblock
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#elif ZMQ_HAVE_OPENVMS
int
nonblock
=
1
;
rc
=
ioctl
(
s
,
FIONBIO
,
&
nonblock
);
errno_assert
(
rc
!=
-
1
);
#else
int
flags
=
fcntl
(
s
,
F_GETFL
,
0
);
if
(
flags
==
-
1
)
flags
=
0
;
rc
=
fcntl
(
s
,
F_SETFL
,
flags
|
O_NONBLOCK
);
errno_assert
(
rc
!=
-
1
);
#endif
// Set the socket buffer limits for the underlying socket.
// Set the socket buffer limits for the underlying socket.
if
(
options
.
sndbuf
)
{
if
(
options
.
sndbuf
)
{
rc
=
setsockopt
(
s
,
SOL_SOCKET
,
SO_SNDBUF
,
int
rc
=
setsockopt
(
s
,
SOL_SOCKET
,
SO_SNDBUF
,
(
char
*
)
&
options
.
sndbuf
,
sizeof
(
int
));
(
char
*
)
&
options
.
sndbuf
,
sizeof
(
int
));
#ifdef ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert
(
rc
!=
SOCKET_ERROR
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
...
@@ -83,7 +68,7 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) :
...
@@ -83,7 +68,7 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) :
#endif
#endif
}
}
if
(
options
.
rcvbuf
)
{
if
(
options
.
rcvbuf
)
{
rc
=
setsockopt
(
s
,
SOL_SOCKET
,
SO_RCVBUF
,
int
rc
=
setsockopt
(
s
,
SOL_SOCKET
,
SO_RCVBUF
,
(
char
*
)
&
options
.
rcvbuf
,
sizeof
(
int
));
(
char
*
)
&
options
.
rcvbuf
,
sizeof
(
int
));
#ifdef ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert
(
rc
!=
SOCKET_ERROR
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
...
@@ -96,7 +81,7 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) :
...
@@ -96,7 +81,7 @@ zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) :
// Make sure that SIGPIPE signal is not generated when writing to a
// Make sure that SIGPIPE signal is not generated when writing to a
// connection that was already closed by the peer.
// connection that was already closed by the peer.
int
set
=
1
;
int
set
=
1
;
rc
=
setsockopt
(
s
,
SOL_SOCKET
,
SO_NOSIGPIPE
,
&
set
,
sizeof
(
int
));
int
rc
=
setsockopt
(
s
,
SOL_SOCKET
,
SO_NOSIGPIPE
,
&
set
,
sizeof
(
int
));
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
#endif
#endif
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment