Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
3d8eb071
Commit
3d8eb071
authored
Oct 30, 2010
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Coding style fixed for pgm_socket
Signed-off-by:
Martin Sustrik
<
sustrik@250bpm.com
>
parent
b358df9f
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
132 additions
and
84 deletions
+132
-84
pgm_socket.cpp
src/pgm_socket.cpp
+130
-81
pgm_socket.hpp
src/pgm_socket.hpp
+2
-3
No files found.
src/pgm_socket.cpp
View file @
3d8eb071
...
...
@@ -107,59 +107,71 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
memset
(
&
hints
,
0
,
sizeof
(
hints
));
hints
.
ai_family
=
AF_UNSPEC
;
if
(
!
pgm_getaddrinfo
(
network
,
NULL
,
&
res
,
&
pgm_error
))
{
// Invalid parameters don't set pgm_error_t
// Invalid parameters don't set pgm_error_t.
zmq_assert
(
pgm_error
!=
NULL
);
if
(
pgm_error
->
domain
==
PGM_ERROR_DOMAIN_IF
&&
(
// NB: cannot catch EAI_BADFLAGS
// NB: cannot catch EAI_BADFLAGS.
pgm_error
->
code
!=
PGM_ERROR_SERVICE
&&
pgm_error
->
code
!=
PGM_ERROR_SOCKTNOSUPPORT
))
// User, host, or network configuration or transient error
// User, host, or network configuration or transient error.
goto
err_abort
;
// Fatal OpenPGM internal error
// Fatal OpenPGM internal error.
zmq_assert
(
false
);
}
zmq_assert
(
res
!=
NULL
);
// Pick up detected IP family
// Pick up detected IP family
.
sa_family
=
res
->
ai_send_addrs
[
0
].
gsr_group
.
ss_family
;
// Create IP/PGM or UDP/PGM socket
// Create IP/PGM or UDP/PGM socket
.
if
(
udp_encapsulation_
)
{
if
(
!
pgm_socket
(
&
sock
,
sa_family
,
SOCK_SEQPACKET
,
IPPROTO_UDP
,
&
pgm_error
))
{
// Invalid parameters don't set pgm_error_t
if
(
!
pgm_socket
(
&
sock
,
sa_family
,
SOCK_SEQPACKET
,
IPPROTO_UDP
,
&
pgm_error
))
{
// Invalid parameters don't set pgm_error_t.
zmq_assert
(
pgm_error
!=
NULL
);
if
(
pgm_error
->
domain
==
PGM_ERROR_DOMAIN_SOCKET
&&
(
pgm_error
->
code
!=
PGM_ERROR_BADF
&&
pgm_error
->
code
!=
PGM_ERROR_FAULT
&&
pgm_error
->
code
!=
PGM_ERROR_NOPROTOOPT
&&
pgm_error
->
code
!=
PGM_ERROR_FAILED
))
// User, host, or network configuration or transient error
// User, host, or network configuration or transient error.
goto
err_abort
;
// Fatal OpenPGM internal error
// Fatal OpenPGM internal error.
zmq_assert
(
false
);
}
// All options are of data type int
const
int
encapsulation_port
=
port_number
;
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_UDP_ENCAP_UCAST_PORT
,
&
encapsulation_port
,
sizeof
(
encapsulation_port
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_UDP_ENCAP_MCAST_PORT
,
&
encapsulation_port
,
sizeof
(
encapsulation_port
)))
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_UDP_ENCAP_UCAST_PORT
,
&
encapsulation_port
,
sizeof
(
encapsulation_port
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_UDP_ENCAP_MCAST_PORT
,
&
encapsulation_port
,
sizeof
(
encapsulation_port
)))
goto
err_abort
;
}
else
{
if
(
!
pgm_socket
(
&
sock
,
sa_family
,
SOCK_SEQPACKET
,
IPPROTO_PGM
,
&
pgm_error
))
{
// Invalid parameters don't set pgm_error_t
}
else
{
if
(
!
pgm_socket
(
&
sock
,
sa_family
,
SOCK_SEQPACKET
,
IPPROTO_PGM
,
&
pgm_error
))
{
// Invalid parameters don't set pgm_error_t.
zmq_assert
(
pgm_error
!=
NULL
);
if
(
pgm_error
->
domain
==
PGM_ERROR_DOMAIN_SOCKET
&&
(
pgm_error
->
code
!=
PGM_ERROR_BADF
&&
pgm_error
->
code
!=
PGM_ERROR_FAULT
&&
pgm_error
->
code
!=
PGM_ERROR_NOPROTOOPT
&&
pgm_error
->
code
!=
PGM_ERROR_FAILED
))
// User, host, or network configuration or transient error
// User, host, or network configuration or transient error.
goto
err_abort
;
// Fatal OpenPGM internal error
// Fatal OpenPGM internal error.
zmq_assert
(
false
);
}
}
...
...
@@ -169,16 +181,19 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
sndbuf
=
(
int
)
options
.
sndbuf
,
max_tpdu
=
(
int
)
pgm_max_tpdu
;
if
(
rcvbuf
)
{
if
(
!
pgm_setsockopt
(
sock
,
SOL_SOCKET
,
SO_RCVBUF
,
&
rcvbuf
,
sizeof
(
rcvbuf
)))
goto
err_abort
;
if
(
!
pgm_setsockopt
(
sock
,
SOL_SOCKET
,
SO_RCVBUF
,
&
rcvbuf
,
sizeof
(
rcvbuf
)))
goto
err_abort
;
}
if
(
sndbuf
)
{
if
(
!
pgm_setsockopt
(
sock
,
SOL_SOCKET
,
SO_SNDBUF
,
&
sndbuf
,
sizeof
(
sndbuf
)))
goto
err_abort
;
if
(
!
pgm_setsockopt
(
sock
,
SOL_SOCKET
,
SO_SNDBUF
,
&
sndbuf
,
sizeof
(
sndbuf
)))
goto
err_abort
;
}
// Set maximum transport protocol data unit size (TPDU).
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MTU
,
&
max_tpdu
,
sizeof
(
max_tpdu
)))
// Set maximum transport protocol data unit size (TPDU).
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MTU
,
&
max_tpdu
,
sizeof
(
max_tpdu
)))
goto
err_abort
;
}
...
...
@@ -194,16 +209,26 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
nak_data_retries
=
50
,
nak_ncf_retries
=
50
;
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RECV_ONLY
,
&
recv_only
,
sizeof
(
recv_only
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RXW_MAX_RTE
,
&
rxw_max_rte
,
sizeof
(
rxw_max_rte
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RXW_SECS
,
&
rxw_secs
,
sizeof
(
rxw_secs
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_PEER_EXPIRY
,
&
peer_expiry
,
sizeof
(
peer_expiry
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_SPMR_EXPIRY
,
&
spmr_expiry
,
sizeof
(
spmr_expiry
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_BO_IVL
,
&
nak_bo_ivl
,
sizeof
(
nak_bo_ivl
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_RPT_IVL
,
&
nak_rpt_ivl
,
sizeof
(
nak_rpt_ivl
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_RDATA_IVL
,
&
nak_rdata_ivl
,
sizeof
(
nak_rdata_ivl
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_DATA_RETRIES
,
&
nak_data_retries
,
sizeof
(
nak_data_retries
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_NCF_RETRIES
,
&
nak_ncf_retries
,
sizeof
(
nak_ncf_retries
)))
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RECV_ONLY
,
&
recv_only
,
sizeof
(
recv_only
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RXW_MAX_RTE
,
&
rxw_max_rte
,
sizeof
(
rxw_max_rte
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RXW_SECS
,
&
rxw_secs
,
sizeof
(
rxw_secs
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_PEER_EXPIRY
,
&
peer_expiry
,
sizeof
(
peer_expiry
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_SPMR_EXPIRY
,
&
spmr_expiry
,
sizeof
(
spmr_expiry
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_BO_IVL
,
&
nak_bo_ivl
,
sizeof
(
nak_bo_ivl
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_RPT_IVL
,
&
nak_rpt_ivl
,
sizeof
(
nak_rpt_ivl
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_RDATA_IVL
,
&
nak_rdata_ivl
,
sizeof
(
nak_rdata_ivl
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_DATA_RETRIES
,
&
nak_data_retries
,
sizeof
(
nak_data_retries
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NAK_NCF_RETRIES
,
&
nak_ncf_retries
,
sizeof
(
nak_ncf_retries
)))
goto
err_abort
;
}
else
{
const
int
send_only
=
1
,
...
...
@@ -220,11 +245,16 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
pgm_secs
(
25
),
pgm_secs
(
30
)
};
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_SEND_ONLY
,
&
send_only
,
sizeof
(
send_only
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_TXW_MAX_RTE
,
&
txw_max_rte
,
sizeof
(
txw_max_rte
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_TXW_SECS
,
&
txw_secs
,
sizeof
(
txw_secs
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_AMBIENT_SPM
,
&
ambient_spm
,
sizeof
(
ambient_spm
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_HEARTBEAT_SPM
,
&
heartbeat_spm
,
sizeof
(
heartbeat_spm
)))
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_SEND_ONLY
,
&
send_only
,
sizeof
(
send_only
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_TXW_MAX_RTE
,
&
txw_max_rte
,
sizeof
(
txw_max_rte
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_TXW_SECS
,
&
txw_secs
,
sizeof
(
txw_secs
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_AMBIENT_SPM
,
&
ambient_spm
,
sizeof
(
ambient_spm
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_HEARTBEAT_SPM
,
&
heartbeat_spm
,
sizeof
(
heartbeat_spm
)))
goto
err_abort
;
}
...
...
@@ -238,13 +268,15 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if
(
options
.
identity
.
size
()
>
0
)
{
// Create gsi from identity.
if
(
!
pgm_gsi_create_from_data
(
&
addr
.
sa_addr
.
gsi
,
options
.
identity
.
data
(),
options
.
identity
.
size
()))
if
(
!
pgm_gsi_create_from_data
(
&
addr
.
sa_addr
.
gsi
,
options
.
identity
.
data
(),
options
.
identity
.
size
()))
goto
err_abort
;
}
else
{
// Generate random gsi.
std
::
string
gsi_base
=
uuid_t
().
to_string
();
if
(
!
pgm_gsi_create_from_string
(
&
addr
.
sa_addr
.
gsi
,
gsi_base
.
c_str
(),
-
1
))
if
(
!
pgm_gsi_create_from_string
(
&
addr
.
sa_addr
.
gsi
,
gsi_base
.
c_str
(),
-
1
))
goto
err_abort
;
}
...
...
@@ -258,53 +290,63 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
memcpy
(
&
sa6
,
&
res
->
ai_recv_addrs
[
0
].
gsr_group
,
sizeof
(
sa6
));
if_req
.
ir_scope_id
=
sa6
.
sin6_scope_id
;
}
if
(
!
pgm_bind3
(
sock
,
&
addr
,
sizeof
(
addr
),
&
if_req
,
sizeof
(
if_req
),
&
if_req
,
sizeof
(
if_req
),
&
pgm_error
))
{
// Invalid parameters don't set pgm_error_t
if
(
!
pgm_bind3
(
sock
,
&
addr
,
sizeof
(
addr
),
&
if_req
,
sizeof
(
if_req
),
&
if_req
,
sizeof
(
if_req
),
&
pgm_error
))
{
// Invalid parameters don't set pgm_error_t.
zmq_assert
(
pgm_error
!=
NULL
);
if
((
pgm_error
->
domain
==
PGM_ERROR_DOMAIN_SOCKET
||
pgm_error
->
domain
==
PGM_ERROR_DOMAIN_IF
)
&&
(
pgm_error
->
code
!=
PGM_ERROR_INVAL
&&
pgm_error
->
code
!=
PGM_ERROR_BADF
&&
pgm_error
->
code
!=
PGM_ERROR_FAULT
))
// User, host, or network configuration or transient error
// User, host, or network configuration or transient error.
goto
err_abort
;
// Fatal OpenPGM internal error
// Fatal OpenPGM internal error.
zmq_assert
(
false
);
}
// Join IP multicast groups
for
(
unsigned
i
=
0
;
i
<
res
->
ai_recv_addrs_len
;
i
++
)
{
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_JOIN_GROUP
,
&
res
->
ai_recv_addrs
[
i
],
sizeof
(
struct
group_req
)))
// Join IP multicast groups
.
for
(
unsigned
i
=
0
;
i
<
res
->
ai_recv_addrs_len
;
i
++
)
{
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_JOIN_GROUP
,
&
res
->
ai_recv_addrs
[
i
],
sizeof
(
struct
group_req
)))
goto
err_abort
;
}
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_SEND_GROUP
,
&
res
->
ai_send_addrs
[
0
],
sizeof
(
struct
group_req
)))
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_SEND_GROUP
,
&
res
->
ai_send_addrs
[
0
],
sizeof
(
struct
group_req
)))
goto
err_abort
;
pgm_freeaddrinfo
(
res
);
res
=
NULL
;
// Set IP level parameters
// Set IP level parameters
.
{
const
int
nonblocking
=
1
,
multicast_loop
=
options
.
use_multicast_loop
?
1
:
0
,
multicast_hops
=
16
,
dscp
=
0x2e
<<
2
;
/* Expedited Forwarding PHB for network elements, no ECN. */
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MULTICAST_LOOP
,
&
multicast_loop
,
sizeof
(
multicast_loop
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MULTICAST_HOPS
,
&
multicast_hops
,
sizeof
(
multicast_hops
)))
// Expedited Forwarding PHB for network elements, no ECN.
dscp
=
0x2e
<<
2
;
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MULTICAST_LOOP
,
&
multicast_loop
,
sizeof
(
multicast_loop
))
||
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_MULTICAST_HOPS
,
&
multicast_hops
,
sizeof
(
multicast_hops
)))
goto
err_abort
;
if
(
AF_INET6
!=
sa_family
&&
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_TOS
,
&
dscp
,
sizeof
(
dscp
)))
if
(
AF_INET6
!=
sa_family
&&
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_TOS
,
&
dscp
,
sizeof
(
dscp
)))
goto
err_abort
;
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NOBLOCK
,
&
nonblocking
,
sizeof
(
nonblocking
)))
if
(
!
pgm_setsockopt
(
sock
,
IPPROTO_PGM
,
PGM_NOBLOCK
,
&
nonblocking
,
sizeof
(
nonblocking
)))
goto
err_abort
;
}
// Connect PGM transport to start state machine.
if
(
!
pgm_connect
(
sock
,
&
pgm_error
))
{
// Invalid parameters don't set pgm_error_t
// Invalid parameters don't set pgm_error_t.
zmq_assert
(
pgm_error
!=
NULL
);
goto
err_abort
;
}
...
...
@@ -348,9 +390,8 @@ zmq::pgm_socket_t::~pgm_socket_t ()
pgm_close
(
sock
,
TRUE
);
}
// Get receiver fds. receive_fd_ is signaled for incoming
// packets, waiting_pipe_fd_ is signaled for state driven
// events and data.
// Get receiver fds. receive_fd_ is signaled for incoming packets,
// waiting_pipe_fd_ is signaled for state driven events and data.
void
zmq
::
pgm_socket_t
::
get_receiver_fds
(
int
*
receive_fd_
,
int
*
waiting_pipe_fd_
)
{
...
...
@@ -361,12 +402,14 @@ void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,
zmq_assert
(
waiting_pipe_fd_
);
socklen
=
sizeof
(
*
receive_fd_
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RECV_SOCK
,
receive_fd_
,
&
socklen
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RECV_SOCK
,
receive_fd_
,
&
socklen
);
zmq_assert
(
rc
);
zmq_assert
(
socklen
==
sizeof
(
*
receive_fd_
));
socklen
=
sizeof
(
*
waiting_pipe_fd_
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_PENDING_SOCK
,
waiting_pipe_fd_
,
&
socklen
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_PENDING_SOCK
,
waiting_pipe_fd_
,
&
socklen
);
zmq_assert
(
rc
);
zmq_assert
(
socklen
==
sizeof
(
*
waiting_pipe_fd_
));
}
...
...
@@ -393,17 +436,20 @@ void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
zmq_assert
(
socklen
==
sizeof
(
*
receive_fd_
));
socklen
=
sizeof
(
*
receive_fd_
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RECV_SOCK
,
receive_fd_
,
&
socklen
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RECV_SOCK
,
receive_fd_
,
&
socklen
);
zmq_assert
(
rc
);
zmq_assert
(
socklen
==
sizeof
(
*
receive_fd_
));
socklen
=
sizeof
(
*
rdata_notify_fd_
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_REPAIR_SOCK
,
rdata_notify_fd_
,
&
socklen
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_REPAIR_SOCK
,
rdata_notify_fd_
,
&
socklen
);
zmq_assert
(
rc
);
zmq_assert
(
socklen
==
sizeof
(
*
rdata_notify_fd_
));
socklen
=
sizeof
(
*
pending_notify_fd_
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_PENDING_SOCK
,
pending_notify_fd_
,
&
socklen
);
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_PENDING_SOCK
,
pending_notify_fd_
,
&
socklen
);
zmq_assert
(
rc
);
zmq_assert
(
socklen
==
sizeof
(
*
pending_notify_fd_
));
}
...
...
@@ -416,12 +462,13 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
const
int
status
=
pgm_send
(
sock
,
data_
,
data_len_
,
&
nbytes
);
// We have to write all data as one packet.
//
We have to write all data as one packet.
if
(
nbytes
>
0
)
{
zmq_assert
(
status
==
PGM_IO_STATUS_NORMAL
);
zmq_assert
((
ssize_t
)
nbytes
==
(
ssize_t
)
data_len_
);
}
else
{
zmq_assert
(
status
==
PGM_IO_STATUS_RATE_LIMITED
||
status
==
PGM_IO_STATUS_WOULD_BLOCK
);
zmq_assert
(
status
==
PGM_IO_STATUS_RATE_LIMITED
||
status
==
PGM_IO_STATUS_WOULD_BLOCK
);
if
(
status
==
PGM_IO_STATUS_RATE_LIMITED
)
errno
=
ENOMEM
;
...
...
@@ -429,7 +476,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
errno
=
EBUSY
;
}
// Save return value.
//
Save return value.
last_tx_status
=
status
;
return
nbytes
;
...
...
@@ -437,12 +484,15 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
long
zmq
::
pgm_socket_t
::
get_rx_timeout
()
{
if
(
last_rx_status
!=
PGM_IO_STATUS_RATE_LIMITED
&&
last_rx_status
!=
PGM_IO_STATUS_TIMER_PENDING
)
if
(
last_rx_status
!=
PGM_IO_STATUS_RATE_LIMITED
&&
last_rx_status
!=
PGM_IO_STATUS_TIMER_PENDING
)
return
-
1
;
struct
timeval
tv
;
socklen_t
optlen
=
sizeof
(
tv
);
const
bool
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
last_rx_status
==
PGM_IO_STATUS_RATE_LIMITED
?
PGM_RATE_REMAIN
:
PGM_TIME_REMAIN
,
&
tv
,
&
optlen
);
const
bool
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
last_rx_status
==
PGM_IO_STATUS_RATE_LIMITED
?
PGM_RATE_REMAIN
:
PGM_TIME_REMAIN
,
&
tv
,
&
optlen
);
zmq_assert
(
rc
);
const
long
timeout
=
(
tv
.
tv_sec
*
1000
)
+
(
tv
.
tv_usec
/
1000
);
...
...
@@ -457,7 +507,8 @@ long zmq::pgm_socket_t::get_tx_timeout ()
struct
timeval
tv
;
socklen_t
optlen
=
sizeof
(
tv
);
const
bool
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RATE_REMAIN
,
&
tv
,
&
optlen
);
const
bool
rc
=
pgm_getsockopt
(
sock
,
IPPROTO_PGM
,
PGM_RATE_REMAIN
,
&
tv
,
&
optlen
);
zmq_assert
(
rc
);
const
long
timeout
=
(
tv
.
tv_sec
*
1000
)
+
(
tv
.
tv_usec
/
1000
);
...
...
@@ -477,9 +528,8 @@ size_t zmq::pgm_socket_t::get_max_tsdu_size ()
return
(
size_t
)
max_tsdu
;
}
// pgm_recvmsgv is called to fill the pgm_msgv array up to
// pgm_msgv_len. In subsequent calls data from pgm_msgv structure are
// returned.
// pgm_recvmsgv is called to fill the pgm_msgv array up to pgm_msgv_len.
// In subsequent calls data from pgm_msgv structure are returned.
ssize_t
zmq
::
pgm_socket_t
::
receive
(
void
**
raw_data_
,
const
pgm_tsi_t
**
tsi_
)
{
size_t
raw_data_len
=
0
;
...
...
@@ -512,7 +562,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
const
int
status
=
pgm_recvmsgv
(
sock
,
pgm_msgv
,
pgm_msgv_len
,
MSG_ERRQUEUE
,
&
nbytes_rec
,
&
pgm_error
);
// Invalid parameters
// Invalid parameters
.
zmq_assert
(
status
!=
PGM_IO_STATUS_ERROR
);
last_rx_status
=
status
;
...
...
@@ -535,8 +585,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert
(
nbytes_rec
==
0
);
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
// In case if no RDATA/ODATA caused POLLIN 0 is returned.
nbytes_rec
=
0
;
errno
=
ENOMEM
;
return
0
;
...
...
@@ -547,8 +596,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert
(
nbytes_rec
==
0
);
// In case if no RDATA/ODATA caused POLLIN 0 is
// returned.
// In case if no RDATA/ODATA caused POLLIN 0 is returned.
nbytes_rec
=
0
;
errno
=
EAGAIN
;
return
0
;
...
...
@@ -610,12 +658,13 @@ void zmq::pgm_socket_t::process_upstream ()
const
int
status
=
pgm_recvmsgv
(
sock
,
&
dummy_msg
,
1
,
MSG_ERRQUEUE
,
&
dummy_bytes
,
&
pgm_error
);
// Invalid parameters
// Invalid parameters
.
zmq_assert
(
status
!=
PGM_IO_STATUS_ERROR
);
// No data should be returned.
zmq_assert
(
dummy_bytes
==
0
&&
(
status
==
PGM_IO_STATUS_TIMER_PENDING
||
status
==
PGM_IO_STATUS_RATE_LIMITED
||
status
==
PGM_IO_STATUS_WOULD_BLOCK
));
status
==
PGM_IO_STATUS_RATE_LIMITED
||
status
==
PGM_IO_STATUS_WOULD_BLOCK
));
last_rx_status
=
status
;
...
...
src/pgm_socket.hpp
View file @
3d8eb071
...
...
@@ -40,9 +40,8 @@ namespace zmq
{
public
:
// If receiver_ is true PGM transport is not generating SPM packets.
// interface format: iface;mcast_group:port for raw PGM socket
// udp:iface;mcast_goup:port for UDP encapsulacion
pgm_socket_t
(
bool
receiver_
,
const
options_t
&
options_
);
// Closes the transport.
...
...
@@ -77,7 +76,7 @@ namespace zmq
private
:
// OpenPGM transport
// OpenPGM transport
.
pgm_sock_t
*
sock
;
int
last_rx_status
,
last_tx_status
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment