Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
9f1f823b
Commit
9f1f823b
authored
Aug 09, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
zmq_listener/zmq_connecter implemented
parent
3147ff85
Show whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
239 additions
and
33 deletions
+239
-33
Makefile.am
src/Makefile.am
+2
-0
io_object.cpp
src/io_object.cpp
+7
-6
io_object.hpp
src/io_object.hpp
+11
-7
socket_base.cpp
src/socket_base.cpp
+10
-2
tcp_connecter.cpp
src/tcp_connecter.cpp
+13
-11
tcp_connecter.hpp
src/tcp_connecter.hpp
+8
-1
zmq_connecter.cpp
src/zmq_connecter.cpp
+110
-0
zmq_connecter.hpp
src/zmq_connecter.hpp
+70
-0
zmq_listener.cpp
src/zmq_listener.cpp
+6
-3
zmq_listener.hpp
src/zmq_listener.hpp
+2
-3
No files found.
src/Makefile.am
View file @
9f1f823b
...
...
@@ -44,6 +44,7 @@ libzmq_la_SOURCES = \
ypipe.hpp
\
ypollset.hpp
\
yqueue.hpp
\
zmq_connecter.hpp
\
zmq_engine.hpp
\
zmq_listener.hpp
\
app_thread.cpp
\
...
...
@@ -67,6 +68,7 @@ libzmq_la_SOURCES = \
uuid.cpp
\
ypollset.cpp
\
zmq.cpp
\
zmq_connecter.cpp
\
zmq_engine.cpp
\
zmq_listener.cpp
...
...
src/io_object.cpp
View file @
9f1f823b
...
...
@@ -52,9 +52,9 @@ void zmq::io_object_t::process_plug ()
plugged_in
=
true
;
}
zmq
::
handle_t
zmq
::
io_object_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
)
zmq
::
handle_t
zmq
::
io_object_t
::
add_fd
(
fd_t
fd_
)
{
return
poller
->
add_fd
(
fd_
,
events_
);
return
poller
->
add_fd
(
fd_
,
this
);
}
void
zmq
::
io_object_t
::
rm_fd
(
handle_t
handle_
)
...
...
@@ -82,14 +82,14 @@ void zmq::io_object_t::reset_pollout (handle_t handle_)
poller
->
reset_pollout
(
handle_
);
}
void
zmq
::
io_object_t
::
add_timer
(
i_poll_events
*
events_
)
void
zmq
::
io_object_t
::
add_timer
()
{
poller
->
add_timer
(
events_
);
poller
->
add_timer
(
this
);
}
void
zmq
::
io_object_t
::
cancel_timer
(
i_poll_events
*
events_
)
void
zmq
::
io_object_t
::
cancel_timer
()
{
poller
->
cancel_timer
(
events_
);
poller
->
cancel_timer
(
this
);
}
void
zmq
::
io_object_t
::
in_event
()
...
...
@@ -126,5 +126,6 @@ void zmq::io_object_t::process_term ()
// Otherwise, destroy the object and acknowledge the termination
// straight away.
send_term_ack
(
owner
);
process_unplug
();
delete
this
;
}
src/io_object.hpp
View file @
9f1f823b
...
...
@@ -51,15 +51,20 @@ namespace zmq
// handler.
void
process_plug
();
// io_object_t defines a new handler used to disconnect the object
// from the poller object. Implement the handlen in the derived
// classes to ensure sane cleanup.
virtual
void
process_unplug
()
=
0
;
// Methods to access underlying poller object.
handle_t
add_fd
(
fd_t
fd_
,
struct
i_poll_events
*
events_
);
handle_t
add_fd
(
fd_t
fd_
);
void
rm_fd
(
handle_t
handle_
);
void
set_pollin
(
handle_t
handle_
);
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
add_timer
(
struct
i_poll_events
*
events_
);
void
cancel_timer
(
struct
i_poll_events
*
events_
);
void
add_timer
();
void
cancel_timer
();
// i_poll_events interface implementation.
void
in_event
();
...
...
@@ -70,12 +75,11 @@ namespace zmq
// it when it's being closed.
object_t
*
owner
;
// Set to true when object is plugged in. It's responsibility
// of derived object to set the property after the feat.
bool
plugged_in
;
private
:
// Set to true when object is plugged in.
bool
plugged_in
;
// Set to true when object was terminated before it was plugged in.
// In such case destruction is delayed till 'plug' command arrives.
bool
terminated
;
...
...
src/socket_base.cpp
View file @
9f1f823b
...
...
@@ -25,6 +25,7 @@
#include "app_thread.hpp"
#include "err.hpp"
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
#include "config.hpp"
...
...
@@ -127,7 +128,6 @@ int zmq::socket_base_t::setsockopt (int option_, void *optval_,
int
zmq
::
socket_base_t
::
bind
(
const
char
*
addr_
)
{
// TODO: The taskset should be taken from socket options.
zmq_listener_t
*
listener
=
new
zmq_listener_t
(
choose_io_thread
(
affinity
),
this
);
int
rc
=
listener
->
set_address
(
addr_
);
...
...
@@ -141,7 +141,15 @@ int zmq::socket_base_t::bind (const char *addr_)
int
zmq
::
socket_base_t
::
connect
(
const
char
*
addr_
)
{
zmq_assert
(
false
);
zmq_connecter_t
*
connecter
=
new
zmq_connecter_t
(
choose_io_thread
(
affinity
),
this
);
int
rc
=
connecter
->
set_address
(
addr_
);
if
(
rc
!=
0
)
return
-
1
;
send_plug
(
connecter
);
send_own
(
this
,
connecter
);
return
0
;
}
int
zmq
::
socket_base_t
::
subscribe
(
const
char
*
criteria_
)
...
...
src/tcp_connecter.cpp
View file @
9f1f823b
...
...
@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string>
#include "tcp_connecter.hpp"
#include "platform.hpp"
#include "ip.hpp"
...
...
@@ -40,6 +42,7 @@
zmq
::
tcp_connecter_t
::
tcp_connecter_t
()
:
s
(
retired_fd
)
{
memset
(
&
addr
,
0
,
sizeof
(
addr
));
}
zmq
::
tcp_connecter_t
::~
tcp_connecter_t
()
...
...
@@ -48,15 +51,15 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()
close
();
}
int
zmq
::
tcp_connecter_t
::
open
(
const
char
*
addr_
)
int
zmq
::
tcp_connecter_t
::
set_address
(
const
char
*
addr_
)
{
zmq_assert
(
s
==
retired_fd
);
// Convert the hostname into sockaddr_in structure.
sockaddr_in
address
;
int
rc
=
resolve_ip_hostname
(
&
address
,
addr_
);
if
(
rc
!=
0
)
return
-
1
;
return
resolve_ip_hostname
(
&
addr
,
addr_
);
}
int
zmq
::
tcp_connecter_t
::
open
()
{
zmq_assert
(
s
==
retired_fd
);
// Create the socket.
s
=
socket
(
AF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
);
...
...
@@ -67,7 +70,7 @@ int zmq::tcp_connecter_t::open (const char *addr_)
int
flags
=
fcntl
(
s
,
F_GETFL
,
0
);
if
(
flags
==
-
1
)
flags
=
0
;
rc
=
fcntl
(
s
,
F_SETFL
,
flags
|
O_NONBLOCK
);
int
rc
=
fcntl
(
s
,
F_SETFL
,
flags
|
O_NONBLOCK
);
errno_assert
(
rc
!=
-
1
);
// Disable Nagle's algorithm.
...
...
@@ -83,7 +86,7 @@ int zmq::tcp_connecter_t::open (const char *addr_)
#endif
// Connect to the remote peer.
rc
=
::
connect
(
s
,
(
sockaddr
*
)
&
addr
ess
,
sizeof
address
);
rc
=
::
connect
(
s
,
(
sockaddr
*
)
&
addr
,
sizeof
(
addr
)
);
// Connect was successfull immediately.
if
(
rc
==
0
)
...
...
@@ -91,7 +94,7 @@ int zmq::tcp_connecter_t::open (const char *addr_)
// Asynchronous connect was launched.
if
(
rc
==
-
1
&&
errno
==
EINPROGRESS
)
return
1
;
return
-
1
;
// Error occured.
int
err
=
errno
;
...
...
@@ -125,7 +128,6 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
if
(
rc
==
-
1
)
err
=
errno
;
if
(
err
!=
0
)
{
close
();
errno
=
err
;
return
retired_fd
;
}
...
...
src/tcp_connecter.hpp
View file @
9f1f823b
...
...
@@ -21,6 +21,7 @@
#define __ZMQ_TCP_CONNECTER_HPP_INCLUDED__
#include "fd.hpp"
#include "ip.hpp"
namespace
zmq
{
...
...
@@ -34,11 +35,14 @@ namespace zmq
tcp_connecter_t
();
~
tcp_connecter_t
();
// Set IP address/port to connect to.
int
set_address
(
const
char
*
addr_
);
// Open TCP connecting socket. Address is in
// <hostname>:<port-number> format. Returns -1 in case of error,
// 0 if connect was successfull immediately and 1 if async connect
// was launched.
int
open
(
const
char
*
addr_
);
int
open
();
// Close the connecting socket.
int
close
();
...
...
@@ -53,6 +57,9 @@ namespace zmq
private
:
// Address to connect to.
sockaddr_in
addr
;
// Underlying socket.
fd_t
s
;
...
...
src/zmq_connecter.cpp
0 → 100644
View file @
9f1f823b
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "zmq_connecter.hpp"
#include "err.hpp"
zmq
::
zmq_connecter_t
::
zmq_connecter_t
(
io_thread_t
*
parent_
,
object_t
*
owner_
)
:
io_object_t
(
parent_
,
owner_
),
waiting
(
false
)
{
}
zmq
::
zmq_connecter_t
::~
zmq_connecter_t
()
{
}
int
zmq
::
zmq_connecter_t
::
set_address
(
const
char
*
addr_
)
{
return
tcp_connecter
.
set_address
(
addr_
);
}
void
zmq
::
zmq_connecter_t
::
process_plug
()
{
start_connecting
();
io_object_t
::
process_plug
();
}
void
zmq
::
zmq_connecter_t
::
process_unplug
()
{
if
(
!
waiting
)
rm_fd
(
handle
);
}
void
zmq
::
zmq_connecter_t
::
in_event
()
{
// We are not polling for incomming data, so we are actually called
// because of error here. However, we can get error on out event as well
// on some platforms, so we'll simply handle both events in the same way.
out_event
();
}
void
zmq
::
zmq_connecter_t
::
out_event
()
{
fd_t
fd
=
tcp_connecter
.
connect
();
// If there was error during the connecting, close the socket and wait
// for a while before trying to reconnect.
if
(
fd
==
retired_fd
)
{
rm_fd
(
handle
);
tcp_connecter
.
close
();
waiting
=
true
;
add_timer
();
return
;
}
zmq_assert
(
false
);
/*
object_t *engine = new zmq_engine_t (choose_io_thread (0), owner);
send_plug (engine);
send_own (owner, engine);
*/
}
void
zmq
::
zmq_connecter_t
::
timer_event
()
{
// Reconnect period have elapsed.
waiting
=
false
;
start_connecting
();
}
void
zmq
::
zmq_connecter_t
::
start_connecting
()
{
// Open the connecting socket.
int
rc
=
tcp_connecter
.
open
();
// Connect may succeed in synchronous manner.
if
(
rc
==
0
)
{
out_event
();
return
;
}
// Connection establishment may be dealyed. Poll for its completion.
else
if
(
rc
==
-
1
&&
errno
==
EINPROGRESS
)
{
handle
=
add_fd
(
tcp_connecter
.
get_fd
());
set_pollout
(
handle
);
return
;
}
// If none of the above is true, synchronous error occured.
// Wait for a while and retry.
waiting
=
true
;
add_timer
();
}
src/zmq_connecter.hpp
0 → 100644
View file @
9f1f823b
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
#define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__
#include "io_object.hpp"
#include "tcp_connecter.hpp"
namespace
zmq
{
class
zmq_connecter_t
:
public
io_object_t
{
public
:
zmq_connecter_t
(
class
io_thread_t
*
parent_
,
object_t
*
owner_
);
// Set IP address to connect to.
int
set_address
(
const
char
*
addr_
);
private
:
~
zmq_connecter_t
();
// Handlers for incoming commands.
void
process_plug
();
void
process_unplug
();
// Handlers for I/O events.
void
in_event
();
void
out_event
();
void
timer_event
();
// Internal function to start the actual connection establishment.
void
start_connecting
();
// Actual connecting socket.
tcp_connecter_t
tcp_connecter
;
// Handle corresponding to the listening socket.
handle_t
handle
;
// True, if we are waiting for a period of time before trying to
// reconnect.
bool
waiting
;
zmq_connecter_t
(
const
zmq_connecter_t
&
);
void
operator
=
(
const
zmq_connecter_t
&
);
};
}
#endif
src/zmq_listener.cpp
View file @
9f1f823b
...
...
@@ -29,8 +29,6 @@ zmq::zmq_listener_t::zmq_listener_t (io_thread_t *parent_, object_t *owner_) :
zmq
::
zmq_listener_t
::~
zmq_listener_t
()
{
if
(
plugged_in
)
rm_fd
(
handle
);
}
int
zmq
::
zmq_listener_t
::
set_address
(
const
char
*
addr_
)
...
...
@@ -45,12 +43,17 @@ void zmq::zmq_listener_t::process_plug ()
zmq_assert
(
rc
==
0
);
// Start polling for incoming connections.
handle
=
add_fd
(
tcp_listener
.
get_fd
()
,
this
);
handle
=
add_fd
(
tcp_listener
.
get_fd
());
set_pollin
(
handle
);
io_object_t
::
process_plug
();
}
void
zmq
::
zmq_listener_t
::
process_unplug
()
{
rm_fd
(
handle
);
}
void
zmq
::
zmq_listener_t
::
in_event
()
{
fd_t
fd
=
tcp_listener
.
accept
();
...
...
src/zmq_listener.hpp
View file @
9f1f823b
...
...
@@ -20,8 +20,6 @@
#ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
#define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
#include <string>
#include "io_object.hpp"
#include "tcp_listener.hpp"
...
...
@@ -43,8 +41,9 @@ namespace zmq
// Handlers for incoming commands.
void
process_plug
();
void
process_unplug
();
// Handle I/O events.
// Handle
rs for
I/O events.
void
in_event
();
// Actual listening socket.
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment