Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
95944551
Commit
95944551
authored
Sep 10, 2009
by
malosek
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'master' of git@github.com:sustrik/zeromq2
parents
5acef9fc
b3f32e21
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
159 additions
and
3 deletions
+159
-3
zmq.h
c/zmq.h
+2
-0
Makefile.am
src/Makefile.am
+2
-0
app_thread.cpp
src/app_thread.cpp
+16
-2
socket_base.cpp
src/socket_base.cpp
+5
-0
socket_base.hpp
src/socket_base.hpp
+1
-1
sub.cpp
src/sub.cpp
+82
-0
sub.hpp
src/sub.hpp
+51
-0
No files found.
c/zmq.h
View file @
95944551
...
@@ -50,6 +50,8 @@ extern "C" {
...
@@ -50,6 +50,8 @@ extern "C" {
#define ZMQ_MASK 4
#define ZMQ_MASK 4
#define ZMQ_AFFINITY 5
#define ZMQ_AFFINITY 5
#define ZMQ_IDENTITY 6
#define ZMQ_IDENTITY 6
#define ZMQ_SUBSCRIBE 7
#define ZMQ_UNSUBSCRIBE 8
// The operation should be performed in non-blocking mode. I.e. if it cannot
// The operation should be performed in non-blocking mode. I.e. if it cannot
// be processed immediately, error should be returned with errno set to EAGAIN.
// be processed immediately, error should be returned with errno set to EAGAIN.
...
...
src/Makefile.am
View file @
95944551
...
@@ -70,6 +70,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
...
@@ -70,6 +70,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
simple_semaphore.hpp
\
simple_semaphore.hpp
\
socket_base.hpp
\
socket_base.hpp
\
stdint.hpp
\
stdint.hpp
\
sub.hpp
\
tcp_connecter.hpp
\
tcp_connecter.hpp
\
tcp_listener.hpp
\
tcp_listener.hpp
\
tcp_socket.hpp
\
tcp_socket.hpp
\
...
@@ -105,6 +106,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
...
@@ -105,6 +106,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
select
.cpp
\
select
.cpp
\
session.cpp
\
session.cpp
\
socket_base.cpp
\
socket_base.cpp
\
sub.cpp
\
tcp_connecter.cpp
\
tcp_connecter.cpp
\
tcp_listener.cpp
\
tcp_listener.cpp
\
tcp_socket.cpp
\
tcp_socket.cpp
\
...
...
src/app_thread.cpp
View file @
95944551
...
@@ -35,6 +35,7 @@
...
@@ -35,6 +35,7 @@
#include "pipe.hpp"
#include "pipe.hpp"
#include "config.hpp"
#include "config.hpp"
#include "socket_base.hpp"
#include "socket_base.hpp"
#include "sub.hpp"
// If the RDTSC is available we use it to prevent excessive
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
// polling for commands. The nice thing here is that it will work on any
...
@@ -135,8 +136,21 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
...
@@ -135,8 +136,21 @@ void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
zmq
::
socket_base_t
*
zmq
::
app_thread_t
::
create_socket
(
int
type_
)
zmq
::
socket_base_t
*
zmq
::
app_thread_t
::
create_socket
(
int
type_
)
{
{
// TODO: type is ignored for the time being.
socket_base_t
*
s
=
NULL
;
socket_base_t
*
s
=
new
socket_base_t
(
this
);
switch
(
type_
)
{
case
ZMQ_SUB
:
s
=
new
sub_t
(
this
);
break
;
case
ZMQ_P2P
:
case
ZMQ_PUB
:
case
ZMQ_REQ
:
case
ZMQ_REP
:
s
=
new
socket_base_t
(
this
);
break
;
default
:
// TODO: This should be EINVAL.
zmq_assert
(
false
);
}
zmq_assert
(
s
);
zmq_assert
(
s
);
s
->
set_index
(
sockets
.
size
());
s
->
set_index
(
sockets
.
size
());
sockets
.
push_back
(
s
);
sockets
.
push_back
(
s
);
...
...
src/socket_base.cpp
View file @
95944551
...
@@ -140,6 +140,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
...
@@ -140,6 +140,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
options
.
identity
.
assign
((
const
char
*
)
optval_
,
optvallen_
);
options
.
identity
.
assign
((
const
char
*
)
optval_
,
optvallen_
);
return
0
;
return
0
;
case
ZMQ_SUBSCRIBE
:
case
ZMQ_UNSUBSCRIBE
:
errno
=
ENOTSUP
;
return
-
1
;
default
:
default
:
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
...
...
src/socket_base.hpp
View file @
95944551
...
@@ -39,7 +39,7 @@ namespace zmq
...
@@ -39,7 +39,7 @@ namespace zmq
public
:
public
:
socket_base_t
(
class
app_thread_t
*
parent_
);
socket_base_t
(
class
app_thread_t
*
parent_
);
~
socket_base_t
();
virtual
~
socket_base_t
();
// Interface for communication with the API layer.
// Interface for communication with the API layer.
virtual
int
setsockopt
(
int
option_
,
const
void
*
optval_
,
virtual
int
setsockopt
(
int
option_
,
const
void
*
optval_
,
...
...
src/sub.cpp
0 → 100644
View file @
95944551
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../c/zmq.h"
#include "sub.hpp"
#include "err.hpp"
zmq
::
sub_t
::
sub_t
(
class
app_thread_t
*
parent_
)
:
socket_base_t
(
parent_
)
{
}
zmq
::
sub_t
::~
sub_t
()
{
}
int
zmq
::
sub_t
::
setsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
)
{
if
(
option_
==
ZMQ_SUBSCRIBE
)
{
std
::
string
subscription
((
const
char
*
)
optval_
,
optvallen_
);
subscriptions
.
insert
(
subscription
);
return
0
;
}
if
(
option_
==
ZMQ_UNSUBSCRIBE
)
{
std
::
string
subscription
((
const
char
*
)
optval_
,
optvallen_
);
subscriptions_t
::
iterator
it
=
subscriptions
.
find
(
subscription
);
if
(
it
==
subscriptions
.
end
())
{
errno
=
EINVAL
;
return
-
1
;
}
subscriptions
.
erase
(
it
);
return
0
;
}
return
socket_base_t
::
setsockopt
(
option_
,
optval_
,
optvallen_
);
}
int
zmq
::
sub_t
::
recv
(
struct
zmq_msg_t
*
msg_
,
int
flags_
)
{
while
(
true
)
{
// Get a message.
int
rc
=
socket_base_t
::
recv
(
msg_
,
flags_
);
// If there's no message available, return immediately.
if
(
rc
!=
0
&&
errno
==
EAGAIN
)
return
-
1
;
// Check the message format.
// TODO: We should either ignore the message or drop the connection
// if the message doesn't conform with the expected format.
unsigned
char
*
data
=
(
unsigned
char
*
)
zmq_msg_data
(
msg_
);
zmq_assert
(
*
data
<=
zmq_msg_size
(
msg_
)
-
1
);
// Check whether the message matches at least one subscription.
std
::
string
topic
((
const
char
*
)
(
data
+
1
),
*
data
);
subscriptions_t
::
iterator
it
=
subscriptions
.
find
(
topic
);
if
(
it
!=
subscriptions
.
end
())
break
;
}
return
0
;
}
src/sub.hpp
0 → 100644
View file @
95944551
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SUB_INCLUDED__
#define __ZMQ_SUB_INCLUDED__
#include <set>
#include <string>
#include "socket_base.hpp"
namespace
zmq
{
class
sub_t
:
public
socket_base_t
{
public
:
sub_t
(
class
app_thread_t
*
parent_
);
~
sub_t
();
// Overloads of API functions from socket_base_t.
int
setsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
);
int
recv
(
struct
zmq_msg_t
*
msg_
,
int
flags_
);
private
:
// List of all the active subscriptions.
typedef
std
::
multiset
<
std
::
string
>
subscriptions_t
;
subscriptions_t
subscriptions
;
};
}
#endif
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment