Commit 2bb57ac5 authored by Martin Sustrik's avatar Martin Sustrik

ZMQII-39: Implement IPC transport

parent 56c36927
......@@ -7,7 +7,7 @@ dist_man_MANS = man1/zmq_forwarder.1 man1/zmq_streamer.1 man1/zmq_queue.1 \
man3/zmq_msg_data.3 man3/zmq_msg_size.3 man3/zmq_strerror.3 \
man7/zmq.7 man7/zmq_cpp.7 man7/zmq_python.7 man7/zmq_ruby.7 \
man7/zmq_cl.7 man7/zmq_tcp.7 man7/zmq_udp.7 man7/zmq_pgm.7 \
man7/zmq_inproc.7
man7/zmq_inproc.7 man7/zmq_ipc.7
distclean-local:
-rm *.pdf
......
......@@ -51,4 +51,5 @@ groff -man -Thtml man7/zmq_tcp.7 > man7/zmq_tcp.7.html
groff -man -Thtml man7/zmq_udp.7 > man7/zmq_udp.7.html
groff -man -Thtml man7/zmq_pgm.7 > man7/zmq_pgm.7.html
groff -man -Thtml man7/zmq_inproc.7 > man7/zmq_inproc.7.html
groff -man -Thtml man7/zmq_ipc.7 > man7/zmq_ipc.7.html
......@@ -83,4 +83,6 @@ groff -man -Tps man7/zmq_pgm.7 > man7/zmq_pgm.7.ps
ps2pdf man7/zmq_pgm.7.ps zmq_pgm.pdf
groff -man -Tps man7/zmq_inproc.7 > man7/zmq_inproc.7.ps
ps2pdf man7/zmq_inproc.7.ps zmq_inproc.pdf
groff -man -Tps man7/zmq_ipc.7 > man7/zmq_ipc.7.ps
ps2pdf man7/zmq_ipc.7.ps zmq_ipc.pdf
......@@ -112,6 +112,9 @@ UDP reliable multicast transport:
PGM reliable multicast transport:
.BR zmq_pgm(7)
Inter-process transport:
.BR zmq_ipc (7)
In-process (inter-thread) transport:
.BR zmq_inproc(7)
......
......@@ -3,7 +3,7 @@
In-process (inter-thread) tranport for 0MQ
.SH SYNOPSIS
In-process transport is optimised for passing messages betweem threads in the
In-process transport is optimised for passing messages between threads in the
same process.
Messages are passed directly from one application thread to
......@@ -32,6 +32,7 @@ wire format specification.
.SH "SEE ALSO"
.BR zmq_ipc (7)
.BR zmq_tcp (7)
.BR zmq_udp (7)
.BR zmq_pgm (7)
......
.TH zmq_ipc 7 "" "(c)2007-2010 iMatix Corporation" "0MQ User Manuals"
.SH NAME
Inter-process tranport for 0MQ
.SH SYNOPSIS
In-process transport is optimised for passing messages between processes on the
same physical machine.
.SH CONNECTION STRING
Connection string for inproc transport is "inproc://" followed by a file name.
The file will be used as placeholder for a message endpoint. (UNIX domain
sockets associate a file with the listening socket in a similar way.)
.nf
ipc:///tmp/my_ipc_endpoint
ipc:///tmp/prices.ipc
.fi
.SH WIRE FORMAT
IPC transport doesn't transfer messages across the network thus there is no need
for a wire format specification.
.SH "SEE ALSO"
.BR zmq_inproc (7)
.BR zmq_tcp (7)
.BR zmq_udp (7)
.BR zmq_pgm (7)
.SH AUTHOR
Martin Sustrik <sustrik at 250bpm dot com>
......@@ -80,6 +80,7 @@ Following example shows how messages are arranged in subsequent packets:
.BR zmq_udp (7)
.BR zmq_tcp (7)
.BR zmq_ipc (7)
.BR zmq_inproc (7)
.BR zmq_setsockopt (3)
......
......@@ -72,6 +72,7 @@ Binary layout of a larger message:
.BR zmq_udp (7)
.BR zmq_pgm (7)
.BR zmq_ipc (7)
.BR zmq_inproc (7)
.SH AUTHOR
......
......@@ -37,6 +37,7 @@ Same as with PGM transport except for UDP packet headers.
.BR zmq_pgm (7)
.BR zmq_tcp (7)
.BR zmq_ipc (7)
.BR zmq_inproc (7)
.SH AUTHOR
......
......@@ -309,3 +309,20 @@ int zmq::resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_)
return 0;
}
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
int zmq::resolve_local_path (sockaddr_un *addr_, const char *path_)
{
if (strlen (path_) >= sizeof (addr_->sun_path))
{
errno = ENAMETOOLONG;
return -1;
}
strcpy (addr_->sun_path, path_);
addr_->sun_family = AF_LOCAL;
return 0;
}
#endif
......@@ -32,6 +32,10 @@
#include <netdb.h>
#endif
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
#include <sys/un.h>
#endif
namespace zmq
{
......@@ -42,6 +46,11 @@ namespace zmq
// This function resolves a string in <hostname>:<port-number> format.
// Hostname can be either the name of the host or its IP address.
int resolve_ip_hostname (sockaddr_in *addr_, const char *hostname_);
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
// This function sets up the sockaddr_un structure with the pathname_
int resolve_local_path( sockaddr_un * addr_, const char* pathname_);
#endif
}
#endif
......@@ -87,11 +87,19 @@ int zmq::socket_base_t::bind (const char *addr_)
if (addr_type == "inproc")
return register_endpoint (addr_args.c_str (), this);
if (addr_type == "tcp") {
if (addr_type == "tcp" || addr_type == "ipc") {
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
if (addr_type == "ipc") {
errno = EPROTONOSUPPORT;
return -1;
}
#endif
zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
choose_io_thread (options.affinity), this, options);
zmq_assert (listener);
int rc = listener->set_address (addr_args.c_str ());
int rc = listener->set_address (addr_type.c_str(), addr_args.c_str ());
if (rc != 0)
return -1;
......@@ -202,7 +210,14 @@ int zmq::socket_base_t::connect (const char *addr_)
send_plug (session);
send_own (this, session);
if (addr_type == "tcp") {
if (addr_type == "tcp" || addr_type == "ipc") {
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
if (addr_type == "ipc") {
errno = EPROTONOSUPPORT;
return -1;
}
#endif
// Create the connecter object. Supply it with the session name
// so that it can bind the new connection to the session once
......@@ -211,7 +226,7 @@ int zmq::socket_base_t::connect (const char *addr_)
choose_io_thread (options.affinity), this, options,
session->get_ordinal (), false);
zmq_assert (connecter);
int rc = connecter->set_address (addr_args.c_str ());
int rc = connecter->set_address (addr_type.c_str(), addr_args.c_str ());
if (rc != 0) {
delete connecter;
return -1;
......
......@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include <string>
#include "tcp_connecter.hpp"
......@@ -38,10 +40,13 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()
close ();
}
int zmq::tcp_connecter_t::set_address (const char *addr_)
int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)
{
// Convert the hostname into sockaddr_in structure.
return resolve_ip_hostname (&addr, addr_);
if (strcmp (protocol_, "tcp") == 0)
return resolve_ip_hostname ((sockaddr_in*) &addr, addr_);
errno = EPROTONOSUPPORT;
return -1;
}
int zmq::tcp_connecter_t::open ()
......@@ -67,7 +72,7 @@ int zmq::tcp_connecter_t::open ()
wsa_assert (rc != SOCKET_ERROR);
// Connect to the remote peer.
rc = ::connect (s, (sockaddr*) &addr, sizeof addr);
rc = ::connect (s, (sockaddr*) &addr, sizeof (sockaddr_in));
// Connect was successfull immediately.
if (rc == 0)
......@@ -143,15 +148,23 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()
close ();
}
int zmq::tcp_connecter_t::set_address (const char *addr_)
int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)
{
// Convert the hostname into sockaddr_in structure.
return resolve_ip_hostname (&addr, addr_);
if (strcmp (protocol_, "tcp") == 0)
return resolve_ip_hostname ((struct sockaddr_in*)&addr, addr_);
else if (strcmp (protocol_, "ipc") == 0)
return resolve_local_path (( struct sockaddr_un*)&addr, addr_);
errno = EPROTONOSUPPORT;
return -1;
}
int zmq::tcp_connecter_t::open ()
{
zmq_assert (s == retired_fd);
struct sockaddr *sa = (struct sockaddr*) &addr;
if (AF_INET == sa->sa_family) {
// Create the socket.
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
......@@ -178,7 +191,7 @@ int zmq::tcp_connecter_t::open ()
#endif
// Connect to the remote peer.
rc = ::connect (s, (sockaddr*) &addr, sizeof (addr));
rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));
// Connect was successfull immediately.
if (rc == 0)
......@@ -195,6 +208,34 @@ int zmq::tcp_connecter_t::open ()
close ();
errno = err;
return -1;
}
else if (AF_LOCAL == sa->sa_family) {
s = socket (AF_LOCAL, SOCK_STREAM, 0);
if (s == -1)
return -1;
// Set the non-blocking flag.
int flag = fcntl (s, F_GETFL, 0);
if (flag == -1)
flag = 0;
int rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
errno_assert (rc != -1);
// Connect to the remote peer.
rc = ::connect (s, (struct sockaddr*) &addr, sizeof (sockaddr_un));
// Connect was successfull immediately.
if (rc == 0)
return 0;
// Error occured.
int err = errno;
close ();
errno = err;
return -1;
}
zmq_assert (false);
}
int zmq::tcp_connecter_t::close ()
......
......@@ -35,8 +35,8 @@ namespace zmq
tcp_connecter_t ();
~tcp_connecter_t ();
// Set IP address/port to connect to.
int set_address (const char *addr_);
// Set address to connect to.
int set_address (const char *protocol, const char *addr_);
// Open TCP connecting socket. Address is in
// <hostname>:<port-number> format. Returns -1 in case of error,
......
......@@ -39,10 +39,16 @@ zmq::tcp_listener_t::~tcp_listener_t ()
close ();
}
int zmq::tcp_listener_t::set_address (const char *addr_)
int zmq::tcp_listener_t::set_address (cosnt char *protocol_, const char *addr_)
{
// IPC protocol is not supported on Windows platform.
if (strcmp (protocol_, "tcp") != 0 ) {
errno = EPROTONOSUPPORT;
return -1;
}
// Convert the interface into sockaddr_in structure.
int rc = resolve_ip_interface (&addr, addr_);
int rc = resolve_ip_interface ((sockaddr_in*) &addr, addr_);
if (rc != 0)
return rc;
......@@ -65,7 +71,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
wsa_assert (rc != SOCKET_ERROR);
// Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));
if (rc == SOCKET_ERROR) {
wsa_error_to_errno ();
return -1;
......@@ -131,6 +137,7 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/un.h>
zmq::tcp_listener_t::tcp_listener_t () :
s (retired_fd)
......@@ -144,12 +151,14 @@ zmq::tcp_listener_t::~tcp_listener_t ()
close ();
}
int zmq::tcp_listener_t::set_address (const char *addr_)
int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_)
{
if (strcmp (protocol_, "tcp") == 0 ) {
// Convert the interface into sockaddr_in structure.
int rc = resolve_ip_interface (&addr, addr_);
int rc = resolve_ip_interface ((struct sockaddr_in*) &addr, addr_);
if (rc != 0)
return rc;
return -1;
// Create a listening socket.
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
......@@ -169,7 +178,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
errno_assert (rc != -1);
// Bind the socket to the network interface and port.
rc = bind (s, (struct sockaddr*) &addr, sizeof (addr));
rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_in));
if (rc != 0) {
close ();
return -1;
......@@ -183,6 +192,50 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
}
return 0;
}
else if (strcmp (protocol_, "ipc") == 0) {
// Get rid of the file associated with the UNIX domain socket that
// may have been left behind by the previous run of the application.
::unlink (addr_);
// Convert the address into sockaddr_un structure.
int rc = resolve_local_path ((struct sockaddr_un*) &addr, addr_);
if (rc != 0)
return -1;
// Create a listening socket.
s = socket (AF_LOCAL, SOCK_STREAM, 0);
if (s == -1)
return -1;
// Set the non-blocking flag.
int flag = fcntl (s, F_GETFL, 0);
if (flag == -1)
flag = 0;
rc = fcntl (s, F_SETFL, flag | O_NONBLOCK);
errno_assert (rc != -1);
// Bind the socket to the file path.
rc = bind (s, (struct sockaddr*) &addr, sizeof (sockaddr_un));
if (rc != 0) {
close ();
return -1;
}
// Listen for incomming connections.
rc = listen (s, tcp_connection_backlog);
if (rc != 0) {
close ();
return -1;
}
return 0;
}
else {
errno = EPROTONOSUPPORT;
return -1;
}
}
int zmq::tcp_listener_t::close ()
......@@ -192,6 +245,17 @@ int zmq::tcp_listener_t::close ()
if (rc != 0)
return -1;
s = retired_fd;
// If there's an underlying UNIX domain socket, get rid of the file it
// is associated with.
struct sockaddr *sa = (struct sockaddr*) &addr;
if (AF_LOCAL == sa->sa_family) {
struct sockaddr_un *sun = (struct sockaddr_un*) &addr;
rc = ::unlink(sun->sun_path);
if (rc != 0)
return -1;
}
return 0;
}
......@@ -239,6 +303,9 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
int rc = fcntl (sock, F_SETFL, flags | O_NONBLOCK);
errno_assert (rc != -1);
struct sockaddr *sa = (struct sockaddr*) &addr;
if (AF_INET == sa->sa_family) {
// Disable Nagle's algorithm.
int flag = 1;
rc = setsockopt (sock, IPPROTO_TCP, TCP_NODELAY, (char*) &flag,
......@@ -252,6 +319,7 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
sizeof (int));
errno_assert (rc != SOCKET_ERROR);
#endif
}
return sock;
}
......
......@@ -35,10 +35,8 @@ namespace zmq
tcp_listener_t ();
~tcp_listener_t ();
// Start listening on the interface. Address is in
// <interface-name>:<port-number> format. Interface name may be '*'
// to bind to all the interfaces.
int set_address (const char *addr_);
// Start listening on the interface.
int set_address (const char *protocol_, const char *addr_);
// Close the listening socket.
int close ();
......@@ -54,8 +52,8 @@ namespace zmq
private:
// IP address/port to listen on.
sockaddr_in addr;
// Address to listen on.
sockaddr_storage addr;
// Underlying socket.
fd_t s;
......
......@@ -41,11 +41,13 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()
{
}
int zmq::zmq_connecter_t::set_address (const char *address_)
int zmq::zmq_connecter_t::set_address (const char *protocol_,
const char *address_)
{
int rc = tcp_connecter.set_address (address_);
int rc = tcp_connecter.set_address (protocol_, address_);
if (rc != 0)
return rc;
protocol = protocol_;
address = address_;
return 0;
}
......@@ -91,7 +93,8 @@ void zmq::zmq_connecter_t::out_event ()
// Create an init object.
zmq_init_t *init = new (std::nothrow) zmq_init_t (
choose_io_thread (options.affinity), owner,
fd, options, true, address.c_str (), session_ordinal);
fd, options, true, protocol.c_str (), address.c_str (),
session_ordinal);
zmq_assert (init);
send_plug (init);
send_own (owner, init);
......@@ -128,7 +131,6 @@ void zmq::zmq_connecter_t::start_connecting ()
}
// Handle any other error condition by eventual reconnect.
tcp_connecter.close ();
wait = true;
add_timer ();
}
......@@ -39,8 +39,8 @@ namespace zmq
const options_t &options_, uint64_t session_ordinal_, bool wait_);
~zmq_connecter_t ();
// Set IP address to connect to.
int set_address (const char *address_);
// Set address to connect to.
int set_address (const char *protocol_, const char *address_);
private:
......@@ -75,7 +75,8 @@ namespace zmq
// Associated socket options.
options_t options;
// Address to connect to.
// Protocol and address to connect to.
std::string protocol;
std::string address;
zmq_connecter_t (const zmq_connecter_t&);
......
......@@ -27,7 +27,8 @@
#include "err.hpp"
zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
const options_t &options_, bool reconnect_, const char *address_) :
const options_t &options_, bool reconnect_,
const char *protocol_, const char *address_) :
io_object_t (parent_),
inpos (NULL),
insize (0),
......@@ -39,8 +40,10 @@ zmq::zmq_engine_t::zmq_engine_t (io_thread_t *parent_, fd_t fd_,
options (options_),
reconnect (reconnect_)
{
if (reconnect)
if (reconnect) {
protocol = protocol_;
address = address_;
}
// Initialise the underlying socket.
int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf);
......@@ -166,7 +169,7 @@ void zmq::zmq_engine_t::error ()
inout->get_io_thread (), inout->get_owner (),
options, inout->get_ordinal (), true);
zmq_assert (reconnecter);
reconnecter->set_address (address.c_str ());
reconnecter->set_address (protocol.c_str(), address.c_str ());
}
inout->detach (reconnecter);
......
......@@ -22,6 +22,8 @@
#include <stddef.h>
#include <string>
#include "i_engine.hpp"
#include "io_object.hpp"
#include "tcp_socket.hpp"
......@@ -37,7 +39,8 @@ namespace zmq
public:
zmq_engine_t (class io_thread_t *parent_, fd_t fd_,
const options_t &options_, bool reconnect_, const char *address_);
const options_t &options_, bool reconnect_,
const char *protocol_, const char *address_);
~zmq_engine_t ();
// i_engine interface implementation.
......@@ -70,6 +73,7 @@ namespace zmq
options_t options;
bool reconnect;
std::string protocol;
std::string address;
zmq_engine_t (const zmq_engine_t&);
......
......@@ -25,7 +25,7 @@
zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, const options_t &options_, bool reconnect_,
const char *address_, uint64_t session_ordinal_) :
const char *protocol_, const char *address_, uint64_t session_ordinal_) :
owned_t (parent_, owner_),
sent (false),
received (false),
......@@ -34,7 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *parent_, socket_base_t *owner_,
{
// Create the engine object for this connection.
engine = new (std::nothrow) zmq_engine_t (parent_, fd_, options,
reconnect_, address_);
reconnect_, protocol_, address_);
zmq_assert (engine);
}
......
......@@ -41,7 +41,8 @@ namespace zmq
zmq_init_t (class io_thread_t *parent_, socket_base_t *owner_,
fd_t fd_, const options_t &options_, bool reconnect_,
const char *address_, uint64_t session_ordinal_);
const char *protocol_, const char *address_,
uint64_t session_ordinal_);
~zmq_init_t ();
private:
......
......@@ -36,9 +36,9 @@ zmq::zmq_listener_t::~zmq_listener_t ()
{
}
int zmq::zmq_listener_t::set_address (const char *addr_)
int zmq::zmq_listener_t::set_address (const char *protocol_, const char *addr_)
{
return tcp_listener.set_address (addr_);
return tcp_listener.set_address (protocol_, addr_);
}
void zmq::zmq_listener_t::process_plug ()
......@@ -65,7 +65,7 @@ void zmq::zmq_listener_t::in_event ()
// Create an init object.
io_thread_t *io_thread = choose_io_thread (options.affinity);
zmq_init_t *init = new (std::nothrow) zmq_init_t (
io_thread, owner, fd, options, false, NULL, 0);
io_thread, owner, fd, options, false, NULL, NULL, 0);
zmq_assert (init);
send_plug (init);
send_own (owner, init);
......
......@@ -36,8 +36,8 @@ namespace zmq
zmq_listener_t (class io_thread_t *parent_, socket_base_t *owner_,
const options_t &options_);
// Set IP address to listen on.
int set_address (const char *addr_);
// Set address to listen on.
int set_address (const char* protocol_, const char *addr_);
private:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment