Commit bfbe556e authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #250 from gimaker/connect-assert

Resolve addresses in the calling thread on connect.
parents 2f44faa7 b9fb48f4
...@@ -6,6 +6,7 @@ pkgconfig_DATA = libzmq.pc ...@@ -6,6 +6,7 @@ pkgconfig_DATA = libzmq.pc
include_HEADERS = ../include/zmq.h ../include/zmq_utils.h include_HEADERS = ../include/zmq.h ../include/zmq_utils.h
libzmq_la_SOURCES = \ libzmq_la_SOURCES = \
address.hpp \
array.hpp \ array.hpp \
atomic_counter.hpp \ atomic_counter.hpp \
atomic_ptr.hpp \ atomic_ptr.hpp \
...@@ -76,6 +77,7 @@ libzmq_la_SOURCES = \ ...@@ -76,6 +77,7 @@ libzmq_la_SOURCES = \
xsub.hpp \ xsub.hpp \
ypipe.hpp \ ypipe.hpp \
yqueue.hpp \ yqueue.hpp \
address.cpp \
clock.cpp \ clock.cpp \
ctx.cpp \ ctx.cpp \
decoder.cpp \ decoder.cpp \
......
/*
Copyright (c) 2012 Spotify AB
Copyright (c) 2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "address.hpp"
#include "err.hpp"
#include "tcp_address.hpp"
#include "ipc_address.hpp"
#include <string.h>
zmq::address_t::address_t (
const std::string &protocol_, const std::string &address_)
: protocol (protocol_),
address (address_)
{
memset (&resolved, 0, sizeof (resolved));
}
zmq::address_t::~address_t ()
{
if (protocol == "tcp") {
if (resolved.tcp_addr) {
delete resolved.tcp_addr;
resolved.tcp_addr = 0;
}
}
else if (protocol == "ipc") {
if (resolved.ipc_addr) {
delete resolved.ipc_addr;
resolved.ipc_addr = 0;
}
}
}
/*
Copyright (c) 2012 Spotify AB
Copyright (c) 2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ADDRESS_HPP_INCLUDED__
#define __ZMQ_ADDRESS_HPP_INCLUDED__
#include <string>
namespace zmq
{
class tcp_address_t;
class ipc_address_t;
struct address_t {
address_t (const std::string &protocol_, const std::string &address_);
~address_t ();
const std::string protocol;
const std::string address;
// Protocol specific resolved address
union {
tcp_address_t *tcp_addr;
ipc_address_t *ipc_addr;
} resolved;
};
}
#endif
...@@ -47,12 +47,12 @@ int zmq::ipc_address_t::resolve (const char *path_) ...@@ -47,12 +47,12 @@ int zmq::ipc_address_t::resolve (const char *path_)
return 0; return 0;
} }
sockaddr *zmq::ipc_address_t::addr () const sockaddr *zmq::ipc_address_t::addr () const
{ {
return (sockaddr*) &address; return (sockaddr*) &address;
} }
socklen_t zmq::ipc_address_t::addrlen () socklen_t zmq::ipc_address_t::addrlen () const
{ {
return (socklen_t) sizeof (address); return (socklen_t) sizeof (address);
} }
......
...@@ -41,8 +41,8 @@ namespace zmq ...@@ -41,8 +41,8 @@ namespace zmq
// This function sets up the address for UNIX domain transport. // This function sets up the address for UNIX domain transport.
int resolve (const char* path_); int resolve (const char* path_);
sockaddr *addr (); const sockaddr *addr () const;
socklen_t addrlen (); socklen_t addrlen () const;
private: private:
......
...@@ -31,6 +31,8 @@ ...@@ -31,6 +31,8 @@
#include "random.hpp" #include "random.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
#include "address.hpp"
#include "ipc_address.hpp"
#include <unistd.h> #include <unistd.h>
#include <sys/types.h> #include <sys/types.h>
...@@ -39,19 +41,18 @@ ...@@ -39,19 +41,18 @@
zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_, const options_t &options_, class session_base_t *session_, const options_t &options_,
const char *address_, bool wait_) : const address_t *addr_, bool wait_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
io_object_t (io_thread_), io_object_t (io_thread_),
addr (addr_),
s (retired_fd), s (retired_fd),
handle_valid (false), handle_valid (false),
wait (wait_), wait (wait_),
session (session_), session (session_),
current_reconnect_ivl(options.reconnect_ivl) current_reconnect_ivl(options.reconnect_ivl)
{ {
// TODO: set_addess should be called separately, so that the error zmq_assert (addr);
// can be propagated. zmq_assert (addr->protocol == "ipc");
int rc = set_address (address_);
zmq_assert (rc == 0);
} }
zmq::ipc_connecter_t::~ipc_connecter_t () zmq::ipc_connecter_t::~ipc_connecter_t ()
...@@ -165,11 +166,6 @@ int zmq::ipc_connecter_t::get_new_reconnect_ivl () ...@@ -165,11 +166,6 @@ int zmq::ipc_connecter_t::get_new_reconnect_ivl ()
return this_interval; return this_interval;
} }
int zmq::ipc_connecter_t::set_address (const char *addr_)
{
return address.resolve (addr_);
}
int zmq::ipc_connecter_t::open () int zmq::ipc_connecter_t::open ()
{ {
zmq_assert (s == retired_fd); zmq_assert (s == retired_fd);
...@@ -183,7 +179,9 @@ int zmq::ipc_connecter_t::open () ...@@ -183,7 +179,9 @@ int zmq::ipc_connecter_t::open ()
unblock_socket (s); unblock_socket (s);
// Connect to the remote peer. // Connect to the remote peer.
int rc = ::connect (s, address.addr (), address.addrlen ()); int rc = ::connect (
s, addr->resolved.ipc_addr->addr (),
addr->resolved.ipc_addr->addrlen ());
// Connect was successfull immediately. // Connect was successfull immediately.
if (rc == 0) if (rc == 0)
......
...@@ -29,13 +29,13 @@ ...@@ -29,13 +29,13 @@
#include "own.hpp" #include "own.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "ipc_address.hpp"
namespace zmq namespace zmq
{ {
class io_thread_t; class io_thread_t;
class session_base_t; class session_base_t;
struct address_t;
class ipc_connecter_t : public own_t, public io_object_t class ipc_connecter_t : public own_t, public io_object_t
{ {
...@@ -45,7 +45,7 @@ namespace zmq ...@@ -45,7 +45,7 @@ namespace zmq
// connection process. // connection process.
ipc_connecter_t (zmq::io_thread_t *io_thread_, ipc_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_, const options_t &options_, zmq::session_base_t *session_, const options_t &options_,
const char *address_, bool delay_); const address_t *addr_, bool delay_);
~ipc_connecter_t (); ~ipc_connecter_t ();
private: private:
...@@ -72,9 +72,6 @@ namespace zmq ...@@ -72,9 +72,6 @@ namespace zmq
// Returns the currently used interval // Returns the currently used interval
int get_new_reconnect_ivl (); int get_new_reconnect_ivl ();
// Set address to connect to.
int set_address (const char *addr_);
// Open IPC connecting socket. Returns -1 in case of error, // Open IPC connecting socket. Returns -1 in case of error,
// 0 if connect was successfull immediately. Returns -1 with // 0 if connect was successfull immediately. Returns -1 with
// EAGAIN errno if async connect was launched. // EAGAIN errno if async connect was launched.
...@@ -87,8 +84,8 @@ namespace zmq ...@@ -87,8 +84,8 @@ namespace zmq
// retired_fd if the connection was unsuccessfull. // retired_fd if the connection was unsuccessfull.
fd_t connect (); fd_t connect ();
// Address to connect to. // Address to connect to. Owned by session_base_t.
ipc_address_t address; const address_t *addr;
// Underlying socket. // Underlying socket.
fd_t s; fd_t s;
......
...@@ -88,7 +88,7 @@ void zmq::ipc_listener_t::in_event () ...@@ -88,7 +88,7 @@ void zmq::ipc_listener_t::in_event ()
// Create and launch a session object. // Create and launch a session object.
session_base_t *session = session_base_t::create (io_thread, false, socket, session_base_t *session = session_base_t::create (io_thread, false, socket,
options, NULL, NULL); options, NULL);
errno_assert (session); errno_assert (session);
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
......
...@@ -119,9 +119,8 @@ bool zmq::pair_t::xhas_out () ...@@ -119,9 +119,8 @@ bool zmq::pair_t::xhas_out ()
zmq::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_, zmq::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const address_t *addr_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_, session_base_t (io_thread_, connect_, socket_, options_, addr_)
address_)
{ {
} }
......
...@@ -65,7 +65,7 @@ namespace zmq ...@@ -65,7 +65,7 @@ namespace zmq
pair_session_t (zmq::io_thread_t *io_thread_, bool connect_, pair_session_t (zmq::io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const address_t *addr_);
~pair_session_t (); ~pair_session_t ();
private: private:
......
...@@ -46,9 +46,8 @@ bool zmq::pub_t::xhas_in () ...@@ -46,9 +46,8 @@ bool zmq::pub_t::xhas_in ()
zmq::pub_session_t::pub_session_t (io_thread_t *io_thread_, bool connect_, zmq::pub_session_t::pub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const address_t *addr_) :
xpub_session_t (io_thread_, connect_, socket_, options_, protocol_, xpub_session_t (io_thread_, connect_, socket_, options_, addr_)
address_)
{ {
} }
......
...@@ -55,7 +55,7 @@ namespace zmq ...@@ -55,7 +55,7 @@ namespace zmq
pub_session_t (zmq::io_thread_t *io_thread_, bool connect_, pub_session_t (zmq::io_thread_t *io_thread_, bool connect_,
zmq::socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const address_t *addr_);
~pub_session_t (); ~pub_session_t ();
private: private:
......
...@@ -62,9 +62,8 @@ bool zmq::pull_t::xhas_in () ...@@ -62,9 +62,8 @@ bool zmq::pull_t::xhas_in ()
zmq::pull_session_t::pull_session_t (io_thread_t *io_thread_, bool connect_, zmq::pull_session_t::pull_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const address_t *addr_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_, session_base_t (io_thread_, connect_, socket_, options_, addr_)
address_)
{ {
} }
......
...@@ -67,7 +67,7 @@ namespace zmq ...@@ -67,7 +67,7 @@ namespace zmq
pull_session_t (zmq::io_thread_t *io_thread_, bool connect_, pull_session_t (zmq::io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const address_t *addr_);
~pull_session_t (); ~pull_session_t ();
private: private:
......
...@@ -62,9 +62,8 @@ bool zmq::push_t::xhas_out () ...@@ -62,9 +62,8 @@ bool zmq::push_t::xhas_out ()
zmq::push_session_t::push_session_t (io_thread_t *io_thread_, bool connect_, zmq::push_session_t::push_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const address_t *addr_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_, session_base_t (io_thread_, connect_, socket_, options_, addr_)
address_)
{ {
} }
......
...@@ -66,7 +66,7 @@ namespace zmq ...@@ -66,7 +66,7 @@ namespace zmq
push_session_t (zmq::io_thread_t *io_thread_, bool connect_, push_session_t (zmq::io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const address_t *addr_);
~push_session_t (); ~push_session_t ();
private: private:
......
...@@ -114,9 +114,8 @@ bool zmq::rep_t::xhas_out () ...@@ -114,9 +114,8 @@ bool zmq::rep_t::xhas_out ()
zmq::rep_session_t::rep_session_t (io_thread_t *io_thread_, bool connect_, zmq::rep_session_t::rep_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const address_t *addr_) :
xrep_session_t (io_thread_, connect_, socket_, options_, protocol_, xrep_session_t (io_thread_, connect_, socket_, options_, addr_)
address_)
{ {
} }
......
...@@ -66,7 +66,7 @@ namespace zmq ...@@ -66,7 +66,7 @@ namespace zmq
rep_session_t (zmq::io_thread_t *io_thread_, bool connect_, rep_session_t (zmq::io_thread_t *io_thread_, bool connect_,
zmq::socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const address_t *addr_);
~rep_session_t (); ~rep_session_t ();
private: private:
......
...@@ -139,9 +139,8 @@ bool zmq::req_t::xhas_out () ...@@ -139,9 +139,8 @@ bool zmq::req_t::xhas_out ()
zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const address_t *addr_) :
xreq_session_t (io_thread_, connect_, socket_, options_, protocol_, xreq_session_t (io_thread_, connect_, socket_, options_, addr_),
address_),
state (identity) state (identity)
{ {
} }
......
...@@ -67,7 +67,7 @@ namespace zmq ...@@ -67,7 +67,7 @@ namespace zmq
req_session_t (zmq::io_thread_t *io_thread_, bool connect_, req_session_t (zmq::io_thread_t *io_thread_, bool connect_,
zmq::socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const address_t *addr_);
~req_session_t (); ~req_session_t ();
// Overloads of the functions from session_base_t. // Overloads of the functions from session_base_t.
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include "ipc_connecter.hpp" #include "ipc_connecter.hpp"
#include "pgm_sender.hpp" #include "pgm_sender.hpp"
#include "pgm_receiver.hpp" #include "pgm_receiver.hpp"
#include "address.hpp"
#include "req.hpp" #include "req.hpp"
#include "xreq.hpp" #include "xreq.hpp"
...@@ -45,52 +46,52 @@ ...@@ -45,52 +46,52 @@
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_, bool connect_, class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) const address_t *addr_)
{ {
session_base_t *s = NULL; session_base_t *s = NULL;
switch (options_.type) { switch (options_.type) {
case ZMQ_REQ: case ZMQ_REQ:
s = new (std::nothrow) req_session_t (io_thread_, connect_, s = new (std::nothrow) req_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_); socket_, options_, addr_);
break; break;
case ZMQ_XREQ: case ZMQ_XREQ:
s = new (std::nothrow) xreq_session_t (io_thread_, connect_, s = new (std::nothrow) xreq_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_); socket_, options_, addr_);
case ZMQ_REP: case ZMQ_REP:
s = new (std::nothrow) rep_session_t (io_thread_, connect_, s = new (std::nothrow) rep_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_); socket_, options_, addr_);
break; break;
case ZMQ_XREP: case ZMQ_XREP:
s = new (std::nothrow) xrep_session_t (io_thread_, connect_, s = new (std::nothrow) xrep_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_); socket_, options_, addr_);
break; break;
case ZMQ_PUB: case ZMQ_PUB:
s = new (std::nothrow) pub_session_t (io_thread_, connect_, s = new (std::nothrow) pub_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_); socket_, options_, addr_);
break; break;
case ZMQ_XPUB: case ZMQ_XPUB:
s = new (std::nothrow) xpub_session_t (io_thread_, connect_, s = new (std::nothrow) xpub_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_); socket_, options_, addr_);
break; break;
case ZMQ_SUB: case ZMQ_SUB:
s = new (std::nothrow) sub_session_t (io_thread_, connect_, s = new (std::nothrow) sub_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_); socket_, options_, addr_);
break; break;
case ZMQ_XSUB: case ZMQ_XSUB:
s = new (std::nothrow) xsub_session_t (io_thread_, connect_, s = new (std::nothrow) xsub_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_); socket_, options_, addr_);
break; break;
case ZMQ_PUSH: case ZMQ_PUSH:
s = new (std::nothrow) push_session_t (io_thread_, connect_, s = new (std::nothrow) push_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_); socket_, options_, addr_);
break; break;
case ZMQ_PULL: case ZMQ_PULL:
s = new (std::nothrow) pull_session_t (io_thread_, connect_, s = new (std::nothrow) pull_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_); socket_, options_, addr_);
break; break;
case ZMQ_PAIR: case ZMQ_PAIR:
s = new (std::nothrow) pair_session_t (io_thread_, connect_, s = new (std::nothrow) pair_session_t (io_thread_, connect_,
socket_, options_, protocol_, address_); socket_, options_, addr_);
break; break;
default: default:
errno = EINVAL; errno = EINVAL;
...@@ -102,7 +103,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, ...@@ -102,7 +103,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, const options_t &options_, bool connect_, class socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const address_t *addr_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
io_object_t (io_thread_), io_object_t (io_thread_),
connect (connect_), connect (connect_),
...@@ -114,12 +115,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, ...@@ -114,12 +115,9 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
io_thread (io_thread_), io_thread (io_thread_),
has_linger_timer (false), has_linger_timer (false),
send_identity (options_.send_identity), send_identity (options_.send_identity),
recv_identity (options_.recv_identity) recv_identity (options_.recv_identity),
addr (addr_)
{ {
if (protocol_)
protocol = protocol_;
if (address_)
address = address_;
} }
zmq::session_base_t::~session_base_t () zmq::session_base_t::~session_base_t ()
...@@ -135,6 +133,9 @@ zmq::session_base_t::~session_base_t () ...@@ -135,6 +133,9 @@ zmq::session_base_t::~session_base_t ()
// Close the engine. // Close the engine.
if (engine) if (engine)
engine->terminate (); engine->terminate ();
if (addr)
delete addr;
} }
void zmq::session_base_t::attach_pipe (pipe_t *pipe_) void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
...@@ -393,18 +394,18 @@ void zmq::session_base_t::start_connecting (bool wait_) ...@@ -393,18 +394,18 @@ void zmq::session_base_t::start_connecting (bool wait_)
// Create the connecter object. // Create the connecter object.
if (protocol == "tcp") { if (addr->protocol == "tcp") {
tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t ( tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (
io_thread, this, options, address.c_str (), wait_); io_thread, this, options, addr, wait_);
alloc_assert (connecter); alloc_assert (connecter);
launch_child (connecter); launch_child (connecter);
return; return;
} }
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
if (protocol == "ipc") { if (addr->protocol == "ipc") {
ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t ( ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
io_thread, this, options, address.c_str (), wait_); io_thread, this, options, addr, wait_);
alloc_assert (connecter); alloc_assert (connecter);
launch_child (connecter); launch_child (connecter);
return; return;
...@@ -414,10 +415,10 @@ void zmq::session_base_t::start_connecting (bool wait_) ...@@ -414,10 +415,10 @@ void zmq::session_base_t::start_connecting (bool wait_)
#if defined ZMQ_HAVE_OPENPGM #if defined ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure. // Both PGM and EPGM transports are using the same infrastructure.
if (protocol == "pgm" || protocol == "epgm") { if (addr->protocol == "pgm" || addr->protocol == "epgm") {
// For EPGM transport with UDP encapsulation of PGM is used. // For EPGM transport with UDP encapsulation of PGM is used.
bool udp_encapsulation = (protocol == "epgm"); bool udp_encapsulation = (addr->protocol == "epgm");
// At this point we'll create message pipes to the session straight // At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect' // away. There's no point in delaying it as no concept of 'connect'
...@@ -429,7 +430,7 @@ void zmq::session_base_t::start_connecting (bool wait_) ...@@ -429,7 +430,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
io_thread, options); io_thread, options);
alloc_assert (pgm_sender); alloc_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
zmq_assert (rc == 0); zmq_assert (rc == 0);
send_attach (this, pgm_sender); send_attach (this, pgm_sender);
...@@ -441,7 +442,7 @@ void zmq::session_base_t::start_connecting (bool wait_) ...@@ -441,7 +442,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
io_thread, options); io_thread, options);
alloc_assert (pgm_receiver); alloc_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
zmq_assert (rc == 0); zmq_assert (rc == 0);
send_attach (this, pgm_receiver); send_attach (this, pgm_receiver);
......
...@@ -36,6 +36,7 @@ namespace zmq ...@@ -36,6 +36,7 @@ namespace zmq
class io_thread_t; class io_thread_t;
class socket_base_t; class socket_base_t;
struct i_engine; struct i_engine;
struct address_t;
class session_base_t : class session_base_t :
public own_t, public own_t,
...@@ -47,8 +48,7 @@ namespace zmq ...@@ -47,8 +48,7 @@ namespace zmq
// Create a session of the particular type. // Create a session of the particular type.
static session_base_t *create (zmq::io_thread_t *io_thread_, static session_base_t *create (zmq::io_thread_t *io_thread_,
bool connect_, zmq::socket_base_t *socket_, bool connect_, zmq::socket_base_t *socket_,
const options_t &options_, const char *protocol_, const options_t &options_, const address_t *addr_);
const char *address_);
// To be used once only, when creating the session. // To be used once only, when creating the session.
void attach_pipe (zmq::pipe_t *pipe_); void attach_pipe (zmq::pipe_t *pipe_);
...@@ -69,8 +69,8 @@ namespace zmq ...@@ -69,8 +69,8 @@ namespace zmq
session_base_t (zmq::io_thread_t *io_thread_, bool connect_, session_base_t (zmq::io_thread_t *io_thread_, bool connect_,
zmq::socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const address_t *addr_);
~session_base_t (); virtual ~session_base_t ();
private: private:
...@@ -129,8 +129,7 @@ namespace zmq ...@@ -129,8 +129,7 @@ namespace zmq
bool recv_identity; bool recv_identity;
// Protocol and address to use when connecting. // Protocol and address to use when connecting.
std::string protocol; const address_t *addr;
std::string address;
session_base_t (const session_base_t&); session_base_t (const session_base_t&);
const session_base_t &operator = (const session_base_t&); const session_base_t &operator = (const session_base_t&);
......
...@@ -49,6 +49,9 @@ ...@@ -49,6 +49,9 @@
#include "platform.hpp" #include "platform.hpp"
#include "likely.hpp" #include "likely.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "address.hpp"
#include "ipc_address.hpp"
#include "tcp_address.hpp"
#include "pair.hpp" #include "pair.hpp"
#include "pub.hpp" #include "pub.hpp"
...@@ -447,9 +450,33 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -447,9 +450,33 @@ int zmq::socket_base_t::connect (const char *addr_)
return -1; return -1;
} }
address_t *paddr = new (std::nothrow) address_t (protocol, address);
zmq_assert (paddr);
// Resolve address (if needed by the protocol)
if (protocol == "tcp") {
paddr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
zmq_assert (paddr->resolved.tcp_addr);
int rc = paddr->resolved.tcp_addr->resolve (
address.c_str (), false, options.ipv4only ? true : false);
if (rc != 0) {
delete paddr;
return -1;
}
}
else if(protocol == "ipc") {
paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
zmq_assert (paddr->resolved.ipc_addr);
int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
if (rc != 0) {
delete paddr;
return -1;
}
}
// Create session. // Create session.
session_base_t *session = session_base_t::create (io_thread, true, this, session_base_t *session = session_base_t::create (io_thread, true, this,
options, protocol.c_str (), address.c_str ()); options, paddr);
errno_assert (session); errno_assert (session);
// Create a bi-directional pipe. // Create a bi-directional pipe.
......
...@@ -82,9 +82,8 @@ bool zmq::sub_t::xhas_out () ...@@ -82,9 +82,8 @@ bool zmq::sub_t::xhas_out ()
zmq::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_, zmq::sub_session_t::sub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const address_t *addr_) :
xsub_session_t (io_thread_, connect_, socket_, options_, protocol_, xsub_session_t (io_thread_, connect_, socket_, options_, addr_)
address_)
{ {
} }
......
...@@ -57,7 +57,7 @@ namespace zmq ...@@ -57,7 +57,7 @@ namespace zmq
sub_session_t (zmq::io_thread_t *io_thread_, bool connect_, sub_session_t (zmq::io_thread_t *io_thread_, bool connect_,
zmq::socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const address_t *addr_);
~sub_session_t (); ~sub_session_t ();
private: private:
......
...@@ -419,12 +419,12 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv4only_) ...@@ -419,12 +419,12 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv4only_)
return 0; return 0;
} }
sockaddr *zmq::tcp_address_t::addr () const sockaddr *zmq::tcp_address_t::addr () const
{ {
return &address.generic; return &address.generic;
} }
socklen_t zmq::tcp_address_t::addrlen () socklen_t zmq::tcp_address_t::addrlen () const
{ {
if (address.generic.sa_family == AF_INET6) if (address.generic.sa_family == AF_INET6)
return (socklen_t) sizeof (address.ipv6); return (socklen_t) sizeof (address.ipv6);
...@@ -433,9 +433,9 @@ socklen_t zmq::tcp_address_t::addrlen () ...@@ -433,9 +433,9 @@ socklen_t zmq::tcp_address_t::addrlen ()
} }
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
unsigned short zmq::tcp_address_t::family () unsigned short zmq::tcp_address_t::family () const
#else #else
sa_family_t zmq::tcp_address_t::family () sa_family_t zmq::tcp_address_t::family () const
#endif #endif
{ {
return address.generic.sa_family; return address.generic.sa_family;
......
...@@ -48,12 +48,12 @@ namespace zmq ...@@ -48,12 +48,12 @@ namespace zmq
int resolve (const char* name_, bool local_, bool ipv4only_); int resolve (const char* name_, bool local_, bool ipv4only_);
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
unsigned short family (); unsigned short family () const;
#else #else
sa_family_t family (); sa_family_t family () const;
#endif #endif
sockaddr *addr (); const sockaddr *addr () const;
socklen_t addrlen (); socklen_t addrlen () const;
private: private:
......
...@@ -29,6 +29,8 @@ ...@@ -29,6 +29,8 @@
#include "random.hpp" #include "random.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
#include "address.hpp"
#include "tcp_address.hpp"
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp" #include "windows.hpp"
...@@ -48,19 +50,18 @@ ...@@ -48,19 +50,18 @@
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_, const options_t &options_, class session_base_t *session_, const options_t &options_,
const char *address_, bool wait_) : const address_t *addr_, bool wait_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
io_object_t (io_thread_), io_object_t (io_thread_),
addr (addr_),
s (retired_fd), s (retired_fd),
handle_valid (false), handle_valid (false),
wait (wait_), wait (wait_),
session (session_), session (session_),
current_reconnect_ivl(options.reconnect_ivl) current_reconnect_ivl(options.reconnect_ivl)
{ {
// TODO: set_addess should be called separately, so that the error zmq_assert (addr);
// can be propagated. zmq_assert (addr->protocol == "tcp");
int rc = set_address (address_);
errno_assert (rc == 0);
} }
zmq::tcp_connecter_t::~tcp_connecter_t () zmq::tcp_connecter_t::~tcp_connecter_t ()
...@@ -176,17 +177,12 @@ int zmq::tcp_connecter_t::get_new_reconnect_ivl () ...@@ -176,17 +177,12 @@ int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
return this_interval; return this_interval;
} }
int zmq::tcp_connecter_t::set_address (const char *addr_)
{
return address.resolve (addr_, false, options.ipv4only ? true : false);
}
int zmq::tcp_connecter_t::open () int zmq::tcp_connecter_t::open ()
{ {
zmq_assert (s == retired_fd); zmq_assert (s == retired_fd);
// Create the socket. // Create the socket.
s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP); s = open_socket (addr->resolved.tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (s == INVALID_SOCKET) { if (s == INVALID_SOCKET) {
wsa_error_to_errno (); wsa_error_to_errno ();
...@@ -199,14 +195,16 @@ int zmq::tcp_connecter_t::open () ...@@ -199,14 +195,16 @@ int zmq::tcp_connecter_t::open ()
// On some systems, IPv4 mapping in IPv6 sockets is disabled by default. // On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
// Switch it on in such cases. // Switch it on in such cases.
if (address.family () == AF_INET6) if (addr->resolved.tcp_addr->family () == AF_INET6)
enable_ipv4_mapping (s); enable_ipv4_mapping (s);
// Set the socket to non-blocking mode so that we get async connect(). // Set the socket to non-blocking mode so that we get async connect().
unblock_socket (s); unblock_socket (s);
// Connect to the remote peer. // Connect to the remote peer.
int rc = ::connect (s, address.addr (), address.addrlen ()); int rc = ::connect (
s, addr->resolved.tcp_addr->addr (),
addr->resolved.tcp_addr->addrlen ());
// Connect was successfull immediately. // Connect was successfull immediately.
if (rc == 0) if (rc == 0)
......
...@@ -26,13 +26,13 @@ ...@@ -26,13 +26,13 @@
#include "own.hpp" #include "own.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "tcp_address.hpp"
namespace zmq namespace zmq
{ {
class io_thread_t; class io_thread_t;
class session_base_t; class session_base_t;
struct address_t;
class tcp_connecter_t : public own_t, public io_object_t class tcp_connecter_t : public own_t, public io_object_t
{ {
...@@ -42,7 +42,7 @@ namespace zmq ...@@ -42,7 +42,7 @@ namespace zmq
// connection process. // connection process.
tcp_connecter_t (zmq::io_thread_t *io_thread_, tcp_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_, const options_t &options_, zmq::session_base_t *session_, const options_t &options_,
const char *address_, bool delay_); const address_t *addr_, bool delay_);
~tcp_connecter_t (); ~tcp_connecter_t ();
private: private:
...@@ -69,9 +69,6 @@ namespace zmq ...@@ -69,9 +69,6 @@ namespace zmq
// Returns the currently used interval // Returns the currently used interval
int get_new_reconnect_ivl (); int get_new_reconnect_ivl ();
// Set address to connect to.
int set_address (const char *addr_);
// Open TCP connecting socket. Returns -1 in case of error, // Open TCP connecting socket. Returns -1 in case of error,
// 0 if connect was successfull immediately. Returns -1 with // 0 if connect was successfull immediately. Returns -1 with
// EAGAIN errno if async connect was launched. // EAGAIN errno if async connect was launched.
...@@ -84,8 +81,8 @@ namespace zmq ...@@ -84,8 +81,8 @@ namespace zmq
// retired_fd if the connection was unsuccessfull. // retired_fd if the connection was unsuccessfull.
fd_t connect (); fd_t connect ();
// Address to connect to. // Address to connect to. Owned by session_base_t.
tcp_address_t address; const address_t *addr;
// Underlying socket. // Underlying socket.
fd_t s; fd_t s;
......
...@@ -100,7 +100,7 @@ void zmq::tcp_listener_t::in_event () ...@@ -100,7 +100,7 @@ void zmq::tcp_listener_t::in_event ()
// Create and launch a session object. // Create and launch a session object.
session_base_t *session = session_base_t::create (io_thread, false, socket, session_base_t *session = session_base_t::create (io_thread, false, socket,
options, NULL, NULL); options, NULL);
errno_assert (session); errno_assert (session);
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
......
...@@ -176,9 +176,8 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, ...@@ -176,9 +176,8 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
zmq::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_, zmq::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const address_t *addr_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_, session_base_t (io_thread_, connect_, socket_, options_, addr_)
address_)
{ {
} }
......
...@@ -91,7 +91,7 @@ namespace zmq ...@@ -91,7 +91,7 @@ namespace zmq
xpub_session_t (zmq::io_thread_t *io_thread_, bool connect_, xpub_session_t (zmq::io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const address_t *addr_);
~xpub_session_t (); ~xpub_session_t ();
private: private:
......
...@@ -309,9 +309,8 @@ bool zmq::xrep_t::xhas_out () ...@@ -309,9 +309,8 @@ bool zmq::xrep_t::xhas_out ()
zmq::xrep_session_t::xrep_session_t (io_thread_t *io_thread_, bool connect_, zmq::xrep_session_t::xrep_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const address_t *addr_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_, session_base_t (io_thread_, connect_, socket_, options_, addr_)
address_)
{ {
} }
......
...@@ -110,7 +110,7 @@ namespace zmq ...@@ -110,7 +110,7 @@ namespace zmq
xrep_session_t (zmq::io_thread_t *io_thread_, bool connect_, xrep_session_t (zmq::io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const address_t *addr_);
~xrep_session_t (); ~xrep_session_t ();
private: private:
......
...@@ -117,9 +117,8 @@ void zmq::xreq_t::xterminated (pipe_t *pipe_) ...@@ -117,9 +117,8 @@ void zmq::xreq_t::xterminated (pipe_t *pipe_)
zmq::xreq_session_t::xreq_session_t (io_thread_t *io_thread_, bool connect_, zmq::xreq_session_t::xreq_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const address_t *addr_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_, session_base_t (io_thread_, connect_, socket_, options_, addr_)
address_)
{ {
} }
......
...@@ -78,7 +78,7 @@ namespace zmq ...@@ -78,7 +78,7 @@ namespace zmq
xreq_session_t (zmq::io_thread_t *io_thread_, bool connect_, xreq_session_t (zmq::io_thread_t *io_thread_, bool connect_,
zmq::socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const address_t *addr_);
~xreq_session_t (); ~xreq_session_t ();
private: private:
......
...@@ -220,9 +220,8 @@ void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_, ...@@ -220,9 +220,8 @@ void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_,
zmq::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_, zmq::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_) : const address_t *addr_) :
session_base_t (io_thread_, connect_, socket_, options_, protocol_, session_base_t (io_thread_, connect_, socket_, options_, addr_)
address_)
{ {
} }
......
...@@ -93,7 +93,7 @@ namespace zmq ...@@ -93,7 +93,7 @@ namespace zmq
xsub_session_t (class io_thread_t *io_thread_, bool connect_, xsub_session_t (class io_thread_t *io_thread_, bool connect_,
socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const address_t *addr_);
~xsub_session_t (); ~xsub_session_t ();
private: private:
......
...@@ -11,7 +11,8 @@ noinst_PROGRAMS = test_pair_inproc \ ...@@ -11,7 +11,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_reqrep_device \ test_reqrep_device \
test_sub_forward \ test_sub_forward \
test_invalid_rep \ test_invalid_rep \
test_msg_flags test_msg_flags \
test_connect_resolve
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
...@@ -30,6 +31,7 @@ test_reqrep_device_SOURCES = test_reqrep_device.cpp ...@@ -30,6 +31,7 @@ test_reqrep_device_SOURCES = test_reqrep_device.cpp
test_sub_forward_SOURCES = test_sub_forward.cpp test_sub_forward_SOURCES = test_sub_forward.cpp
test_invalid_rep_SOURCES = test_invalid_rep.cpp test_invalid_rep_SOURCES = test_invalid_rep.cpp
test_msg_flags_SOURCES = test_msg_flags.cpp test_msg_flags_SOURCES = test_msg_flags.cpp
test_connect_resolve_SOURCES = test_connect_resolve.cpp
if !ON_MINGW if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
......
/*
Copyright (c) 2012 Spotify AB
Copyright (c) 2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include "../include/zmq.h"
int main (int argc, char *argv [])
{
fprintf (stderr, "test_connect_resolve running...\n");
void *ctx = zmq_init (1);
assert (ctx);
// Create pair of socket, each with high watermark of 2. Thus the total
// buffer space should be 4 messages.
void *sock = zmq_socket (ctx, ZMQ_PUB);
assert (sock);
int rc = zmq_connect (sock, "tcp://localhost:1234");
assert (rc == 0);
rc = zmq_connect (sock, "tcp://foobar123xyz:1234");
assert (rc == -1);
assert (errno == EINVAL);
rc = zmq_close (sock);
assert (rc == 0);
rc = zmq_term (ctx);
assert (rc == 0);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment