Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
626099aa
Commit
626099aa
authored
Oct 31, 2011
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
VTCP transport removed
Signed-off-by:
Martin Sustrik
<
sustrik@250bpm.com
>
parent
ac7717b7
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
1 addition
and
626 deletions
+1
-626
configure.in
configure.in
+0
-13
Makefile.am
src/Makefile.am
+0
-4
session_base.cpp
src/session_base.cpp
+0
-13
socket_base.cpp
src/socket_base.cpp
+1
-26
vtcp_connecter.cpp
src/vtcp_connecter.cpp
+0
-252
vtcp_connecter.hpp
src/vtcp_connecter.hpp
+0
-121
vtcp_listener.cpp
src/vtcp_listener.cpp
+0
-125
vtcp_listener.hpp
src/vtcp_listener.hpp
+0
-72
No files found.
configure.in
View file @
626099aa
...
...
@@ -348,19 +348,6 @@ fi
AC_SUBST(pgm_basename)
# VTCP extension
libzmq_vtcp="no"
AC_ARG_WITH([vtcp], [AS_HELP_STRING([--with-vtcp],
[build libzmq with VTCP extension [default=no]])],
[with_vtcp=$withval], [with_vtcp=no])
if test "x$with_vtcp" != "xno"; then
AC_DEFINE(ZMQ_HAVE_VTCP, 1, [Have VTCP extension])
AC_CHECK_LIB(vtcp, vtcp_bind, ,
[AC_MSG_ERROR([cannot link with -lvtcp, install libvtcp.])])
fi
# Set -Wall, -Werror and -pedantic
AC_LANG_PUSH([C++])
...
...
src/Makefile.am
View file @
626099aa
...
...
@@ -67,8 +67,6 @@ libzmq_la_SOURCES = \
tcp_listener.hpp
\
thread.hpp
\
trie.hpp
\
vtcp_connecter.hpp
\
vtcp_listener.hpp
\
windows.hpp
\
wire.hpp
\
xpub.hpp
\
...
...
@@ -125,8 +123,6 @@ libzmq_la_SOURCES = \
tcp_listener.cpp
\
thread.cpp
\
trie.cpp
\
vtcp_connecter.cpp
\
vtcp_listener.cpp
\
xpub.cpp
\
xrep.cpp
\
xreq.cpp
\
...
...
src/session_base.cpp
View file @
626099aa
...
...
@@ -27,7 +27,6 @@
#include "likely.hpp"
#include "tcp_connecter.hpp"
#include "ipc_connecter.hpp"
#include "vtcp_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
...
...
@@ -394,18 +393,6 @@ void zmq::session_base_t::start_connecting (bool wait_)
}
#endif
#if defined ZMQ_HAVE_VTCP
if
(
protocol
==
"vtcp"
)
{
vtcp_connecter_t
*
connecter
=
new
(
std
::
nothrow
)
vtcp_connecter_t
(
io_thread
,
this
,
options
,
address
.
c_str
(),
wait_
);
alloc_assert
(
connecter
);
launch_child
(
connecter
);
return
;
}
#endif
#if defined ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure.
...
...
src/socket_base.cpp
View file @
626099aa
...
...
@@ -37,7 +37,6 @@
#include "socket_base.hpp"
#include "tcp_listener.hpp"
#include "ipc_listener.hpp"
#include "vtcp_listener.hpp"
#include "tcp_connecter.hpp"
#include "io_thread.hpp"
#include "session_base.hpp"
...
...
@@ -173,8 +172,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
// First check out whether the protcol is something we are aware of.
if
(
protocol_
!=
"inproc"
&&
protocol_
!=
"ipc"
&&
protocol_
!=
"tcp"
&&
protocol_
!=
"pgm"
&&
protocol_
!=
"epgm"
&&
protocol_
!=
"sys"
&&
protocol_
!=
"vtcp"
)
{
protocol_
!=
"pgm"
&&
protocol_
!=
"epgm"
&&
protocol_
!=
"sys"
)
{
errno
=
EPROTONOSUPPORT
;
return
-
1
;
}
...
...
@@ -188,14 +186,6 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
}
#endif
// If 0MQ is not compiled with VTCP, vtcp transport is not avaialble.
#if !defined ZMQ_HAVE_VTCP
if
(
protocol_
==
"vtcp"
)
{
errno
=
EPROTONOSUPPORT
;
return
-
1
;
}
#endif
// IPC transport is not available on Windows and OpenVMS.
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
if
(
protocol_
==
"ipc"
)
{
...
...
@@ -389,21 +379,6 @@ int zmq::socket_base_t::bind (const char *addr_)
}
#endif
#if defined ZMQ_HAVE_VTCP
if
(
protocol
==
"vtcp"
)
{
vtcp_listener_t
*
listener
=
new
(
std
::
nothrow
)
vtcp_listener_t
(
io_thread
,
this
,
options
);
alloc_assert
(
listener
);
int
rc
=
listener
->
set_address
(
address
.
c_str
());
if
(
rc
!=
0
)
{
delete
listener
;
return
-
1
;
}
launch_child
(
listener
);
return
0
;
}
#endif
zmq_assert
(
false
);
return
-
1
;
}
...
...
src/vtcp_connecter.cpp
deleted
100644 → 0
View file @
ac7717b7
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vtcp_connecter.hpp"
#if defined ZMQ_HAVE_VTCP
#include <new>
#include <string>
#include "stream_engine.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "random.hpp"
#include "likely.hpp"
#include "err.hpp"
#include "ip.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
#endif
#endif
zmq
::
vtcp_connecter_t
::
vtcp_connecter_t
(
class
io_thread_t
*
io_thread_
,
class
session_base_t
*
session_
,
const
options_t
&
options_
,
const
char
*
address_
,
bool
wait_
)
:
own_t
(
io_thread_
,
options_
),
io_object_t
(
io_thread_
),
s
(
retired_fd
),
handle_valid
(
false
),
wait
(
wait_
),
session
(
session_
),
current_reconnect_ivl
(
options
.
reconnect_ivl
)
{
subport
=
0
;
int
rc
=
set_address
(
address_
);
zmq_assert
(
rc
==
0
);
}
zmq
::
vtcp_connecter_t
::~
vtcp_connecter_t
()
{
if
(
wait
)
cancel_timer
(
reconnect_timer_id
);
if
(
handle_valid
)
rm_fd
(
handle
);
if
(
s
!=
retired_fd
)
close
();
}
int
zmq
::
vtcp_connecter_t
::
set_address
(
const
char
*
addr_
)
{
const
char
*
delimiter
=
strrchr
(
addr_
,
'.'
);
if
(
!
delimiter
)
{
delimiter
=
strrchr
(
addr_
,
':'
);
if
(
!
delimiter
)
{
errno
=
EINVAL
;
return
-
1
;
}
std
::
string
addr_str
(
addr_
,
delimiter
-
addr_
);
addr_str
+=
":9220"
;
std
::
string
subport_str
(
delimiter
+
1
);
subport
=
(
vtcp_subport_t
)
atoi
(
subport_str
.
c_str
());
int
rc
=
address
.
resolve
(
addr_str
.
c_str
(),
false
,
true
);
if
(
rc
!=
0
)
return
-
1
;
}
else
{
std
::
string
addr_str
(
addr_
,
delimiter
-
addr_
);
std
::
string
subport_str
(
delimiter
+
1
);
subport
=
(
vtcp_subport_t
)
atoi
(
subport_str
.
c_str
());
int
rc
=
address
.
resolve
(
addr_str
.
c_str
(),
false
,
true
);
if
(
rc
!=
0
)
return
-
1
;
}
return
0
;
}
void
zmq
::
vtcp_connecter_t
::
process_plug
()
{
if
(
wait
)
add_reconnect_timer
();
else
start_connecting
();
}
void
zmq
::
vtcp_connecter_t
::
in_event
()
{
// We are not polling for incomming data, so we are actually called
// because of error here. However, we can get error on out event as well
// on some platforms, so we'll simply handle both events in the same way.
out_event
();
}
void
zmq
::
vtcp_connecter_t
::
out_event
()
{
fd_t
fd
=
connect
();
rm_fd
(
handle
);
handle_valid
=
false
;
// Handle the error condition by attempt to reconnect.
if
(
fd
==
retired_fd
)
{
close
();
wait
=
true
;
add_reconnect_timer
();
return
;
}
// Create the engine object for this connection.
stream_engine_t
*
engine
=
new
(
std
::
nothrow
)
stream_engine_t
(
fd
,
options
);
alloc_assert
(
engine
);
// Attach the engine to the corresponding session object.
send_attach
(
session
,
engine
);
// Shut the connecter down.
terminate
();
}
void
zmq
::
vtcp_connecter_t
::
timer_event
(
int
id_
)
{
zmq_assert
(
id_
==
reconnect_timer_id
);
wait
=
false
;
start_connecting
();
}
void
zmq
::
vtcp_connecter_t
::
start_connecting
()
{
// Open the connecting socket.
int
rc
=
open
();
// Handle error condition by eventual reconnect.
if
(
unlikely
(
rc
!=
0
))
{
errno_assert
(
false
);
wait
=
true
;
add_reconnect_timer
();
return
;
}
// Connection establishment may be dealyed. Poll for its completion.
handle
=
add_fd
(
s
);
handle_valid
=
true
;
set_pollout
(
handle
);
}
void
zmq
::
vtcp_connecter_t
::
add_reconnect_timer
()
{
add_timer
(
get_new_reconnect_ivl
(),
reconnect_timer_id
);
}
int
zmq
::
vtcp_connecter_t
::
get_new_reconnect_ivl
()
{
// The new interval is the current interval + random value.
int
this_interval
=
current_reconnect_ivl
+
(
generate_random
()
%
options
.
reconnect_ivl
);
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if
(
options
.
reconnect_ivl_max
>
0
&&
options
.
reconnect_ivl_max
>
options
.
reconnect_ivl
)
{
// Calculate the next interval
current_reconnect_ivl
=
current_reconnect_ivl
*
2
;
if
(
current_reconnect_ivl
>=
options
.
reconnect_ivl_max
)
{
current_reconnect_ivl
=
options
.
reconnect_ivl_max
;
}
}
return
this_interval
;
}
int
zmq
::
vtcp_connecter_t
::
open
()
{
zmq_assert
(
s
==
retired_fd
);
// Start the connection procedure.
sockaddr_in
*
paddr
=
(
sockaddr_in
*
)
address
.
addr
();
s
=
vtcp_connect
(
paddr
->
sin_addr
.
s_addr
,
ntohs
(
paddr
->
sin_port
));
// Connect was successfull immediately.
if
(
s
!=
retired_fd
)
return
0
;
// Asynchronous connect was launched.
if
(
errno
==
EINPROGRESS
)
{
errno
=
EAGAIN
;
return
-
1
;
}
// Error occured.
return
-
1
;
}
zmq
::
fd_t
zmq
::
vtcp_connecter_t
::
connect
()
{
int
rc
=
vtcp_acceptc
(
s
,
subport
);
if
(
rc
!=
0
)
{
int
err
=
errno
;
close
();
errno
=
err
;
return
retired_fd
;
}
tune_tcp_socket
(
s
);
fd_t
result
=
s
;
s
=
retired_fd
;
return
result
;
}
int
zmq
::
vtcp_connecter_t
::
close
()
{
zmq_assert
(
s
!=
retired_fd
);
int
rc
=
::
close
(
s
);
if
(
rc
!=
0
)
return
-
1
;
s
=
retired_fd
;
return
0
;
}
#endif
src/vtcp_connecter.hpp
deleted
100644 → 0
View file @
ac7717b7
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __VTCP_CONNECTER_HPP_INCLUDED__
#define __VTCP_CONNECTER_HPP_INCLUDED__
#include "platform.hpp"
#if defined ZMQ_HAVE_VTCP
#include <vtcp.h>
#include "fd.hpp"
#include "own.hpp"
#include "stdint.hpp"
#include "io_object.hpp"
#include "tcp_address.hpp"
namespace
zmq
{
class
vtcp_connecter_t
:
public
own_t
,
public
io_object_t
{
public
:
// If 'delay' is true connecter first waits for a while, then starts
// connection process.
vtcp_connecter_t
(
class
io_thread_t
*
io_thread_
,
class
session_base_t
*
session_
,
const
options_t
&
options_
,
const
char
*
address_
,
bool
delay_
);
~
vtcp_connecter_t
();
private
:
// ID of the timer used to delay the reconnection.
enum
{
reconnect_timer_id
=
1
};
// Handlers for incoming commands.
void
process_plug
();
// Handlers for I/O events.
void
in_event
();
void
out_event
();
void
timer_event
(
int
id_
);
// Internal function to start the actual connection establishment.
void
start_connecting
();
// Internal function to add a reconnect timer
void
add_reconnect_timer
();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int
get_new_reconnect_ivl
();
// Set address to connect to.
int
set_address
(
const
char
*
addr_
);
// Open TCP connecting socket. Returns -1 in case of error,
// 0 if connect was successfull immediately and 1 if async connect
// was launched.
int
open
();
// Close the connecting socket.
int
close
();
// Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessfull.
fd_t
connect
();
// Address to connect to.
tcp_address_t
address
;
vtcp_subport_t
subport
;
// Underlying socket.
fd_t
s
;
// Handle corresponding to the listening socket.
handle_t
handle
;
// If true file descriptor is registered with the poller and 'handle'
// contains valid value.
bool
handle_valid
;
// If true, connecter is waiting a while before trying to connect.
bool
wait
;
// Reference to the session we belong to.
class
session_base_t
*
session
;
// Current reconnect ivl, updated for backoff strategy
int
current_reconnect_ivl
;
vtcp_connecter_t
(
const
vtcp_connecter_t
&
);
const
vtcp_connecter_t
&
operator
=
(
const
vtcp_connecter_t
&
);
};
}
#endif
#endif
src/vtcp_listener.cpp
deleted
100644 → 0
View file @
ac7717b7
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vtcp_listener.hpp"
#if defined ZMQ_HAVE_VTCP
#include <string>
#include <string.h>
#include <vtcp.h>
#include "stream_engine.hpp"
#include "session_base.hpp"
#include "stdint.hpp"
#include "err.hpp"
#include "ip.hpp"
zmq
::
vtcp_listener_t
::
vtcp_listener_t
(
io_thread_t
*
io_thread_
,
socket_base_t
*
socket_
,
options_t
&
options_
)
:
own_t
(
io_thread_
,
options_
),
io_object_t
(
io_thread_
),
s
(
retired_fd
),
socket
(
socket_
)
{
}
zmq
::
vtcp_listener_t
::~
vtcp_listener_t
()
{
if
(
s
!=
retired_fd
)
{
int
rc
=
::
close
(
s
);
errno_assert
(
rc
==
0
);
s
=
retired_fd
;
}
}
int
zmq
::
vtcp_listener_t
::
set_address
(
const
char
*
addr_
)
{
// VTCP doesn't allow for binding to a specific interface. Connection
// string has to begin with *: (INADDR_ANY).
if
(
strlen
(
addr_
)
<
2
||
addr_
[
0
]
!=
'*'
||
addr_
[
1
]
!=
':'
)
{
errno
=
EADDRNOTAVAIL
;
return
-
1
;
}
// Parse port and subport.
uint16_t
port
;
uint32_t
subport
;
const
char
*
delimiter
=
strrchr
(
addr_
,
'.'
);
if
(
!
delimiter
)
{
port
=
9220
;
subport
=
(
uint32_t
)
atoi
(
addr_
+
2
);
}
else
{
std
::
string
port_str
(
addr_
+
2
,
delimiter
-
addr_
-
2
);
std
::
string
subport_str
(
delimiter
+
1
);
port
=
(
uint16_t
)
atoi
(
port_str
.
c_str
());
subport
=
(
uint32_t
)
atoi
(
subport_str
.
c_str
());
}
// Start listening.
s
=
vtcp_bind
(
port
,
subport
);
if
(
s
==
retired_fd
)
return
-
1
;
return
0
;
}
void
zmq
::
vtcp_listener_t
::
process_plug
()
{
// Start polling for incoming connections.
handle
=
add_fd
(
s
);
set_pollin
(
handle
);
}
void
zmq
::
vtcp_listener_t
::
process_term
(
int
linger_
)
{
rm_fd
(
handle
);
own_t
::
process_term
(
linger_
);
}
void
zmq
::
vtcp_listener_t
::
in_event
()
{
fd_t
fd
=
vtcp_acceptb
(
s
);
if
(
fd
==
retired_fd
)
return
;
tune_tcp_socket
(
fd
);
// Create the engine object for this connection.
stream_engine_t
*
engine
=
new
(
std
::
nothrow
)
stream_engine_t
(
fd
,
options
);
alloc_assert
(
engine
);
// Choose I/O thread to run connecter in. Given that we are already
// running in an I/O thread, there must be at least one available.
io_thread_t
*
io_thread
=
choose_io_thread
(
options
.
affinity
);
zmq_assert
(
io_thread
);
// Create and launch a session object.
session_base_t
*
session
=
session_base_t
::
create
(
io_thread
,
false
,
socket
,
options
,
NULL
,
NULL
);
alloc_assert
(
session
);
session
->
inc_seqnum
();
launch_child
(
session
);
send_attach
(
session
,
engine
,
false
);
}
#endif
src/vtcp_listener.hpp
deleted
100644 → 0
View file @
ac7717b7
/*
Copyright (c) 2009-2011 250bpm s.r.o.
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_VTCP_LISTENER_HPP_INCLUDED__
#define __ZMQ_VTCP_LISTENER_HPP_INCLUDED__
#include "platform.hpp"
#if defined ZMQ_HAVE_VTCP
#include "own.hpp"
#include "io_object.hpp"
#include "fd.hpp"
namespace
zmq
{
class
vtcp_listener_t
:
public
own_t
,
public
io_object_t
{
public
:
vtcp_listener_t
(
class
io_thread_t
*
io_thread_
,
class
socket_base_t
*
socket_
,
class
options_t
&
options_
);
~
vtcp_listener_t
();
int
set_address
(
const
char
*
addr_
);
private
:
// Handlers for incoming commands.
void
process_plug
();
void
process_term
(
int
linger_
);
// Handlers for I/O events.
void
in_event
();
// VTCP listener socket.
fd_t
s
;
// Handle corresponding to the listening socket.
handle_t
handle
;
// Socket the listerner belongs to.
class
socket_base_t
*
socket
;
vtcp_listener_t
(
const
vtcp_listener_t
&
);
const
vtcp_listener_t
&
operator
=
(
const
vtcp_listener_t
&
);
};
}
#endif
#endif
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment