Commit f06ca69a authored by Martin Hurton's avatar Martin Hurton

Add support for SOCKS proxies

This is still raw and experimental.
To connect through a SOCKS proxy, set ZMQ_SOCKS_PROXY socket option on
socket before issuing a connect call, e.g.:

    zmq_setsockopt (s, ZMQ_SOCKS_PROXY,
        "127.0.0.1:22222", strlen ("127.0.0.1:22222"));
    zmq_connect (s, "tcp://127.0.0.1:5555");

Known limitations:
- only SOCKS version 5 supported
- authentication not supported
- new option is still undocumented
parent 8b801972
......@@ -302,6 +302,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_GSSAPI_PLAINTEXT 65
#define ZMQ_HANDSHAKE_IVL 66
#define ZMQ_IDENTITY_FD 67
#define ZMQ_SOCKS_PROXY 68
/* Message options */
#define ZMQ_MORE 1
......
......@@ -78,6 +78,8 @@ libzmq_la_SOURCES = \
session_base.hpp \
signaler.hpp \
socket_base.hpp \
socks.hpp \
socks_connecter.hpp \
stdint.hpp \
stream.hpp \
stream_engine.hpp \
......@@ -149,6 +151,8 @@ libzmq_la_SOURCES = \
session_base.cpp \
signaler.cpp \
socket_base.cpp \
socks.cpp \
socks_connecter.cpp \
stream.cpp \
stream_engine.cpp \
sub.cpp \
......
......@@ -206,6 +206,19 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
break;
case ZMQ_SOCKS_PROXY:
if (optval_ == NULL && optvallen_ == 0) {
socks_proxy_address.clear ();
return 0;
}
else
if (optval_ != NULL && optvallen_ > 0 ) {
socks_proxy_address =
std::string ((const char *) optval_, optvallen_);
return 0;
}
break;
case ZMQ_TCP_KEEPALIVE:
if (is_int && (value == -1 || value == 0 || value == 1)) {
tcp_keepalive = value;
......@@ -618,6 +631,14 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
}
break;
case ZMQ_SOCKS_PROXY:
if (*optvallen_ >= socks_proxy_address.size () + 1) {
memcpy (optval_, socks_proxy_address.c_str (), socks_proxy_address.size () + 1);
*optvallen_ = socks_proxy_address.size () + 1;
return 0;
}
break;
case ZMQ_TCP_KEEPALIVE:
if (is_int) {
*value = tcp_keepalive;
......
......@@ -114,6 +114,9 @@ namespace zmq
// if true, router socket accepts non-zmq tcp connections
bool raw_sock;
// Addres of SOCKS proxy
std::string socks_proxy_address;
// TCP keep-alive settings.
// Defaults to -1 = do not change socket options
int tcp_keepalive;
......
......@@ -25,6 +25,7 @@
#include "tcp_connecter.hpp"
#include "ipc_connecter.hpp"
#include "tipc_connecter.hpp"
#include "socks_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "address.hpp"
......@@ -497,10 +498,22 @@ void zmq::session_base_t::start_connecting (bool wait_)
// Create the connecter object.
if (addr->protocol == "tcp") {
tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (
io_thread, this, options, addr, wait_);
alloc_assert (connecter);
launch_child (connecter);
if (options.socks_proxy_address != "") {
address_t *proxy_address = new (std::nothrow)
address_t ("tcp", options.socks_proxy_address);
alloc_assert (proxy_address);
socks_connecter_t *connecter =
new (std::nothrow) socks_connecter_t (
io_thread, this, options, addr, proxy_address, wait_);
alloc_assert (connecter);
launch_child (connecter);
}
else {
tcp_connecter_t *connecter = new (std::nothrow)
tcp_connecter_t (io_thread, this, options, addr, wait_);
alloc_assert (connecter);
launch_child (connecter);
}
return;
}
......
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include "err.hpp"
#include "platform.hpp"
#include "socks.hpp"
#include "tcp.hpp"
zmq::socks_greeting_t::socks_greeting_t (uint8_t method_) :
num_methods (1)
{
methods [0] = method_;
}
zmq::socks_greeting_t::socks_greeting_t (
uint8_t *methods_, size_t num_methods_)
: num_methods (num_methods_)
{
zmq_assert (num_methods_ <= 255);
for (size_t i = 0; i < num_methods_; i++)
methods [i] = methods_ [i];
}
zmq::socks_greeting_encoder_t::socks_greeting_encoder_t ()
: bytes_encoded (0), bytes_written (0)
{}
void zmq::socks_greeting_encoder_t::encode (const socks_greeting_t &greeting_)
{
uint8_t *ptr = buf;
*ptr++ = 0x05;
*ptr++ = greeting_.num_methods;
for (size_t i = 0; i < greeting_.num_methods; i++)
*ptr++ = greeting_.methods [i];
bytes_encoded = 2 + greeting_.num_methods;
bytes_written = 0;
}
int zmq::socks_greeting_encoder_t::output (fd_t fd_)
{
const int rc = tcp_write (
fd_, buf + bytes_written, bytes_encoded - bytes_written);
if (rc > 0)
bytes_written += static_cast <size_t> (rc);
return rc;
}
bool zmq::socks_greeting_encoder_t::has_pending_data () const
{
return bytes_written < bytes_encoded;
}
void zmq::socks_greeting_encoder_t::reset ()
{
bytes_encoded = bytes_written = 0;
}
zmq::socks_choice_t::socks_choice_t (unsigned char method_)
: method (method_)
{}
zmq::socks_choice_decoder_t::socks_choice_decoder_t ()
: bytes_read (0)
{}
int zmq::socks_choice_decoder_t::input (fd_t fd_)
{
zmq_assert (bytes_read < 2);
const int rc = tcp_read (fd_, buf + bytes_read, 2 - bytes_read);
if (rc > 0) {
bytes_read += static_cast <size_t> (rc);
if (buf [0] != 0x05)
return -1;
}
return rc;
}
bool zmq::socks_choice_decoder_t::message_ready () const
{
return bytes_read == 2;
}
zmq::socks_choice_t zmq::socks_choice_decoder_t::decode ()
{
zmq_assert (message_ready ());
return socks_choice_t (buf [1]);
}
void zmq::socks_choice_decoder_t::reset ()
{
bytes_read = 0;
}
zmq::socks_request_t::socks_request_t (
uint8_t command_, std::string hostname_, uint16_t port_)
: command (command_), hostname (hostname_), port (port_)
{}
zmq::socks_request_encoder_t::socks_request_encoder_t ()
: bytes_encoded (0), bytes_written (0)
{}
void zmq::socks_request_encoder_t::encode (const socks_request_t &req)
{
unsigned char *ptr = buf;
*ptr++ = 0x05;
*ptr++ = req.command;
*ptr++ = 0x00;
#if defined ZMQ_HAVE_OPENVMS && defined __ia64 && __INITIAL_POINTER_SIZE == 64
__addrinfo64 hints, *res = NULL;
#else
addrinfo hints, *res = NULL;
#endif
memset (&hints, 0, sizeof hints);
// Suppress potential DNS lookups.
hints.ai_flags = AI_NUMERICHOST;
const int rc = getaddrinfo (req.hostname.c_str (), NULL, &hints, &res);
if (rc == 0 && res->ai_family == AF_INET) {
struct sockaddr_in *sockaddr_in =
reinterpret_cast <struct sockaddr_in *> (res->ai_addr);
*ptr++ = 0x01;
memcpy (ptr, &sockaddr_in->sin_addr, 4);
ptr += 4;
}
else
if (rc == 0 && res->ai_family == AF_INET6) {
struct sockaddr_in6 *sockaddr_in6 =
reinterpret_cast <struct sockaddr_in6 *> (res->ai_addr);
*ptr++ = 0x04;
memcpy (ptr, &sockaddr_in6->sin6_addr, 16);
ptr += 16;
}
else {
*ptr++ = 0x03;
*ptr++ = req.hostname.size ();
memcpy (ptr, req.hostname.c_str (), req.hostname.size ());
ptr += req.hostname.size ();
}
if (rc == 0)
freeaddrinfo (res);
*ptr++ = req.port / 256;
*ptr++ = req.port % 256;
bytes_encoded = ptr - buf;
bytes_written = 0;
}
int zmq::socks_request_encoder_t::output (fd_t fd_)
{
const int rc = tcp_write (
fd_, buf + bytes_written, bytes_encoded - bytes_written);
if (rc > 0)
bytes_written += static_cast <size_t> (rc);
return rc;
}
bool zmq::socks_request_encoder_t::has_pending_data () const
{
return bytes_written < bytes_encoded;
}
void zmq::socks_request_encoder_t::reset ()
{
bytes_encoded = bytes_written = 0;
}
zmq::socks_response_t::socks_response_t (
uint8_t response_code_, std::string address_, uint16_t port_)
: response_code (response_code_), address (address_), port (port_)
{}
zmq::socks_response_decoder_t::socks_response_decoder_t ()
: bytes_read (0)
{}
int zmq::socks_response_decoder_t::input (fd_t fd_)
{
size_t n = 0;
if (bytes_read < 5)
n = 5 - bytes_read;
else {
const uint8_t atyp = buf [3];
zmq_assert (atyp == 0x01 || atyp == 0x03 || atyp == 0x04);
if (atyp == 0x01)
n = 3 + 2;
else
if (atyp == 0x03)
n = buf [4] + 2;
else
if (atyp == 0x04)
n = 15 + 2;
}
const int rc = tcp_read (fd_, buf + bytes_read, n);
if (rc > 0) {
bytes_read += static_cast <size_t> (rc);
if (buf [0] != 0x05)
return -1;
if (bytes_read >= 2)
if (buf [1] > 0x08)
return -1;
if (bytes_read >= 3)
if (buf [2] != 0x00)
return -1;
if (bytes_read >= 4) {
const uint8_t atyp = buf [3];
if (atyp != 0x01 && atyp != 0x03 && atyp != 0x04)
return -1;
}
}
return rc;
}
bool zmq::socks_response_decoder_t::message_ready () const
{
if (bytes_read < 4)
return false;
else {
const uint8_t atyp = buf [3];
zmq_assert (atyp == 0x01 || atyp == 0x03 || atyp == 0x04);
if (atyp == 0x01)
return bytes_read == 10;
else
if (atyp == 0x03)
return bytes_read > 4 && bytes_read == 4 + 1 + buf [4] + 2u;
else
return bytes_read == 22;
}
}
zmq::socks_response_t zmq::socks_response_decoder_t::decode ()
{
zmq_assert (message_ready ());
return socks_response_t (buf [1], "", 0);
}
void zmq::socks_response_decoder_t::reset ()
{
bytes_read = 0;
}
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SOCKS_HPP_INCLUDED__
#define __ZMQ_SOCKS_HPP_INCLUDED__
#include <string>
#include "fd.hpp"
#include "stdint.hpp"
namespace zmq
{
struct socks_greeting_t
{
socks_greeting_t (uint8_t method);
socks_greeting_t (uint8_t *methods_, size_t num_methods_);
uint8_t methods [255];
const size_t num_methods;
};
class socks_greeting_encoder_t
{
public:
socks_greeting_encoder_t ();
void encode (const socks_greeting_t &greeting_);
int output (fd_t fd_);
bool has_pending_data () const;
void reset ();
private:
size_t bytes_encoded;
size_t bytes_written;
uint8_t buf [2 + 255];
};
struct socks_choice_t
{
socks_choice_t (uint8_t method_);
uint8_t method;
};
class socks_choice_decoder_t
{
public:
socks_choice_decoder_t ();
int input (fd_t fd_);
bool message_ready () const;
socks_choice_t decode ();
void reset ();
private:
unsigned char buf [2];
size_t bytes_read;
};
struct socks_request_t
{
socks_request_t (
uint8_t command_, std::string hostname_, uint16_t port_);
const uint8_t command;
const std::string hostname;
const uint16_t port;
};
class socks_request_encoder_t
{
public:
socks_request_encoder_t ();
void encode (const socks_request_t &req);
int output (fd_t fd_);
bool has_pending_data () const;
void reset ();
private:
size_t bytes_encoded;
size_t bytes_written;
uint8_t buf [4 + 256 + 2];
};
struct socks_response_t
{
socks_response_t (
uint8_t response_code_, std::string address_, uint16_t port_);
uint8_t response_code;
std::string address;
uint16_t port;
};
class socks_response_decoder_t
{
public:
socks_response_decoder_t ();
int input (fd_t fd_);
bool message_ready () const;
socks_response_t decode ();
void reset ();
private:
uint8_t buf [4 + 256 + 2];
size_t bytes_read;
};
}
#endif
This diff is collapsed.
/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __SOCKS_CONNECTER_HPP_INCLUDED__
#define __SOCKS_CONNECTER_HPP_INCLUDED__
#include "fd.hpp"
#include "io_object.hpp"
#include "own.hpp"
#include "stdint.hpp"
#include "../include/zmq.h"
#include "socks.hpp"
namespace zmq
{
class io_thread_t;
class session_base_t;
struct address_t;
class socks_connecter_t : public own_t, public io_object_t
{
public:
// If 'delayed_start' is true connecter first waits for a while,
// then starts connection process.
socks_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_, const options_t &options_,
address_t *addr_, address_t *proxy_addr_, bool delayed_start_);
~socks_connecter_t ();
private:
enum {
unplugged,
waiting_for_reconnect_time,
waiting_for_proxy_connection,
sending_greeting,
waiting_for_choice,
sending_request,
waiting_for_response
};
// ID of the timer used to delay the reconnection.
enum { reconnect_timer_id = 1 };
// Method ID
enum { socks_no_auth_required = 0 };
// Handlers for incoming commands.
virtual void process_plug ();
virtual void process_term (int linger_);
// Handlers for I/O events.
virtual void in_event ();
virtual void out_event ();
virtual void timer_event (int id_);
// Internal function to start the actual connection establishment.
void initiate_connect ();
int process_server_response (const socks_choice_t &response);
int process_server_response (const socks_response_t &response);
int parse_address (const std::string &address_,
std::string &hostname_, uint16_t &port_);
int connect_to_proxy ();
void error ();
// Internal function to start reconnect timer
void start_timer ();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
// Open TCP connecting socket. Returns -1 in case of error,
// 0 if connect was successfull immediately. Returns -1 with
// EAGAIN errno if async connect was launched.
int open ();
// Close the connecting socket.
void close ();
// Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessfull.
int check_proxy_connection ();
socks_greeting_encoder_t greeting_encoder;
socks_choice_decoder_t choice_decoder;
socks_request_encoder_t request_encoder;
socks_response_decoder_t response_decoder;
// Address to connect to. Owned by session_base_t.
address_t *addr;
address_t *proxy_addr;
int status;
// Underlying socket.
fd_t s;
// Handle corresponding to the listening socket.
handle_t handle;
// If true file descriptor is registered with the poller and 'handle'
// contains valid value.
bool handle_valid;
// If true, connecter is waiting a while before trying to connect.
const bool delayed_start;
// True iff a timer has been started.
bool timer_started;
// Reference to the session we belong to.
zmq::session_base_t *session;
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
// String representation of endpoint to connect to
std::string endpoint;
// Socket
zmq::socket_base_t *socket;
socks_connecter_t (const socks_connecter_t&);
const socks_connecter_t &operator = (const socks_connecter_t&);
};
}
#endif
......@@ -54,6 +54,7 @@
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#include "likely.hpp"
#include "wire.hpp"
......@@ -272,7 +273,7 @@ void zmq::stream_engine_t::in_event ()
size_t bufsize = 0;
decoder->get_buffer (&inpos, &bufsize);
int const rc = read (inpos, bufsize);
const int rc = tcp_read (s, inpos, bufsize);
if (rc == 0) {
error (connection_error);
return;
......@@ -359,7 +360,7 @@ void zmq::stream_engine_t::out_event ()
// arbitrarily large. However, we assume that underlying TCP layer has
// limited transmission buffer and thus the actual number of bytes
// written should be reasonably modest.
int nbytes = write (outpos, outsize);
const int nbytes = tcp_write (s, outpos, outsize);
// IO error has occurred. We stop waiting for output events.
// The engine is not terminated until we detect input error;
......@@ -448,8 +449,8 @@ bool zmq::stream_engine_t::handshake ()
zmq_assert (greeting_bytes_read < greeting_size);
// Receive the greeting.
while (greeting_bytes_read < greeting_size) {
const int n = read (greeting_recv + greeting_bytes_read,
greeting_size - greeting_bytes_read);
const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
greeting_size - greeting_bytes_read);
if (n == 0) {
error (connection_error);
return false;
......@@ -892,107 +893,6 @@ void zmq::stream_engine_t::error (error_reason_t reason)
delete this;
}
int zmq::stream_engine_t::write (const void *data_, size_t size_)
{
#ifdef ZMQ_HAVE_WINDOWS
int nbytes = send (s, (char*) data_, (int) size_, 0);
// If not a single byte can be written to the socket in non-blocking mode
// we'll get an error (this may happen during the speculative write).
if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
return 0;
// Signalise peer failure.
if (nbytes == SOCKET_ERROR && (
WSAGetLastError () == WSAENETDOWN ||
WSAGetLastError () == WSAENETRESET ||
WSAGetLastError () == WSAEHOSTUNREACH ||
WSAGetLastError () == WSAECONNABORTED ||
WSAGetLastError () == WSAETIMEDOUT ||
WSAGetLastError () == WSAECONNRESET))
return -1;
wsa_assert (nbytes != SOCKET_ERROR);
return nbytes;
#else
ssize_t nbytes = send (s, data_, size_, 0);
// Several errors are OK. When speculative write is being done we may not
// be able to write a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error.
if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR))
return 0;
// Signalise peer failure.
if (nbytes == -1) {
errno_assert (errno != EACCES
&& errno != EBADF
&& errno != EDESTADDRREQ
&& errno != EFAULT
&& errno != EINVAL
&& errno != EISCONN
&& errno != EMSGSIZE
&& errno != ENOMEM
&& errno != ENOTSOCK
&& errno != EOPNOTSUPP);
return -1;
}
return static_cast <int> (nbytes);
#endif
}
int zmq::stream_engine_t::read (void *data_, size_t size_)
{
#ifdef ZMQ_HAVE_WINDOWS
const int rc = recv (s, (char*) data_, (int) size_, 0);
// If not a single byte can be read from the socket in non-blocking mode
// we'll get an error (this may happen during the speculative read).
if (rc == SOCKET_ERROR) {
if (WSAGetLastError () == WSAEWOULDBLOCK)
errno = EAGAIN;
else {
wsa_assert (WSAGetLastError () == WSAENETDOWN
|| WSAGetLastError () == WSAENETRESET
|| WSAGetLastError () == WSAECONNABORTED
|| WSAGetLastError () == WSAETIMEDOUT
|| WSAGetLastError () == WSAECONNRESET
|| WSAGetLastError () == WSAECONNREFUSED
|| WSAGetLastError () == WSAENOTCONN);
errno = wsa_error_to_errno (WSAGetLastError ());
}
}
return rc == SOCKET_ERROR? -1: rc;
#else
const ssize_t rc = recv (s, data_, size_, 0);
// Several errors are OK. When speculative read is being done we may not
// be able to read a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error.
if (rc == -1) {
errno_assert (errno != EBADF
&& errno != EFAULT
&& errno != EINVAL
&& errno != ENOMEM
&& errno != ENOTSOCK);
if (errno == EWOULDBLOCK || errno == EINTR)
errno = EAGAIN;
}
return static_cast <int> (rc);
#endif
}
void zmq::stream_engine_t::set_handshake_timer ()
{
zmq_assert (!has_handshake_timer);
......
......@@ -92,16 +92,6 @@ namespace zmq
// Detects the protocol used by the peer.
bool handshake ();
// Writes data to the socket. Returns the number of bytes actually
// written (even zero is to be considered to be a success). In case
// of error or orderly shutdown by the other peer -1 is returned.
int write (const void *data_, size_t size_);
// Reads data from the socket (up to 'size' bytes).
// Returns the number of bytes actually read or -1 on error.
// Zero indicates the peer has closed the connection.
int read (void *data_, size_t size_);
int identity_msg (msg_t *msg_);
int process_identity_msg (msg_t *msg_);
......
......@@ -141,3 +141,104 @@ void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int
#endif // ZMQ_HAVE_SO_KEEPALIVE
#endif // ZMQ_HAVE_WINDOWS
}
int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
{
#ifdef ZMQ_HAVE_WINDOWS
int nbytes = send (s_, (char*) data_, (int) size_, 0);
// If not a single byte can be written to the socket in non-blocking mode
// we'll get an error (this may happen during the speculative write).
if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
return 0;
// Signalise peer failure.
if (nbytes == SOCKET_ERROR && (
WSAGetLastError () == WSAENETDOWN ||
WSAGetLastError () == WSAENETRESET ||
WSAGetLastError () == WSAEHOSTUNREACH ||
WSAGetLastError () == WSAECONNABORTED ||
WSAGetLastError () == WSAETIMEDOUT ||
WSAGetLastError () == WSAECONNRESET))
return -1;
wsa_assert (nbytes != SOCKET_ERROR);
return nbytes;
#else
ssize_t nbytes = send (s_, data_, size_, 0);
// Several errors are OK. When speculative write is being done we may not
// be able to write a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error.
if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
errno == EINTR))
return 0;
// Signalise peer failure.
if (nbytes == -1) {
errno_assert (errno != EACCES
&& errno != EBADF
&& errno != EDESTADDRREQ
&& errno != EFAULT
&& errno != EINVAL
&& errno != EISCONN
&& errno != EMSGSIZE
&& errno != ENOMEM
&& errno != ENOTSOCK
&& errno != EOPNOTSUPP);
return -1;
}
return static_cast <int> (nbytes);
#endif
}
int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
{
#ifdef ZMQ_HAVE_WINDOWS
const int rc = recv (s_, (char*) data_, (int) size_, 0);
// If not a single byte can be read from the socket in non-blocking mode
// we'll get an error (this may happen during the speculative read).
if (rc == SOCKET_ERROR) {
if (WSAGetLastError () == WSAEWOULDBLOCK)
errno = EAGAIN;
else {
wsa_assert (WSAGetLastError () == WSAENETDOWN
|| WSAGetLastError () == WSAENETRESET
|| WSAGetLastError () == WSAECONNABORTED
|| WSAGetLastError () == WSAETIMEDOUT
|| WSAGetLastError () == WSAECONNRESET
|| WSAGetLastError () == WSAECONNREFUSED
|| WSAGetLastError () == WSAENOTCONN);
errno = wsa_error_to_errno (WSAGetLastError ());
}
}
return rc == SOCKET_ERROR? -1: rc;
#else
const ssize_t rc = recv (s_, data_, size_, 0);
// Several errors are OK. When speculative read is being done we may not
// be able to read a single byte from the socket. Also, SIGSTOP issued
// by a debugging tool can result in EINTR error.
if (rc == -1) {
errno_assert (errno != EBADF
&& errno != EFAULT
&& errno != EINVAL
&& errno != ENOMEM
&& errno != ENOTSOCK);
if (errno == EWOULDBLOCK || errno == EINTR)
errno = EAGAIN;
}
return static_cast <int> (rc);
#endif
}
......@@ -37,6 +37,16 @@ namespace zmq
// Tunes TCP keep-alives
void tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_);
// Writes data to the socket. Returns the number of bytes actually
// written (even zero is to be considered to be a success). In case
// of error or orderly shutdown by the other peer -1 is returned.
int tcp_write (fd_t s_, const void *data_, size_t size_);
// Reads data from the socket (up to 'size' bytes).
// Returns the number of bytes actually read or -1 on error.
// Zero indicates the peer has closed the connection.
int tcp_read (fd_t s_, void *data_, size_t size_);
}
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment