Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
cf6cc012
Commit
cf6cc012
authored
Sep 25, 2009
by
malosek
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
pgm2 receiver working (partly)
parent
72c5c5ff
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
129 additions
and
53 deletions
+129
-53
Makefile.am
src/Makefile.am
+2
-1
pgm_socket.cpp
src/pgm_socket.cpp
+112
-49
pgm_socket.hpp
src/pgm_socket.hpp
+13
-1
socket_base.cpp
src/socket_base.cpp
+2
-2
No files found.
src/Makefile.am
View file @
cf6cc012
...
...
@@ -260,7 +260,8 @@ libzmq_la_CFLAGS = -I$(top_srcdir)/foreign/openpgm/@pgm_basename@/openpgm/pgm/in
-DCONFIG_HAVE_GETPROTOBYNAME_R
\
-DCONFIG_BIND_INADDR_ANY
\
-DCONFIG_GALOIS_MUL_LUT
\
-DGETTEXT_PACKAGE
=
\'
"pgm"
\'
-DGETTEXT_PACKAGE
=
'"pgm"'
\
-DG_LOG_DOMAIN
=
'"Pgm"'
endif
if
BUILD_NO_PGM
...
...
src/pgm_socket.cpp
View file @
cf6cc012
...
...
@@ -119,6 +119,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if
(
receiver
)
{
pgm_msgv_len
=
get_max_apdu_at_once
(
in_batch_size
);
pgm_msgv
=
new
pgm_msgv_t
[
pgm_msgv_len
];
zmq_log
(
1
,
"PGM transport: pgm_msgv_len %i, %s(%i)
\n
"
,
(
int
)
pgm_msgv_len
,
__FILE__
,
__LINE__
);
}
return
0
;
...
...
@@ -139,6 +141,12 @@ int zmq::pgm_socket_t::open_transport (void)
nbytes_processed
=
0
;
pgm_msgv_processed
=
0
;
#ifdef ZMQ_HAVE_OPENPGM1
int
pgm_ok
=
0
;
#elif defined ZMQ_HAVE_OPENPGM2
int
pgm_ok
=
true
;
#endif
// Init PGM transport.
// Ensure threading enabled, ensure timer enabled and find PGM protocol id.
//
...
...
@@ -170,8 +178,8 @@ int zmq::pgm_socket_t::open_transport (void)
return
-
1
;
}
zmq_log
(
1
,
"Transport GSI: %s, %s(%i)
\n
"
,
pgm_print_gsi
(
&
gsi
),
__FILE__
,
__LINE__
);
//
zmq_log (1, "Transport GSI: %s, %s(%i)\n", pgm_print_gsi (&gsi),
//
__FILE__, __LINE__);
#ifdef ZMQ_HAVE_OPENPGM1
// PGM transport GSRs.
...
...
@@ -192,18 +200,17 @@ int zmq::pgm_socket_t::open_transport (void)
errno
=
ENOMEM
;
return
-
1
;
}
#endif
#elif defined ZMQ_HAVE_OPENPGM2
struct
pgm_transport_info_t
*
res
=
NULL
;
GError
*
pgm_error
=
NULL
;
#ifdef ZMQ_HAVE_OPENPGM2
struct
pgm_transport_info_t
*
res
=
NULL
;
if
(
!
pgm_if_get_transport_info
(
network
,
NULL
,
&
res
,
NULL
))
{
if
(
!
pgm_if_get_transport_info
(
network
,
NULL
,
&
res
,
&
pgm_error
))
{
errno
=
EINVAL
;
return
-
1
;
}
res
->
ti_gsi
=
gsi
;
res
->
ti_dport
=
port_number
;
#endif
// If we are using UDP encapsulation update gsr or res.
...
...
@@ -214,9 +221,7 @@ int zmq::pgm_socket_t::open_transport (void)
g_htons
(
port_number
);
((
struct
sockaddr_in
*
)
&
recv_gsr
.
gsr_group
)
->
sin_port
=
g_htons
(
port_number
);
#endif
#ifdef ZMQ_HAVE_OPENPGM2
#elif defined ZMQ_HAVE_OPENPGM2
res
->
ti_udp_encap_ucast_port
=
port_number
;
res
->
ti_udp_encap_mcast_port
=
port_number
;
#endif
...
...
@@ -228,10 +233,8 @@ int zmq::pgm_socket_t::open_transport (void)
if
(
rc
!=
0
)
{
return
-
1
;
}
#endif
#ifdef ZMQ_HAVE_OPENPGM2
if
(
!
pgm_transport_create
(
&
g_transport
,
res
,
NULL
))
{
#elif defined ZMQ_HAVE_OPENPGM2
if
(
!
pgm_transport_create
(
&
g_transport
,
res
,
&
pgm_error
))
{
pgm_if_free_transport_info
(
res
);
// TODO: tranlate errors from glib into errnos.
errno
=
EINVAL
;
...
...
@@ -241,78 +244,88 @@ int zmq::pgm_socket_t::open_transport (void)
pgm_if_free_transport_info
(
res
);
#endif
zmq_log
(
1
,
"PGM transport created, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
// Common parameters for receiver and sender.
// Set maximum transport protocol data unit size (TPDU).
rc
=
pgm_transport_set_max_tpdu
(
g_transport
,
pgm_max_tpdu
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
// Set maximum number of network hops to cross.
rc
=
pgm_transport_set_hops
(
g_transport
,
16
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
#ifdef ZMQ_HAVE_OPENPGM2
// Set nonblocking send/recv sockets.
if
(
!
pgm_transport_set_nonblocking
(
g_transport
,
true
))
{
errno
=
EINVAL
;
return
-
1
;
}
#endif
// Receiver transport.
if
(
receiver
)
{
// Set transport->can_send_data = FALSE.
// Note that NAKs are still generated by the transport.
rc
=
pgm_transport_set_recv_only
(
g_transport
,
false
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
// Set NAK transmit back-off interval [us].
rc
=
pgm_transport_set_nak_bo_ivl
(
g_transport
,
50
*
1000
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
// Set timeout before repeating NAK [us].
rc
=
pgm_transport_set_nak_rpt_ivl
(
g_transport
,
200
*
1000
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
// Set timeout for receiving RDATA.
rc
=
pgm_transport_set_nak_rdata_ivl
(
g_transport
,
200
*
1000
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
// Set retries for NAK without NCF/DATA (NAK_DATA_RETRIES).
rc
=
pgm_transport_set_nak_data_retries
(
g_transport
,
5
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
// Set retries for NCF after NAK (NAK_NCF_RETRIES).
rc
=
pgm_transport_set_nak_ncf_retries
(
g_transport
,
2
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
// Set timeout for removing a dead peer [us].
rc
=
pgm_transport_set_peer_expiry
(
g_transport
,
5
*
8192
*
1000
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
// Set expiration time of SPM Requests [us].
rc
=
pgm_transport_set_spmr_expiry
(
g_transport
,
25
*
1000
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
...
...
@@ -327,7 +340,7 @@ int zmq::pgm_socket_t::open_transport (void)
rc
=
pgm_transport_set_rxw_max_rte
(
g_transport
,
options
.
rate
*
1000
/
8
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
...
...
@@ -339,7 +352,7 @@ int zmq::pgm_socket_t::open_transport (void)
}
rc
=
pgm_transport_set_rxw_secs
(
g_transport
,
options
.
recovery_ivl
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
...
...
@@ -349,7 +362,7 @@ int zmq::pgm_socket_t::open_transport (void)
// Set transport->can_recv = FALSE, waiting_pipe wont not be read.
rc
=
pgm_transport_set_send_only
(
g_transport
,
TRUE
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
...
...
@@ -364,7 +377,7 @@ int zmq::pgm_socket_t::open_transport (void)
rc
=
pgm_transport_set_txw_max_rte
(
g_transport
,
options
.
rate
*
1000
/
8
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
...
...
@@ -376,7 +389,7 @@ int zmq::pgm_socket_t::open_transport (void)
}
rc
=
pgm_transport_set_txw_secs
(
g_transport
,
options
.
recovery_ivl
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
...
...
@@ -400,7 +413,7 @@ int zmq::pgm_socket_t::open_transport (void)
// Set interval of background SPM packets [us].
rc
=
pgm_transport_set_ambient_spm
(
g_transport
,
8192
*
1000
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
...
...
@@ -412,7 +425,7 @@ int zmq::pgm_socket_t::open_transport (void)
rc
=
pgm_transport_set_heartbeat_spm
(
g_transport
,
spm_heartbeat
,
G_N_ELEMENTS
(
spm_heartbeat
));
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
...
...
@@ -421,7 +434,7 @@ int zmq::pgm_socket_t::open_transport (void)
// Enable multicast loopback.
if
(
options
.
use_multicast_loop
)
{
rc
=
pgm_transport_set_multicast_loop
(
g_transport
,
true
);
if
(
rc
!=
0
)
{
if
(
rc
!=
pgm_ok
)
{
errno
=
EINVAL
;
return
-
1
;
}
...
...
@@ -433,15 +446,15 @@ int zmq::pgm_socket_t::open_transport (void)
if
(
rc
!=
0
)
{
return
-
1
;
}
#endif
#ifdef ZMQ_HAVE_OPENPGM2
if
(
!
pgm_transport_bind
(
g_transport
,
NULL
))
{
#elif defined ZMQ_HAVE_OPENPGM2
if
(
!
pgm_transport_bind
(
g_transport
,
&
pgm_error
))
{
// TODO: tranlate errors from glib into errnos.
return
-
1
;
}
#endif
zmq_log
(
1
,
"PGM transport binded, %s(%i)
\n
"
,
__FILE__
,
__LINE__
);
return
0
;
}
...
...
@@ -585,26 +598,34 @@ size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_)
return
apdu_count
;
}
#ifdef ZMQ_HAVE_OPENPGM1
// Allocate buffer for one packet from the transmit window, The memory buffer
// is owned by the transmit window and so must be returned to the window with
// content via pgm_transport_send() calls or unused with pgm_packetv_free1().
void
*
zmq
::
pgm_socket_t
::
get_buffer
(
size_t
*
size_
)
{
#ifdef ZMQ_HAVE_OPENPGM1
// Store size.
*
size_
=
get_max_tsdu_size
();
// Allocate one packet.
return
pgm_packetv_alloc
(
g_transport
,
false
);
#elif ZMQ_HAVE_OPENPGM2
zmq_assert
(
false
);
#endif
}
// Return an unused packet allocated from the transmit window
// via pgm_packetv_alloc().
void
zmq
::
pgm_socket_t
::
free_buffer
(
void
*
data_
)
{
#ifdef ZMQ_HAVE_OPENPGM1
pgm_packetv_free1
(
g_transport
,
data_
,
false
);
}
#elif ZMQ_HAVE_OPENPGM2
zmq_assert
(
false
);
#endif
}
// pgm_transport_recvmsgv is called to fill the pgm_msgv array up to
// pgm_msgv_len. In subsequent calls data from pgm_msgv structure are
...
...
@@ -614,7 +635,6 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
size_t
raw_data_len
=
0
;
#ifdef ZMQ_HAVE_OPENPGM1
// We just sent all data from pgm_transport_recvmsgv up
// and have to return 0 that another engine in this thread is scheduled.
if
(
nbytes_rec
==
nbytes_processed
&&
nbytes_rec
>
0
)
{
...
...
@@ -638,9 +658,10 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Receive a vector of Application Protocol Domain Unit's (APDUs)
// from the transport.
#ifdef ZMQ_HAVE_OPENPGM1
nbytes_rec
=
pgm_transport_recvmsgv
(
g_transport
,
pgm_msgv
,
pgm_msgv_len
,
MSG_DONTWAIT
);
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
// pgm_transport_recvmsg returns -1 with errno == EAGAIN.
if
(
nbytes_rec
==
-
1
&&
errno
==
EAGAIN
)
{
...
...
@@ -666,17 +687,49 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Catch the rest of the errors.
if
(
nbytes_rec
<=
0
)
{
zmq_log
(
2
,
"received %i B, errno %i, %s(%i)"
,
(
int
)
nbytes_rec
,
zmq_log
(
2
,
"received %i B, errno %i, %s(%i)
.
\n
"
,
(
int
)
nbytes_rec
,
errno
,
__FILE__
,
__LINE__
);
errno_assert
(
nbytes_rec
>
0
);
}
#elif defined ZMQ_HAVE_OPENPGM2
GError
*
pgm_error
=
NULL
;
const
PGMIOStatus
status
=
pgm_recvmsgv
(
g_transport
,
pgm_msgv
,
pgm_msgv_len
,
MSG_DONTWAIT
,
&
nbytes_rec
,
&
pgm_error
);
if
(
nbytes_rec
>
0
)
{
zmq_log
(
1
,
"PGMIOStatus %i, nbytes_rec %i, %s(%i).
\n
"
,
status
,
(
int
)
nbytes_rec
,
__FILE__
,
__LINE__
);
}
// In a case when no ODATA/RDATA fired POLLIN event (SPM...)
// pgm_recvmsg returns ?.
if
(
status
==
PGM_IO_STATUS_AGAIN
||
status
==
PGM_IO_STATUS_AGAIN2
)
{
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
nbytes_rec
=
0
;
return
0
;
}
// Data loss?
if
(
status
!=
PGM_IO_STATUS_NORMAL
)
{
zmq_log
(
1
,
"PGMIOStatus %i, nbytes_rec %i, %s(%i).
\n
"
,
status
,
(
int
)
nbytes_rec
,
__FILE__
,
__LINE__
);
nbytes_rec
=
0
;
return
-
1
;
}
#endif
zmq_log
(
4
,
"received %i bytes
\n
"
,
(
int
)
nbytes_rec
);
}
zmq_assert
(
nbytes_rec
>
0
);
#ifdef ZMQ_HAVE_OPENPGM1
// Only one APDU per pgm_msgv_t structure is allowed.
zmq_assert
(
pgm_msgv
[
pgm_msgv_processed
].
msgv_iovlen
==
1
);
...
...
@@ -686,13 +739,25 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Save current TSI.
*
tsi_
=
pgm_msgv
[
pgm_msgv_processed
].
msgv_tsi
;
#elif defined ZMQ_HAVE_OPENPGM2
// Only one APDU per pgm_msgv_t structure is allowed.
zmq_assert
(
pgm_msgv
[
pgm_msgv_processed
].
msgv_len
==
1
);
struct
pgm_sk_buff_t
*
skb
=
pgm_msgv
[
pgm_msgv_processed
].
msgv_skb
[
0
];
// Take pointers from pgm_msgv_t structure.
*
raw_data_
=
skb
->
data
;
raw_data_len
=
skb
->
len
;
// Save current TSI.
*
tsi_
=
&
skb
->
tsi
;
#endif
// Move the the next pgm_msgv_t structure.
pgm_msgv_processed
++
;
nbytes_processed
+=
raw_data_len
;
#endif
zmq_log
(
4
,
"sendig up %i bytes
\n
"
,
(
int
)
raw_data_len
);
return
raw_data_len
;
...
...
@@ -711,9 +776,7 @@ void zmq::pgm_socket_t::process_upstream (void)
dummy_bytes
=
pgm_transport_recvmsgv
(
g_transport
,
&
dummy_msg
,
1
,
MSG_DONTWAIT
);
#endif
#ifdef ZMQ_HAVE_OPENPGM2
#elif defined ZMQ_HAVE_OPENPGM2
zmq_assert
(
false
);
#endif
...
...
src/pgm_socket.hpp
View file @
cf6cc012
...
...
@@ -119,16 +119,28 @@ namespace zmq
pgm_msgv_t
*
pgm_msgv
;
// How many bytes were read from pgm socket.
#ifdef ZMQ_HAVE_OPENPGM1
ssize_t
nbytes_rec
;
#elif defined ZMQ_HAVE_OPENPGM2
size_t
nbytes_rec
;
#endif
// How many bytes were processed from last pgm socket read.
#ifdef ZMQ_HAVE_OPENPGM1
ssize_t
nbytes_processed
;
#elif defined ZMQ_HAVE_OPENPGM2
size_t
nbytes_processed
;
#endif
// How many messages from pgm_msgv were already sent up.
#ifdef ZMQ_HAVE_OPENPGM1
ssize_t
pgm_msgv_processed
;
#elif defined ZMQ_HAVE_OPENPGM2
size_t
pgm_msgv_processed
;
#endif
// Size of pgm_msgv array.
s
s
ize_t
pgm_msgv_len
;
size_t
pgm_msgv_len
;
// Sender transport uses 2 fd.
enum
{
pgm_sender_fd_count
=
2
};
...
...
src/socket_base.cpp
View file @
cf6cc012
...
...
@@ -93,7 +93,7 @@ int zmq::socket_base_t::bind (const char *addr_)
return
0
;
}
#if defined ZMQ_HAVE_OPENPGM
1
#if defined ZMQ_HAVE_OPENPGM
if
(
addr_type
==
"pgm"
||
addr_type
==
"udp"
)
{
// In the case of PGM bind behaves the same like connect.
return
connect
(
addr_
);
...
...
@@ -179,7 +179,7 @@ int zmq::socket_base_t::connect (const char *addr_)
return
0
;
}
#if defined ZMQ_HAVE_OPENPGM
1
#if defined ZMQ_HAVE_OPENPGM
if
(
addr_type
==
"pgm"
||
addr_type
==
"udp"
)
{
// If the socket type requires bi-directional communication
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment