Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
969522bb
Commit
969522bb
authored
Sep 16, 2009
by
malosek
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
added OpenPGM receiver - ZMQ_SUB
parent
0381a78c
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
395 additions
and
25 deletions
+395
-25
zmq.h
c/zmq.h
+1
-0
Makefile.am
src/Makefile.am
+2
-0
options.cpp
src/options.cpp
+2
-1
options.hpp
src/options.hpp
+5
-2
pgm_receiver.cpp
src/pgm_receiver.cpp
+202
-0
pgm_receiver.hpp
src/pgm_receiver.hpp
+98
-0
pgm_sender.cpp
src/pgm_sender.cpp
+1
-6
pgm_sender.hpp
src/pgm_sender.hpp
+1
-1
pgm_socket.cpp
src/pgm_socket.cpp
+10
-7
socket_base.cpp
src/socket_base.cpp
+67
-8
sub.cpp
src/sub.cpp
+6
-0
No files found.
c/zmq.h
View file @
969522bb
...
...
@@ -53,6 +53,7 @@ extern "C" {
#define ZMQ_UNSUBSCRIBE 7 // string
#define ZMQ_RATE 8 // int64_t
#define ZMQ_RECOVERY_IVL 9 // int64_t
#define ZMQ_MCAST_LOOP 10 // boolean
// The operation should be performed in non-blocking mode. I.e. if it cannot
// be processed immediately, error should be returned with errno set to EAGAIN.
...
...
src/Makefile.am
View file @
969522bb
...
...
@@ -62,6 +62,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
object.hpp
\
options.hpp
\
owned.hpp
\
pgm_receiver.hpp
\
pgm_sender.hpp
\
pgm_socket.hpp
\
pipe.hpp
\
...
...
@@ -104,6 +105,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
object.cpp
\
options.cpp
\
owned.cpp
\
pgm_receiver.cpp
\
pgm_sender.cpp
\
pgm_socket.cpp
\
pipe.cpp
\
...
...
src/options.cpp
View file @
969522bb
...
...
@@ -25,6 +25,7 @@ zmq::options_t::options_t () :
swap
(
0
),
affinity
(
0
),
rate
(
100
),
recovery_ivl
(
10
)
recovery_ivl
(
10
),
use_multicast_loop
(
false
)
{
}
src/options.hpp
View file @
969522bb
...
...
@@ -37,11 +37,14 @@ namespace zmq
uint64_t
affinity
;
std
::
string
identity
;
// Maximum tranfer rate [kb/s].
// Maximum tranfer rate [kb/s].
Default 100kb/s.
uint32_t
rate
;
// Reliability time interval [s].
// Reliability time interval [s].
Default 10s.
uint32_t
recovery_ivl
;
// Enable multicast loopback. Default disabled (false).
bool
use_multicast_loop
;
};
}
...
...
src/pgm_receiver.cpp
0 → 100644
View file @
969522bb
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "platform.hpp"
#if defined ZMQ_HAVE_OPENPGM
#include <iostream>
#include "pgm_receiver.hpp"
#include "err.hpp"
#include "stdint.hpp"
#include "wire.hpp"
#include "i_inout.hpp"
//#define PGM_RECEIVER_DEBUG
//#define PGM_RECEIVER_DEBUG_LEVEL 1
// level 1 = key behaviour
// level 2 = processing flow
// level 4 = infos
#ifndef PGM_RECEIVER_DEBUG
# define zmq_log(n, ...) while (0)
#else
# define zmq_log(n, ...) do { if ((n) <= PGM_RECEIVER_DEBUG_LEVEL) \
{ printf (__VA_ARGS__);}} while (0)
#endif
zmq
::
pgm_receiver_t
::
pgm_receiver_t
(
class
io_thread_t
*
parent_
,
const
options_t
&
options_
,
const
char
*
session_name_
)
:
io_object_t
(
parent_
),
pgm_socket
(
true
,
options_
),
options
(
options_
),
session_name
(
session_name_
),
joined
(
false
),
inout
(
NULL
)
{
}
zmq
::
pgm_receiver_t
::~
pgm_receiver_t
()
{
}
int
zmq
::
pgm_receiver_t
::
init
(
const
char
*
network_
)
{
return
pgm_socket
.
init
(
network_
);
}
void
zmq
::
pgm_receiver_t
::
plug
(
i_inout
*
inout_
)
{
// Allocate 2 fds one for socket second for waiting pipe.
int
socket_fd
;
int
waiting_pipe_fd
;
decoder
.
set_inout
(
inout_
);
// Fill socket_fd and waiting_pipe_fd from PGM transport
pgm_socket
.
get_receiver_fds
(
&
socket_fd
,
&
waiting_pipe_fd
);
// Add socket_fd into poller.
socket_handle
=
add_fd
(
socket_fd
);
// Add waiting_pipe_fd into poller.
pipe_handle
=
add_fd
(
waiting_pipe_fd
);
// Set POLLIN for both handlers.
set_pollin
(
pipe_handle
);
set_pollin
(
socket_handle
);
inout
=
inout_
;
}
void
zmq
::
pgm_receiver_t
::
unplug
()
{
rm_fd
(
socket_handle
);
rm_fd
(
pipe_handle
);
decoder
.
set_inout
(
NULL
);
inout
=
NULL
;
}
void
zmq
::
pgm_receiver_t
::
revive
()
{
zmq_assert
(
false
);
}
void
zmq
::
pgm_receiver_t
::
reconnect
()
{
// Save inout ptr.
i_inout
*
inout_tmp
=
inout
;
// Unplug - plug PGM transport.
unplug
();
decoder
.
reset
();
plug
(
inout_tmp
);
}
// POLLIN event from socket or waiting_pipe.
void
zmq
::
pgm_receiver_t
::
in_event
()
{
void
*
data_with_offset
;
ssize_t
nbytes
=
0
;
// Read all data from pgm socket.
while
((
nbytes
=
receive_with_offset
(
&
data_with_offset
))
>
0
)
{
// Push all the data to the decoder.
decoder
.
write
((
unsigned
char
*
)
data_with_offset
,
nbytes
);
}
// Flush any messages decoder may have produced to the dispatcher.
inout
->
flush
();
// Data loss detected.
if
(
nbytes
==
-
1
)
{
// Throw message in progress from decoder
decoder
.
reset
();
// PGM receive is not joined anymore.
joined
=
false
;
// Recreate PGM transport.
reconnect
();
}
}
void
zmq
::
pgm_receiver_t
::
out_event
()
{
zmq_assert
(
false
);
}
ssize_t
zmq
::
pgm_receiver_t
::
receive_with_offset
(
void
**
data_
)
{
// Data from PGM socket.
void
*
rd
=
NULL
;
unsigned
char
*
raw_data
=
NULL
;
// Read data from underlying pgm_socket.
ssize_t
nbytes
=
pgm_socket
.
receive
((
void
**
)
&
rd
);
raw_data
=
(
unsigned
char
*
)
rd
;
// No ODATA or RDATA.
if
(
!
nbytes
)
return
0
;
// Data loss.
if
(
nbytes
==
-
1
)
{
return
-
1
;
}
// Read offset of the fist message in current APDU.
uint16_t
apdu_offset
=
get_uint16
(
raw_data
);
// Shift raw_data & decrease nbytes by the first message offset
// information (sizeof uint16_t).
*
data_
=
raw_data
+
sizeof
(
uint16_t
);
nbytes
-=
sizeof
(
uint16_t
);
// There is not beginning of the message in current APDU and we
// are not joined jet -> throwing data.
if
(
apdu_offset
==
0xFFFF
&&
!
joined
)
{
*
data_
=
NULL
;
return
0
;
}
// Now is the possibility to join the stream.
if
(
!
joined
)
{
// We have to move data to the begining of the first message.
*
data_
=
(
unsigned
char
*
)
*
data_
+
apdu_offset
;
nbytes
-=
apdu_offset
;
// Joined the stream.
joined
=
true
;
zmq_log
(
2
,
"joined into the stream, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
}
return
nbytes
;
}
#endif
src/pgm_receiver.hpp
0 → 100644
View file @
969522bb
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_PGM_RECEIVER_HPP_INCLUDED__
#define __ZMQ_PGM_RECEIVER_HPP_INCLUDED__
#include "platform.hpp"
#if defined ZMQ_HAVE_OPENPGM
#include "io_object.hpp"
#include "i_engine.hpp"
#include "options.hpp"
#include "zmq_decoder.hpp"
#include "pgm_socket.hpp"
namespace
zmq
{
class
pgm_receiver_t
:
public
io_object_t
,
public
i_engine
{
public
:
// Creates gm_engine. Underlying PGM connection is initialised
// using network_ parameter.
pgm_receiver_t
(
class
io_thread_t
*
parent_
,
const
options_t
&
options_
,
const
char
*
session_name_
);
~
pgm_receiver_t
();
int
init
(
const
char
*
network_
);
void
reconnect
();
// i_engine interface implementation.
void
plug
(
struct
i_inout
*
inout_
);
void
unplug
();
void
revive
();
// i_poll_events interface implementation.
void
in_event
();
void
out_event
();
private
:
// Read exactly iov_len_ count APDUs, function returns number
// of bytes received. Note that if we did not join message stream
// before and there is not message beginning in the APDUs being
// received iov_len for such a APDUs will be 0.
ssize_t
receive_with_offset
(
void
**
data_
);
// Message decoder.
zmq_decoder_t
decoder
;
// PGM socket.
pgm_socket_t
pgm_socket
;
// Socket options.
options_t
options
;
// Name of the session associated with the connecter.
std
::
string
session_name
;
// If receiver joined the messages stream.
bool
joined
;
// Parent session.
i_inout
*
inout
;
// Poll handle associated with PGM socket.
handle_t
socket_handle
;
// Poll handle associated with engine PGM waiting pipe.
handle_t
pipe_handle
;
pgm_receiver_t
(
const
pgm_receiver_t
&
);
void
operator
=
(
const
pgm_receiver_t
&
);
};
}
#endif
#endif
src/pgm_sender.cpp
View file @
969522bb
...
...
@@ -90,11 +90,6 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
set_pollout
(
handle
);
inout
=
inout_
;
zmq_log
(
1
,
"plug: downlink_socket_fd %i, uplink_socket_fd %i, %s(%i)"
,
downlink_socket_fd
,
uplink_socket_fd
,
__FILE__
,
__LINE__
);
std
::
cout
<<
std
::
flush
;
}
void
zmq
::
pgm_sender_t
::
unplug
()
...
...
@@ -185,7 +180,7 @@ void zmq::pgm_sender_t::out_event ()
size_t
zmq
::
pgm_sender_t
::
write_one_pkt_with_offset
(
unsigned
char
*
data_
,
size_t
size_
,
uint16_t
offset_
)
{
zmq_log
(
1
,
"data_size %i, first message offset %i, %s(%i)"
,
zmq_log
(
1
,
"data_size %i, first message offset %i, %s(%i)
\n
"
,
(
int
)
size_
,
offset_
,
__FILE__
,
__LINE__
);
std
::
cout
<<
std
::
flush
;
...
...
src/pgm_sender.hpp
View file @
969522bb
...
...
@@ -77,7 +77,7 @@ namespace zmq
handle_t
handle
;
handle_t
uplink_handle
;
//
?
//
Parent session.
i_inout
*
inout
;
// Output buffer from pgm_socket.
...
...
src/pgm_socket.cpp
View file @
969522bb
...
...
@@ -386,12 +386,14 @@ int zmq::pgm_socket_t::open_transport (void)
return
-
1
;
}
}
// Enable multicast loopback.
rc
=
pgm_transport_set_multicast_loop
(
g_transport
,
true
);
if
(
rc
!=
0
)
{
errno
=
EINVAL
;
return
-
1
;
if
(
options
.
use_multicast_loop
)
{
rc
=
pgm_transport_set_multicast_loop
(
g_transport
,
true
);
if
(
rc
!=
0
)
{
errno
=
EINVAL
;
return
-
1
;
}
}
// Bind a transport to the specified network devices.
...
...
@@ -486,6 +488,7 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)
// Send one APDU, transmit window owned memory.
size_t
zmq
::
pgm_socket_t
::
send
(
unsigned
char
*
data_
,
size_t
data_len_
)
{
iovec
iov
=
{
data_
,
data_len_
};
ssize_t
nbytes
=
pgm_transport_send_packetv
(
g_transport
,
&
iov
,
1
,
...
...
@@ -561,7 +564,6 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
// returned.
ssize_t
zmq
::
pgm_socket_t
::
receive
(
void
**
raw_data_
)
{
// We just sent all data from pgm_transport_recvmsgv up
// and have to return 0 that another engine in this thread is scheduled.
if
(
nbytes_rec
==
nbytes_processed
&&
nbytes_rec
>
0
)
{
...
...
@@ -575,7 +577,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
}
// If we have are going first time or if we have processed all pgm_msgv_t
// structure previ
a
ously read from the pgm socket.
// structure previously read from the pgm socket.
if
(
nbytes_rec
==
nbytes_processed
)
{
// Check program flow.
...
...
@@ -615,6 +617,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
}
zmq_log
(
4
,
"received %i bytes
\n
"
,
(
int
)
nbytes_rec
);
}
zmq_assert
(
nbytes_rec
>
0
);
...
...
src/socket_base.cpp
View file @
969522bb
...
...
@@ -37,6 +37,7 @@
#include "err.hpp"
#include "platform.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
zmq
::
socket_base_t
::
socket_base_t
(
app_thread_t
*
parent_
,
int
type_
)
:
object_t
(
parent_
),
...
...
@@ -156,6 +157,14 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
options
.
recovery_ivl
=
(
uint32_t
)
*
((
int64_t
*
)
optval_
);
return
0
;
case
ZMQ_MCAST_LOOP
:
if
(
optvallen_
!=
sizeof
(
bool
))
{
errno
=
EINVAL
;
return
-
1
;
}
options
.
use_multicast_loop
=
optval_
;
return
0
;
default
:
errno
=
EINVAL
;
return
-
1
;
...
...
@@ -164,15 +173,43 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
int
zmq
::
socket_base_t
::
bind
(
const
char
*
addr_
)
{
zmq_listener_t
*
listener
=
new
zmq_listener_t
(
choose_io_thread
(
options
.
affinity
),
this
,
options
);
int
rc
=
listener
->
set_address
(
addr_
);
if
(
rc
!=
0
)
// Parse addr_ string.
std
::
string
addr_type
;
std
::
string
addr_args
;
std
::
string
addr
(
addr_
);
std
::
string
::
size_type
pos
=
addr
.
find
(
"://"
);
if
(
pos
==
std
::
string
::
npos
)
{
errno
=
EINVAL
;
return
-
1
;
}
send_plug
(
listener
);
send_own
(
this
,
listener
);
return
0
;
addr_type
=
addr
.
substr
(
0
,
pos
);
addr_args
=
addr
.
substr
(
pos
+
3
);
if
(
addr_type
==
"tcp"
)
{
zmq_listener_t
*
listener
=
new
zmq_listener_t
(
choose_io_thread
(
options
.
affinity
),
this
,
options
);
int
rc
=
listener
->
set_address
(
addr_args
.
c_str
());
if
(
rc
!=
0
)
return
-
1
;
send_plug
(
listener
);
send_own
(
this
,
listener
);
return
0
;
}
#if defined ZMQ_HAVE_OPENPGM
if
(
addr_type
==
"pgm"
)
{
// In the case of PGM bind behaves the same like connect.
return
connect
(
addr_
);
}
#endif
// Unknown address type.
errno
=
EFAULT
;
return
-
1
;
}
int
zmq
::
socket_base_t
::
connect
(
const
char
*
addr_
)
...
...
@@ -246,6 +283,8 @@ int zmq::socket_base_t::connect (const char *addr_)
if
(
addr_type
==
"pgm"
)
{
switch
(
type
)
{
// PGM sender.
case
ZMQ_PUB
:
{
pgm_sender_t
*
pgm_sender
=
...
...
@@ -266,9 +305,29 @@ int zmq::socket_base_t::connect (const char *addr_)
break
;
}
// PGM receiver.
case
ZMQ_SUB
:
zmq_assert
(
false
);
{
pgm_receiver_t
*
pgm_receiver
=
new
pgm_receiver_t
(
choose_io_thread
(
options
.
affinity
),
options
,
session_name
.
c_str
());
int
rc
=
pgm_receiver
->
init
(
addr_args
.
c_str
());
if
(
rc
!=
0
)
{
delete
pgm_receiver
;
return
-
1
;
}
// Reserve a sequence number for following 'attach' command.
session
->
inc_seqnum
();
send_attach
(
session
,
pgm_receiver
);
pgm_receiver
=
NULL
;
break
;
}
default
:
errno
=
EINVAL
;
return
-
1
;
...
...
src/sub.cpp
View file @
969522bb
...
...
@@ -101,6 +101,12 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
if
(
rc
!=
0
&&
errno
==
EAGAIN
)
return
-
1
;
// If there is no subscription return -1/EAGAIN.
if
(
!
all_count
&&
prefixes
.
empty
()
&&
topics
.
empty
())
{
errno
=
EAGAIN
;
return
-
1
;
}
// If there is at least one "*" subscription, the message matches.
if
(
all_count
)
return
0
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment