Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
5b5b5133
Commit
5b5b5133
authored
Aug 09, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
socket options interface modeled as in BSD sockets
parent
a8b410e6
Hide whitespace changes
Inline
Side-by-side
Showing
15 changed files
with
222 additions
and
53 deletions
+222
-53
zmq.h
include/zmq.h
+14
-14
zmq.hpp
include/zmq.hpp
+10
-4
Makefile.am
src/Makefile.am
+2
-0
epoll.cpp
src/epoll.cpp
+3
-1
i_api.hpp
src/i_api.hpp
+4
-2
io_object.cpp
src/io_object.cpp
+44
-1
io_object.hpp
src/io_object.hpp
+18
-5
io_thread.hpp
src/io_thread.hpp
+1
-1
socket_base.cpp
src/socket_base.cpp
+20
-13
socket_base.hpp
src/socket_base.hpp
+5
-4
zmq.cpp
src/zmq.cpp
+9
-4
zmq_engine.cpp
src/zmq_engine.cpp
+35
-0
zmq_engine.hpp
src/zmq_engine.hpp
+47
-0
zmq_listener.cpp
src/zmq_listener.cpp
+7
-2
zmq_listener.hpp
src/zmq_listener.hpp
+3
-2
No files found.
include/zmq.h
View file @
5b5b5133
...
...
@@ -42,6 +42,14 @@ extern "C" {
#define ZMQ_DELIMITER 31
#define ZMQ_VSM 32
// Socket options.
#define ZMQ_HWM 1
#define ZMQ_LWM 2
#define ZMQ_SWAP 3
#define ZMQ_MASK 4
#define ZMQ_AFFINITY 5
#define ZMQ_SESSIONID 6
// The operation should be performed in non-blocking mode. I.e. if it cannot
// be processed immediately, error should be returned with errno set to EAGAIN.
#define ZMQ_NOBLOCK 1
...
...
@@ -94,18 +102,6 @@ struct zmq_msg
unsigned
char
vsm_data
[
ZMQ_MAX_VSM_SIZE
];
};
// TODO: Different options...
struct
zmq_opts
{
uint64_t
hwm
;
uint64_t
lwm
;
uint64_t
swap
;
uint64_t
mask
;
uint64_t
taskset
;
const
char
*
identity
;
const
char
*
args
;
};
// Initialise an empty message (zero bytes long).
ZMQ_EXPORT
int
zmq_msg_init
(
zmq_msg
*
msg
);
...
...
@@ -165,11 +161,15 @@ ZMQ_EXPORT void *zmq_socket (void *context, int type);
// Close the socket.
ZMQ_EXPORT
int
zmq_close
(
void
*
s
);
// Sets an option on the socket.
ZMQ_EXPORT
int
zmq_setsockopt
(
void
*
s
,
int
option_
,
void
*
optval_
,
size_t
optvallen_
);
// Bind the socket to a particular address.
ZMQ_EXPORT
int
zmq_bind
(
void
*
s
,
const
char
*
addr
,
zmq_opts
*
opts
);
ZMQ_EXPORT
int
zmq_bind
(
void
*
s
,
const
char
*
addr
);
// Connect the socket to a particular address.
ZMQ_EXPORT
int
zmq_connect
(
void
*
s
,
const
char
*
addr
,
zmq_opts
*
opts
);
ZMQ_EXPORT
int
zmq_connect
(
void
*
s
,
const
char
*
addr
);
// Subscribe for the subset of messages identified by 'criteria' argument.
ZMQ_EXPORT
int
zmq_subscribe
(
void
*
s
,
const
char
*
criteria
);
...
...
include/zmq.hpp
View file @
5b5b5133
...
...
@@ -230,9 +230,15 @@ namespace zmq
assert
(
rc
==
0
);
}
inline
void
bind
(
const
char
*
addr_
,
zmq_opts
*
opts_
=
NULL
)
template
<
typename
T
>
inline
void
setsockopt
(
int
option_
,
T
&
value_
)
{
int
rc
=
zmq_bind
(
ptr
,
addr_
,
opts_
);
int
rc
=
zmq_setsockopt
(
ptr
,
option_
,
(
void
*
)
&
value_
,
sizeof
(
T
));
assert
(
rc
==
0
);
}
inline
void
bind
(
const
char
*
addr_
)
{
int
rc
=
zmq_bind
(
ptr
,
addr_
);
if
(
rc
==
-
1
)
{
assert
(
errno
==
EINVAL
||
errno
==
EADDRINUSE
);
if
(
errno
==
EINVAL
)
...
...
@@ -242,9 +248,9 @@ namespace zmq
}
}
inline
void
connect
(
const
char
*
addr_
,
zmq_opts
*
opts_
=
NULL
)
inline
void
connect
(
const
char
*
addr_
)
{
int
rc
=
zmq_connect
(
ptr
,
addr_
,
opts_
);
int
rc
=
zmq_connect
(
ptr
,
addr_
);
if
(
rc
==
-
1
)
{
assert
(
errno
==
EINVAL
||
errno
==
EADDRINUSE
);
if
(
errno
==
EINVAL
)
...
...
src/Makefile.am
View file @
5b5b5133
...
...
@@ -44,6 +44,7 @@ libzmq_la_SOURCES = \
ypipe.hpp
\
ypollset.hpp
\
yqueue.hpp
\
zmq_engine.hpp
\
zmq_listener.hpp
\
app_thread.cpp
\
devpoll.cpp
\
...
...
@@ -66,6 +67,7 @@ libzmq_la_SOURCES = \
uuid.cpp
\
ypollset.cpp
\
zmq.cpp
\
zmq_engine.cpp
\
zmq_listener.cpp
libzmq_la_LDFLAGS
=
-version-info
0:0:0
...
...
src/epoll.cpp
View file @
5b5b5133
...
...
@@ -41,10 +41,12 @@ zmq::epoll_t::epoll_t () :
zmq
::
epoll_t
::~
epoll_t
()
{
// Wait till the worker thread exits.
worker
.
stop
();
// Make sure there are no fds registered on shutdown.
zmq_assert
(
load
.
get
()
==
0
);
worker
.
stop
();
close
(
epoll_fd
);
for
(
retired_t
::
iterator
it
=
retired
.
begin
();
it
!=
retired
.
end
();
it
++
)
delete
*
it
;
...
...
src/i_api.hpp
View file @
5b5b5133
...
...
@@ -27,8 +27,10 @@ namespace zmq
{
virtual
~
i_api
()
{}
virtual
int
bind
(
const
char
*
addr_
,
struct
zmq_opts
*
opts_
)
=
0
;
virtual
int
connect
(
const
char
*
addr_
,
struct
zmq_opts
*
opts_
)
=
0
;
virtual
int
setsockopt
(
int
option_
,
void
*
optval_
,
size_t
optvallen_
)
=
0
;
virtual
int
bind
(
const
char
*
addr_
)
=
0
;
virtual
int
connect
(
const
char
*
addr_
)
=
0
;
virtual
int
subscribe
(
const
char
*
criteria_
)
=
0
;
virtual
int
send
(
struct
zmq_msg
*
msg_
,
int
flags_
)
=
0
;
virtual
int
flush
()
=
0
;
...
...
src/io_object.cpp
View file @
5b5b5133
...
...
@@ -18,17 +18,60 @@
*/
#include "io_object.hpp"
#include "io_thread.hpp"
zmq
::
io_object_t
::
io_object_t
(
object
_t
*
parent_
,
object_t
*
owner_
)
:
zmq
::
io_object_t
::
io_object_t
(
io_thread
_t
*
parent_
,
object_t
*
owner_
)
:
object_t
(
parent_
),
owner
(
owner_
)
{
// Retrieve the poller from the thread we are running in.
poller
=
parent_
->
get_poller
();
}
zmq
::
io_object_t
::~
io_object_t
()
{
}
zmq
::
handle_t
zmq
::
io_object_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
)
{
return
poller
->
add_fd
(
fd_
,
events_
);
}
void
zmq
::
io_object_t
::
rm_fd
(
handle_t
handle_
)
{
poller
->
rm_fd
(
handle_
);
}
void
zmq
::
io_object_t
::
set_pollin
(
handle_t
handle_
)
{
poller
->
set_pollin
(
handle_
);
}
void
zmq
::
io_object_t
::
reset_pollin
(
handle_t
handle_
)
{
poller
->
reset_pollin
(
handle_
);
}
void
zmq
::
io_object_t
::
set_pollout
(
handle_t
handle_
)
{
poller
->
set_pollout
(
handle_
);
}
void
zmq
::
io_object_t
::
reset_pollout
(
handle_t
handle_
)
{
poller
->
reset_pollout
(
handle_
);
}
void
zmq
::
io_object_t
::
add_timer
(
i_poll_events
*
events_
)
{
poller
->
add_timer
(
events_
);
}
void
zmq
::
io_object_t
::
cancel_timer
(
i_poll_events
*
events_
)
{
poller
->
cancel_timer
(
events_
);
}
void
zmq
::
io_object_t
::
term
()
{
send_term_req
(
owner
,
this
);
...
...
src/io_object.hpp
View file @
5b5b5133
...
...
@@ -21,6 +21,7 @@
#define __ZMQ_IO_OBJECT_HPP_INCLUDED__
#include "object.hpp"
#include "i_poller.hpp"
namespace
zmq
{
...
...
@@ -31,7 +32,7 @@ namespace zmq
// I/O object will live in the thread inherited from the parent.
// However, it's lifetime is managed by the owner.
io_object_t
(
object
_t
*
parent_
,
object_t
*
owner_
);
io_object_t
(
class
io_thread
_t
*
parent_
,
object_t
*
owner_
);
protected
:
...
...
@@ -44,15 +45,27 @@ namespace zmq
// of I/O object correctly.
virtual
~
io_object_t
();
private
:
// Handlers for incoming commands.
void
process_term
();
// Methods to access underlying poller object.
handle_t
add_fd
(
fd_t
fd_
,
struct
i_poll_events
*
events_
);
void
rm_fd
(
handle_t
handle_
);
void
set_pollin
(
handle_t
handle_
);
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
add_timer
(
struct
i_poll_events
*
events_
);
void
cancel_timer
(
struct
i_poll_events
*
events_
);
// Socket owning this I/O object. It is responsible for destroying
// it when it's being closed.
object_t
*
owner
;
private
:
struct
i_poller
*
poller
;
// Handlers for incoming commands.
void
process_term
();
io_object_t
(
const
io_object_t
&
);
void
operator
=
(
const
io_object_t
&
);
};
...
...
src/io_thread.hpp
View file @
5b5b5133
...
...
@@ -57,7 +57,7 @@ namespace zmq
void
out_event
();
void
timer_event
();
//
???
//
Used by io_objects to retrieve the assciated poller object.
struct
i_poller
*
get_poller
();
// Command handlers.
...
...
src/socket_base.cpp
View file @
5b5b5133
...
...
@@ -46,8 +46,9 @@ zmq::socket_base_t::~socket_base_t ()
break
;
// Send termination request to all associated I/O objects.
for
(
io_objects_t
::
size_type
i
=
0
;
i
!=
io_objects
.
size
();
i
++
)
send_term
(
io_objects
[
i
]);
for
(
io_objects_t
::
iterator
it
=
io_objects
.
begin
();
it
!=
io_objects
.
end
();
it
++
)
send_term
(
*
it
);
// Move the objects to the list of pending term acks.
pending_term_acks
+=
io_objects
.
size
();
...
...
@@ -59,16 +60,23 @@ zmq::socket_base_t::~socket_base_t ()
}
}
int
zmq
::
socket_base_t
::
bind
(
const
char
*
addr_
,
struct
zmq_opts
*
opts_
)
int
zmq
::
socket_base_t
::
setsockopt
(
int
option_
,
void
*
optval_
,
size_t
optvallen_
)
{
uint64_t
taskset
=
opts_
?
opts_
->
taskset
:
0
;
zmq_assert
(
false
);
}
int
zmq
::
socket_base_t
::
bind
(
const
char
*
addr_
)
{
// TODO: The taskset should be taken from socket options.
uint64_t
taskset
=
0
;
object_t
*
listener
=
new
zmq_listener_t
(
choose_io_thread
(
taskset
),
this
);
send_plug
(
listener
);
send_own
(
this
,
listener
);
return
0
;
}
int
zmq
::
socket_base_t
::
connect
(
const
char
*
addr_
,
struct
zmq_opts
*
opts_
)
int
zmq
::
socket_base_t
::
connect
(
const
char
*
addr_
)
{
zmq_assert
(
false
);
}
...
...
@@ -102,24 +110,23 @@ int zmq::socket_base_t::close ()
void
zmq
::
socket_base_t
::
process_own
(
object_t
*
object_
)
{
io_objects
.
push_back
(
object_
);
io_objects
.
insert
(
object_
);
}
void
zmq
::
socket_base_t
::
process_term_req
(
object_t
*
object_
)
{
// If I/O object is well and alive ask it to terminate.
// TODO: Following find may produce an unacceptable jitter in
// C10K-style applications. If so, use set instead of vector.
io_objects_t
::
iterator
it
=
std
::
find
(
io_objects
.
begin
(),
io_objects
.
end
(),
object_
);
if
(
it
!=
io_objects
.
end
())
{
pending_term_acks
++
;
io_objects
.
erase
(
it
);
send_term
(
object_
);
}
// If not found, we assume that termination request was already sent to
// the object so we can sagely ignore the request.
if
(
it
==
io_objects
.
end
())
return
;
pending_term_acks
++
;
io_objects
.
erase
(
it
);
send_term
(
object_
);
}
void
zmq
::
socket_base_t
::
process_term_ack
()
...
...
src/socket_base.hpp
View file @
5b5b5133
...
...
@@ -20,7 +20,7 @@
#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#include <
vector
>
#include <
set
>
#include "i_api.hpp"
#include "object.hpp"
...
...
@@ -36,8 +36,9 @@ namespace zmq
~
socket_base_t
();
// i_api interface implementation.
int
bind
(
const
char
*
addr_
,
struct
zmq_opts
*
opts_
);
int
connect
(
const
char
*
addr_
,
struct
zmq_opts
*
opts_
);
int
setsockopt
(
int
option_
,
void
*
optval_
,
size_t
optvallen_
);
int
bind
(
const
char
*
addr_
);
int
connect
(
const
char
*
addr_
);
int
subscribe
(
const
char
*
criteria_
);
int
send
(
struct
zmq_msg
*
msg_
,
int
flags_
);
int
flush
();
...
...
@@ -53,7 +54,7 @@ namespace zmq
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
typedef
std
::
vector
<
object_t
*>
io_objects_t
;
typedef
std
::
set
<
object_t
*>
io_objects_t
;
io_objects_t
io_objects
;
// Number of I/O objects that were already asked to terminate
...
...
src/zmq.cpp
View file @
5b5b5133
...
...
@@ -192,14 +192,19 @@ int zmq_close (void *s_)
return
0
;
}
int
zmq_
bind
(
void
*
s_
,
const
char
*
addr_
,
zmq_opts
*
opts
_
)
int
zmq_
setsockopt
(
void
*
s_
,
int
option_
,
void
*
optval_
,
size_t
optvallen
_
)
{
return
(((
zmq
::
i_api
*
)
s_
)
->
bind
(
addr_
,
opts
_
));
return
(((
zmq
::
i_api
*
)
s_
)
->
setsockopt
(
option_
,
optval_
,
optvallen
_
));
}
int
zmq_
connect
(
void
*
s_
,
const
char
*
addr_
,
zmq_opts
*
opts
_
)
int
zmq_
bind
(
void
*
s_
,
const
char
*
addr
_
)
{
return
(((
zmq
::
i_api
*
)
s_
)
->
connect
(
addr_
,
opts_
));
return
(((
zmq
::
i_api
*
)
s_
)
->
bind
(
addr_
));
}
int
zmq_connect
(
void
*
s_
,
const
char
*
addr_
)
{
return
(((
zmq
::
i_api
*
)
s_
)
->
connect
(
addr_
));
}
int
zmq_subscribe
(
void
*
s_
,
const
char
*
criteria_
)
...
...
src/zmq_engine.cpp
0 → 100644
View file @
5b5b5133
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "zmq_engine.hpp"
#include "io_thread.hpp"
zmq
::
zmq_engine_t
::
zmq_engine_t
(
io_thread_t
*
parent_
,
object_t
*
owner_
)
:
io_object_t
(
parent_
,
owner_
)
{
}
zmq
::
zmq_engine_t
::~
zmq_engine_t
()
{
}
void
zmq
::
zmq_engine_t
::
process_plug
()
{
}
src/zmq_engine.hpp
0 → 100644
View file @
5b5b5133
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__
#define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__
#include "io_object.hpp"
namespace
zmq
{
class
zmq_engine_t
:
public
io_object_t
{
public
:
zmq_engine_t
(
class
io_thread_t
*
parent_
,
object_t
*
owner_
);
private
:
~
zmq_engine_t
();
// Handlers for incoming commands.
void
process_plug
();
zmq_engine_t
(
const
zmq_engine_t
&
);
void
operator
=
(
const
zmq_engine_t
&
);
};
}
#endif
src/zmq_listener.cpp
View file @
5b5b5133
...
...
@@ -18,9 +18,11 @@
*/
#include "zmq_listener.hpp"
#include "zmq_engine.hpp"
#include "io_thread.hpp"
#include "err.hpp"
zmq
::
zmq_listener_t
::
zmq_listener_t
(
object
_t
*
parent_
,
object_t
*
owner_
)
:
zmq
::
zmq_listener_t
::
zmq_listener_t
(
io_thread
_t
*
parent_
,
object_t
*
owner_
)
:
io_object_t
(
parent_
,
owner_
)
{
}
...
...
@@ -31,5 +33,8 @@ zmq::zmq_listener_t::~zmq_listener_t ()
void
zmq
::
zmq_listener_t
::
process_plug
()
{
// TODO: Register with the I/O thread here.
// TODO: Testing code follows...
object_t
*
engine
=
new
zmq_engine_t
(
choose_io_thread
(
0
),
owner
);
send_plug
(
engine
);
send_own
(
owner
,
engine
);
}
src/zmq_listener.hpp
View file @
5b5b5133
...
...
@@ -29,11 +29,12 @@ namespace zmq
{
public
:
zmq_listener_t
(
object_t
*
parent_
,
object_t
*
owner_
);
~
zmq_listener_t
();
zmq_listener_t
(
class
io_thread_t
*
parent_
,
object_t
*
owner_
);
private
:
~
zmq_listener_t
();
// Handlers for incoming commands.
void
process_plug
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment