Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
ded0e5a6
Commit
ded0e5a6
authored
May 16, 2016
by
somdoron
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
problem: udp_engine didn't work with dgram socket type
parent
6db8f1e7
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
103 additions
and
29 deletions
+103
-29
udp_engine.cpp
src/udp_engine.cpp
+94
-28
udp_engine.hpp
src/udp_engine.hpp
+9
-1
No files found.
src/udp_engine.cpp
View file @
ded0e5a6
...
...
@@ -101,8 +101,18 @@ void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_)
io_object_t
::
plug
(
io_thread_
);
handle
=
add_fd
(
fd
);
if
(
send_enabled
)
if
(
send_enabled
)
{
if
(
!
options
.
raw_socket
)
{
out_address
=
address
->
resolved
.
udp_addr
->
dest_addr
();
out_addrlen
=
address
->
resolved
.
udp_addr
->
dest_addrlen
();
}
else
{
out_address
=
(
sockaddr
*
)
&
raw_address
;
out_addrlen
=
sizeof
(
sockaddr_in
);
}
set_pollout
(
handle
);
}
if
(
recv_enabled
)
{
int
on
=
1
;
...
...
@@ -152,6 +162,54 @@ void zmq::udp_engine_t::terminate()
delete
this
;
}
void
zmq
::
udp_engine_t
::
sockaddr_to_msg
(
zmq
::
msg_t
*
msg
,
sockaddr_in
*
addr
)
{
char
*
name
=
inet_ntoa
(
addr
->
sin_addr
);
char
port
[
6
];
snprintf
(
port
,
6
,
"%d"
,
(
int
)
ntohs
(
addr
->
sin_port
));
int
size
=
strlen
(
name
)
+
strlen
(
port
)
+
1
+
1
;
// Colon + NULL
int
rc
=
msg
->
init_size
(
size
);
errno_assert
(
rc
==
0
);
msg
->
set_flags
(
msg_t
::
more
);
char
*
address
=
(
char
*
)
msg
->
data
();
strcpy
(
address
,
name
);
strcat
(
address
,
":"
);
strcat
(
address
,
port
);
}
int
zmq
::
udp_engine_t
::
resolve_raw_address
(
char
*
name_
,
int
length_
)
{
const
char
*
delimiter
=
strrchr
(
name_
,
':'
);
if
(
!
delimiter
)
{
errno
=
EINVAL
;
return
-
1
;
}
std
::
string
addr_str
(
name_
,
delimiter
-
name_
);
std
::
string
port_str
(
delimiter
+
1
);
// Parse the port number (0 is not a valid port).
uint16_t
port
=
(
uint16_t
)
atoi
(
port_str
.
c_str
());
if
(
port
==
0
)
{
errno
=
EINVAL
;
return
-
1
;
}
raw_address
.
sin_family
=
AF_INET
;
raw_address
.
sin_port
=
htons
(
port
);
raw_address
.
sin_addr
.
s_addr
=
inet_addr
(
addr_str
.
c_str
());
if
(
raw_address
.
sin_addr
.
s_addr
==
INADDR_NONE
)
{
errno
=
EINVAL
;
return
-
1
;
}
return
0
;
}
void
zmq
::
udp_engine_t
::
out_event
()
{
msg_t
group_msg
;
...
...
@@ -164,20 +222,29 @@ void zmq::udp_engine_t::out_event()
size_t
group_size
=
group_msg
.
size
();
size_t
body_size
=
body_msg
.
size
();
size_t
size
=
group_size
+
body_size
+
1
;
size_t
size
;
struct
sockaddr
*
out_address
=
(
struct
sockaddr
*
)
address
->
resolved
.
udp_addr
->
dest_addr
();
socklen_t
out_addrlen
=
address
->
resolved
.
udp_addr
->
dest_addrlen
();
if
(
options
.
raw_socket
)
{
if
(
group_size
>
0
)
{
out_address
=
(
struct
sockaddr
*
)
group_msg
.
data
();
out_addrlen
=
group_size
;
size
=
body_size
;
rc
=
resolve_raw_address
((
char
*
)
group_msg
.
data
(),
group_size
);
// We discard the message if address is not valid
if
(
rc
!=
0
)
{
rc
=
group_msg
.
close
();
errno_assert
(
rc
==
0
);
body_msg
.
close
();
errno_assert
(
rc
==
0
);
return
;
}
size
=
body_size
;
memcpy
(
out_buffer
,
body_msg
.
data
(),
body_size
);
}
else
{
size
=
group_size
+
body_size
+
1
;
// TODO: check if larger than maximum size
out_buffer
[
0
]
=
(
unsigned
char
)
group_size
;
memcpy
(
out_buffer
+
1
,
group_msg
.
data
(),
group_size
);
...
...
@@ -192,8 +259,7 @@ void zmq::udp_engine_t::out_event()
#ifdef ZMQ_HAVE_WINDOWS
rc
=
sendto
(
fd
,
(
const
char
*
)
out_buffer
,
(
int
)
size
,
0
,
address
->
resolved
.
udp_addr
->
dest_addr
(),
(
int
)
address
->
resolved
.
udp_addr
->
dest_addrlen
());
out_address
,
(
int
)
out_addrlen
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#else
rc
=
sendto
(
fd
,
out_buffer
,
size
,
0
,
out_address
,
out_addrlen
);
...
...
@@ -242,34 +308,34 @@ void zmq::udp_engine_t::in_event()
return
;
}
#endif
void
*
group_buffer
;
int
group_size
;
int
rc
;
int
body_size
;
int
body_offset
;
msg_t
msg
;
if
(
options
.
raw_socket
)
{
group_buffer
=
(
void
*
)
&
(
in_address
);
group_size
=
in_addrlen
;
body_
size
=
nbytes
-
1
;
sockaddr_to_msg
(
&
msg
,
&
in_address
);
body_size
=
nbytes
;
body_
offset
=
0
;
}
else
{
group_buffer
=
in_buffer
+
1
;
group_size
=
in_buffer
[
0
];
char
*
group_buffer
=
(
char
*
)
in_buffer
+
1
;
int
group_size
=
in_buffer
[
0
];
rc
=
msg
.
init_size
(
group_size
);
errno_assert
(
rc
==
0
);
msg
.
set_flags
(
msg_t
::
more
);
memcpy
(
msg
.
data
(),
group_buffer
,
group_size
);
// This doesn't fit, just ingore
if
(
nbytes
-
1
<
group_size
)
return
;
body_size
=
nbytes
-
1
-
group_size
;
body_offset
=
1
+
group_size
;
}
int
rc
=
msg
.
init_size
(
group_size
);
errno_assert
(
rc
==
0
);
msg
.
set_flags
(
msg_t
::
more
);
memcpy
(
msg
.
data
(),
group_buffer
,
group_size
);
rc
=
session
->
push_msg
(
&
msg
);
errno_assert
(
rc
==
0
||
(
rc
==
-
1
&&
errno
==
EAGAIN
));
...
...
@@ -286,7 +352,7 @@ void zmq::udp_engine_t::in_event()
errno_assert
(
rc
==
0
);
rc
=
msg
.
init_size
(
body_size
);
errno_assert
(
rc
==
0
);
memcpy
(
msg
.
data
(),
in_buffer
+
1
+
group_size
,
body_size
);
memcpy
(
msg
.
data
(),
in_buffer
+
body_offset
,
body_size
);
rc
=
session
->
push_msg
(
&
msg
);
errno_assert
(
rc
==
0
);
rc
=
msg
.
close
();
...
...
src/udp_engine.hpp
View file @
ded0e5a6
...
...
@@ -6,6 +6,7 @@
#include "i_engine.hpp"
#include "address.hpp"
#include "udp_address.hpp"
#include "msg.hpp"
#define MAX_UDP_MSG 8192
...
...
@@ -45,15 +46,22 @@ namespace zmq
private
:
int
resolve_raw_address
(
char
*
addr_
,
int
length_
);
void
sockaddr_to_msg
(
zmq
::
msg_t
*
msg
,
sockaddr_in
*
addr
);
bool
plugged
;
fd_t
fd
;
session_base_t
*
session
;
handle_t
handle
;
address_t
*
address
;
options_t
options
;
sockaddr_in
raw_address
;
const
struct
sockaddr
*
out_address
;
socklen_t
out_addrlen
;
unsigned
char
out_buffer
[
MAX_UDP_MSG
];
unsigned
char
in_buffer
[
MAX_UDP_MSG
];
bool
send_enabled
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment