Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
365c8eda
Commit
365c8eda
authored
May 13, 2016
by
Bitiquinho
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add dgram_t class (based on stream socket and udp engine)
parent
72f19648
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
498 additions
and
26 deletions
+498
-26
Makefile.am
Makefile.am
+2
-0
zmq.h
include/zmq.h
+1
-0
dgram.cpp
src/dgram.cpp
+321
-0
dgram.hpp
src/dgram.hpp
+107
-0
mechanism.cpp
src/mechanism.cpp
+4
-2
session_base.cpp
src/session_base.cpp
+7
-2
socket_base.cpp
src/socket_base.cpp
+7
-2
udp_engine.cpp
src/udp_engine.cpp
+46
-19
udp_engine.hpp
src/udp_engine.hpp
+3
-1
No files found.
Makefile.am
View file @
365c8eda
...
@@ -45,6 +45,8 @@ src_libzmq_la_SOURCES = \
...
@@ -45,6 +45,8 @@ src_libzmq_la_SOURCES = \
src/decoder.hpp
\
src/decoder.hpp
\
src/devpoll.cpp
\
src/devpoll.cpp
\
src/devpoll.hpp
\
src/devpoll.hpp
\
src/dgram.cpp
\
src/dgram.hpp
\
src/dish.cpp
\
src/dish.cpp
\
src/dish.hpp
\
src/dish.hpp
\
src/dist.cpp
\
src/dist.cpp
\
...
...
include/zmq.h
View file @
365c8eda
...
@@ -523,6 +523,7 @@ ZMQ_EXPORT void zmq_threadclose (void* thread);
...
@@ -523,6 +523,7 @@ ZMQ_EXPORT void zmq_threadclose (void* thread);
#define ZMQ_DISH 15
#define ZMQ_DISH 15
#define ZMQ_GATHER 16
#define ZMQ_GATHER 16
#define ZMQ_SCATTER 17
#define ZMQ_SCATTER 17
#define ZMQ_DGRAM 18
/* DRAFT Socket methods. */
/* DRAFT Socket methods. */
ZMQ_EXPORT
int
zmq_join
(
void
*
s
,
const
char
*
group
);
ZMQ_EXPORT
int
zmq_join
(
void
*
s
,
const
char
*
group
);
...
...
src/dgram.cpp
0 → 100644
View file @
365c8eda
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "macros.hpp"
#include "dgram.hpp"
#include "pipe.hpp"
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
#include "err.hpp"
zmq
::
dgram_t
::
dgram_t
(
class
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid_
)
:
socket_base_t
(
parent_
,
tid_
,
sid_
),
prefetched
(
false
),
identity_sent
(
false
),
current_out
(
NULL
),
more_out
(
false
),
next_rid
(
generate_random
())
{
options
.
type
=
ZMQ_DGRAM
;
options
.
raw_socket
=
true
;
prefetched_id
.
init
();
prefetched_msg
.
init
();
}
zmq
::
dgram_t
::~
dgram_t
()
{
zmq_assert
(
outpipes
.
empty
());
prefetched_id
.
close
();
prefetched_msg
.
close
();
}
void
zmq
::
dgram_t
::
xattach_pipe
(
pipe_t
*
pipe_
,
bool
subscribe_to_all_
)
{
LIBZMQ_UNUSED
(
subscribe_to_all_
);
zmq_assert
(
pipe_
);
identify_peer
(
pipe_
);
fq
.
attach
(
pipe_
);
}
void
zmq
::
dgram_t
::
xpipe_terminated
(
pipe_t
*
pipe_
)
{
outpipes_t
::
iterator
it
=
outpipes
.
find
(
pipe_
->
get_identity
());
zmq_assert
(
it
!=
outpipes
.
end
());
outpipes
.
erase
(
it
);
fq
.
pipe_terminated
(
pipe_
);
if
(
pipe_
==
current_out
)
current_out
=
NULL
;
}
void
zmq
::
dgram_t
::
xread_activated
(
pipe_t
*
pipe_
)
{
fq
.
activated
(
pipe_
);
}
void
zmq
::
dgram_t
::
xwrite_activated
(
pipe_t
*
pipe_
)
{
outpipes_t
::
iterator
it
;
for
(
it
=
outpipes
.
begin
();
it
!=
outpipes
.
end
();
++
it
)
if
(
it
->
second
.
pipe
==
pipe_
)
break
;
zmq_assert
(
it
!=
outpipes
.
end
());
zmq_assert
(
!
it
->
second
.
active
);
it
->
second
.
active
=
true
;
}
int
zmq
::
dgram_t
::
xsend
(
msg_t
*
msg_
)
{
// If this is the first part of the message it's the ID of the
// peer to send the message to.
if
(
!
more_out
)
{
zmq_assert
(
!
current_out
);
// If we have malformed message (prefix with no subsequent message)
// then just silently ignore it.
// TODO: The connections should be killed instead.
if
(
msg_
->
flags
()
&
msg_t
::
more
)
{
// Find the pipe associated with the identity stored in the prefix.
// If there's no such pipe return an error
blob_t
identity
((
unsigned
char
*
)
msg_
->
data
(),
msg_
->
size
());
outpipes_t
::
iterator
it
=
outpipes
.
find
(
identity
);
if
(
it
!=
outpipes
.
end
())
{
current_out
=
it
->
second
.
pipe
;
if
(
!
current_out
->
check_write
())
{
it
->
second
.
active
=
false
;
current_out
=
NULL
;
errno
=
EAGAIN
;
return
-
1
;
}
}
else
{
errno
=
EHOSTUNREACH
;
return
-
1
;
}
}
// Expect one more message frame.
more_out
=
true
;
int
rc
=
msg_
->
close
();
errno_assert
(
rc
==
0
);
rc
=
msg_
->
init
();
errno_assert
(
rc
==
0
);
return
0
;
}
// Ignore the MORE flag
msg_
->
reset_flags
(
msg_t
::
more
);
// This is the last part of the message.
more_out
=
false
;
// Push the message into the pipe. If there's no out pipe, just drop it.
if
(
current_out
)
{
// Close the remote connection if user has asked to do so
// by sending zero length message.
// Pending messages in the pipe will be dropped (on receiving term- ack)
if
(
msg_
->
size
()
==
0
)
{
current_out
->
terminate
(
false
);
int
rc
=
msg_
->
close
();
errno_assert
(
rc
==
0
);
rc
=
msg_
->
init
();
errno_assert
(
rc
==
0
);
current_out
=
NULL
;
return
0
;
}
bool
ok
=
current_out
->
write
(
msg_
);
if
(
likely
(
ok
))
current_out
->
flush
();
current_out
=
NULL
;
}
else
{
int
rc
=
msg_
->
close
();
errno_assert
(
rc
==
0
);
}
// Detach the message from the data buffer.
int
rc
=
msg_
->
init
();
errno_assert
(
rc
==
0
);
return
0
;
}
int
zmq
::
dgram_t
::
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
)
{
bool
is_int
=
(
optvallen_
==
sizeof
(
int
));
int
value
=
0
;
if
(
is_int
)
memcpy
(
&
value
,
optval_
,
sizeof
(
int
));
switch
(
option_
)
{
case
ZMQ_CONNECT_RID
:
if
(
optval_
&&
optvallen_
)
{
connect_rid
.
assign
((
char
*
)
optval_
,
optvallen_
);
return
0
;
}
break
;
case
ZMQ_STREAM_NOTIFY
:
if
(
is_int
&&
(
value
==
0
||
value
==
1
))
{
options
.
raw_notify
=
(
value
!=
0
);
return
0
;
}
break
;
default
:
break
;
}
errno
=
EINVAL
;
return
-
1
;
}
int
zmq
::
dgram_t
::
xrecv
(
msg_t
*
msg_
)
{
if
(
prefetched
)
{
if
(
!
identity_sent
)
{
int
rc
=
msg_
->
move
(
prefetched_id
);
errno_assert
(
rc
==
0
);
identity_sent
=
true
;
}
else
{
int
rc
=
msg_
->
move
(
prefetched_msg
);
errno_assert
(
rc
==
0
);
prefetched
=
false
;
}
return
0
;
}
pipe_t
*
pipe
=
NULL
;
int
rc
=
fq
.
recvpipe
(
&
prefetched_msg
,
&
pipe
);
if
(
rc
!=
0
)
return
-
1
;
zmq_assert
(
pipe
!=
NULL
);
zmq_assert
((
prefetched_msg
.
flags
()
&
msg_t
::
more
)
==
0
);
// We have received a frame with TCP data.
// Rather than sending this frame, we keep it in prefetched
// buffer and send a frame with peer's ID.
blob_t
identity
=
pipe
->
get_identity
();
rc
=
msg_
->
close
();
errno_assert
(
rc
==
0
);
rc
=
msg_
->
init_size
(
identity
.
size
());
errno_assert
(
rc
==
0
);
// forward metadata (if any)
metadata_t
*
metadata
=
prefetched_msg
.
metadata
();
if
(
metadata
)
msg_
->
set_metadata
(
metadata
);
memcpy
(
msg_
->
data
(),
identity
.
data
(),
identity
.
size
());
msg_
->
set_flags
(
msg_t
::
more
);
prefetched
=
true
;
identity_sent
=
true
;
return
0
;
}
bool
zmq
::
dgram_t
::
xhas_in
()
{
// We may already have a message pre-fetched.
if
(
prefetched
)
return
true
;
// Try to read the next message.
// The message, if read, is kept in the pre-fetch buffer.
pipe_t
*
pipe
=
NULL
;
int
rc
=
fq
.
recvpipe
(
&
prefetched_msg
,
&
pipe
);
if
(
rc
!=
0
)
return
false
;
zmq_assert
(
pipe
!=
NULL
);
zmq_assert
((
prefetched_msg
.
flags
()
&
msg_t
::
more
)
==
0
);
blob_t
identity
=
pipe
->
get_identity
();
rc
=
prefetched_id
.
init_size
(
identity
.
size
());
errno_assert
(
rc
==
0
);
// forward metadata (if any)
metadata_t
*
metadata
=
prefetched_msg
.
metadata
();
if
(
metadata
)
prefetched_id
.
set_metadata
(
metadata
);
memcpy
(
prefetched_id
.
data
(),
identity
.
data
(),
identity
.
size
());
prefetched_id
.
set_flags
(
msg_t
::
more
);
prefetched
=
true
;
identity_sent
=
false
;
return
true
;
}
bool
zmq
::
dgram_t
::
xhas_out
()
{
// In theory, STREAM socket is always ready for writing. Whether actual
// attempt to write succeeds depends on which pipe the message is going
// to be routed to.
return
true
;
}
void
zmq
::
dgram_t
::
identify_peer
(
pipe_t
*
pipe_
)
{
// Always assign identity for raw-socket
unsigned
char
buffer
[
5
];
buffer
[
0
]
=
0
;
blob_t
identity
;
if
(
connect_rid
.
length
())
{
identity
=
blob_t
((
unsigned
char
*
)
connect_rid
.
c_str
(),
connect_rid
.
length
());
connect_rid
.
clear
();
outpipes_t
::
iterator
it
=
outpipes
.
find
(
identity
);
zmq_assert
(
it
==
outpipes
.
end
());
}
else
{
put_uint32
(
buffer
+
1
,
next_rid
++
);
identity
=
blob_t
(
buffer
,
sizeof
buffer
);
memcpy
(
options
.
identity
,
identity
.
data
(),
identity
.
size
());
options
.
identity_size
=
(
unsigned
char
)
identity
.
size
();
}
pipe_
->
set_identity
(
identity
);
// Add the record into output pipes lookup table
outpipe_t
outpipe
=
{
pipe_
,
true
};
const
bool
ok
=
outpipes
.
insert
(
outpipes_t
::
value_type
(
identity
,
outpipe
)).
second
;
zmq_assert
(
ok
);
}
src/dgram.hpp
0 → 100644
View file @
365c8eda
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DGRAM_HPP_INCLUDED__
#define __ZMQ_DGRAM_HPP_INCLUDED__
#include <map>
#include "router.hpp"
namespace
zmq
{
class
ctx_t
;
class
pipe_t
;
class
dgram_t
:
public
socket_base_t
{
public
:
dgram_t
(
zmq
::
ctx_t
*
parent_
,
uint32_t
tid_
,
int
sid
);
~
dgram_t
();
// Overrides of functions from socket_base_t.
void
xattach_pipe
(
zmq
::
pipe_t
*
pipe_
,
bool
subscribe_to_all_
);
int
xsend
(
zmq
::
msg_t
*
msg_
);
int
xrecv
(
zmq
::
msg_t
*
msg_
);
bool
xhas_in
();
bool
xhas_out
();
void
xread_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xwrite_activated
(
zmq
::
pipe_t
*
pipe_
);
void
xpipe_terminated
(
zmq
::
pipe_t
*
pipe_
);
int
xsetsockopt
(
int
option_
,
const
void
*
optval_
,
size_t
optvallen_
);
private
:
// Generate peer's id and update lookup map
void
identify_peer
(
pipe_t
*
pipe_
);
// Fair queueing object for inbound pipes.
fq_t
fq
;
// True iff there is a message held in the pre-fetch buffer.
bool
prefetched
;
// If true, the receiver got the message part with
// the peer's identity.
bool
identity_sent
;
// Holds the prefetched identity.
msg_t
prefetched_id
;
// Holds the prefetched message.
msg_t
prefetched_msg
;
struct
outpipe_t
{
zmq
::
pipe_t
*
pipe
;
bool
active
;
};
// Outbound pipes indexed by the peer IDs.
typedef
std
::
map
<
blob_t
,
outpipe_t
>
outpipes_t
;
outpipes_t
outpipes
;
// The pipe we are currently writing to.
zmq
::
pipe_t
*
current_out
;
// If true, more outgoing message parts are expected.
bool
more_out
;
// Routing IDs are generated. It's a simple increment and wrap-over
// algorithm. This value is the next ID to use (if not used already).
uint32_t
next_rid
;
dgram_t
(
const
dgram_t
&
);
const
dgram_t
&
operator
=
(
const
dgram_t
&
);
};
}
#endif
src/mechanism.cpp
View file @
365c8eda
...
@@ -78,8 +78,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const
...
@@ -78,8 +78,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const
"XPUB"
,
"XSUB"
,
"STREAM"
,
"XPUB"
,
"XSUB"
,
"STREAM"
,
"SERVER"
,
"CLIENT"
,
"SERVER"
,
"CLIENT"
,
"RADIO"
,
"DISH"
,
"RADIO"
,
"DISH"
,
"GATHER"
,
"SCATTER"
};
"GATHER"
,
"SCATTER"
,
"DGRAM"
};
zmq_assert
(
socket_type
>=
0
&&
socket_type
<=
1
7
);
zmq_assert
(
socket_type
>=
0
&&
socket_type
<=
1
8
);
return
names
[
socket_type
];
return
names
[
socket_type
];
}
}
...
@@ -203,6 +203,8 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
...
@@ -203,6 +203,8 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
return
type_
==
"SCATTER"
;
return
type_
==
"SCATTER"
;
case
ZMQ_SCATTER
:
case
ZMQ_SCATTER
:
return
type_
==
"GATHER"
;
return
type_
==
"GATHER"
;
case
ZMQ_DGRAM
:
return
type_
==
"DGRAM"
;
default
:
default
:
break
;
break
;
}
}
...
...
src/session_base.cpp
View file @
365c8eda
...
@@ -83,6 +83,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
...
@@ -83,6 +83,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
case
ZMQ_CLIENT
:
case
ZMQ_CLIENT
:
case
ZMQ_GATHER
:
case
ZMQ_GATHER
:
case
ZMQ_SCATTER
:
case
ZMQ_SCATTER
:
case
ZMQ_DGRAM
:
s
=
new
(
std
::
nothrow
)
session_base_t
(
io_thread_
,
active_
,
s
=
new
(
std
::
nothrow
)
session_base_t
(
io_thread_
,
active_
,
socket_
,
options_
,
addr_
);
socket_
,
options_
,
addr_
);
break
;
break
;
...
@@ -572,9 +573,9 @@ void zmq::session_base_t::start_connecting (bool wait_)
...
@@ -572,9 +573,9 @@ void zmq::session_base_t::start_connecting (bool wait_)
#endif
#endif
if
(
addr
->
protocol
==
"udp"
)
{
if
(
addr
->
protocol
==
"udp"
)
{
zmq_assert
(
options
.
type
==
ZMQ_DISH
||
options
.
type
==
ZMQ_RADIO
);
zmq_assert
(
options
.
type
==
ZMQ_DISH
||
options
.
type
==
ZMQ_RADIO
||
options
.
type
==
ZMQ_DGRAM
);
udp_engine_t
*
engine
=
new
(
std
::
nothrow
)
udp_engine_t
();
udp_engine_t
*
engine
=
new
(
std
::
nothrow
)
udp_engine_t
(
options
);
alloc_assert
(
engine
);
alloc_assert
(
engine
);
bool
recv
=
false
;
bool
recv
=
false
;
...
@@ -588,6 +589,10 @@ void zmq::session_base_t::start_connecting (bool wait_)
...
@@ -588,6 +589,10 @@ void zmq::session_base_t::start_connecting (bool wait_)
send
=
false
;
send
=
false
;
recv
=
true
;
recv
=
true
;
}
}
else
if
(
options
.
type
==
ZMQ_DGRAM
)
{
send
=
true
;
recv
=
true
;
}
int
rc
=
engine
->
init
(
addr
,
send
,
recv
);
int
rc
=
engine
->
init
(
addr
,
send
,
recv
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
...
...
src/socket_base.cpp
View file @
365c8eda
...
@@ -95,6 +95,7 @@
...
@@ -95,6 +95,7 @@
#include "dish.hpp"
#include "dish.hpp"
#include "gather.hpp"
#include "gather.hpp"
#include "scatter.hpp"
#include "scatter.hpp"
#include "dgram.hpp"
#define ENTER_MUTEX() \
#define ENTER_MUTEX() \
if (thread_safe) \
if (thread_safe) \
...
@@ -168,6 +169,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
...
@@ -168,6 +169,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case
ZMQ_SCATTER
:
case
ZMQ_SCATTER
:
s
=
new
(
std
::
nothrow
)
scatter_t
(
parent_
,
tid_
,
sid_
);
s
=
new
(
std
::
nothrow
)
scatter_t
(
parent_
,
tid_
,
sid_
);
break
;
break
;
case
ZMQ_DGRAM
:
s
=
new
(
std
::
nothrow
)
dgram_t
(
parent_
,
tid_
,
sid_
);
break
;
default
:
default
:
errno
=
EINVAL
;
errno
=
EINVAL
;
return
NULL
;
return
NULL
;
...
@@ -304,7 +308,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
...
@@ -304,7 +308,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
#endif
#endif
if
(
protocol_
==
"udp"
&&
(
options
.
type
!=
ZMQ_DISH
&&
if
(
protocol_
==
"udp"
&&
(
options
.
type
!=
ZMQ_DISH
&&
options
.
type
!=
ZMQ_RADIO
))
{
options
.
type
!=
ZMQ_RADIO
&&
options
.
type
!=
ZMQ_DGRAM
))
{
errno
=
ENOCOMPATPROTO
;
errno
=
ENOCOMPATPROTO
;
return
-
1
;
return
-
1
;
}
}
...
@@ -878,7 +883,7 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -878,7 +883,7 @@ int zmq::socket_base_t::connect (const char *addr_)
if
(
protocol
==
"udp"
)
{
if
(
protocol
==
"udp"
)
{
paddr
->
resolved
.
udp_addr
=
new
(
std
::
nothrow
)
udp_address_t
();
paddr
->
resolved
.
udp_addr
=
new
(
std
::
nothrow
)
udp_address_t
();
alloc_assert
(
paddr
->
resolved
.
udp_addr
);
alloc_assert
(
paddr
->
resolved
.
udp_addr
);
rc
=
paddr
->
resolved
.
udp_addr
->
resolve
(
address
.
c_str
(),
options
.
type
==
ZMQ_DISH
);
rc
=
paddr
->
resolved
.
udp_addr
->
resolve
(
address
.
c_str
(),
(
options
.
type
==
ZMQ_DISH
||
options
.
type
==
ZMQ_DGRAM
)
);
if
(
rc
!=
0
)
{
if
(
rc
!=
0
)
{
LIBZMQ_DELETE
(
paddr
);
LIBZMQ_DELETE
(
paddr
);
EXIT_MUTEX
();
EXIT_MUTEX
();
...
...
src/udp_engine.cpp
View file @
365c8eda
...
@@ -43,12 +43,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
...
@@ -43,12 +43,13 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "err.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "ip.hpp"
zmq
::
udp_engine_t
::
udp_engine_t
()
:
zmq
::
udp_engine_t
::
udp_engine_t
(
const
options_t
&
options_
)
:
plugged
(
false
),
plugged
(
false
),
fd
(
-
1
),
fd
(
-
1
),
session
(
NULL
),
session
(
NULL
),
handle
(
NULL
),
handle
(
NULL
),
address
(
NULL
),
address
(
NULL
),
options
(
options_
),
send_enabled
(
false
),
send_enabled
(
false
),
recv_enabled
(
false
)
recv_enabled
(
false
)
{
{
...
@@ -165,10 +166,23 @@ void zmq::udp_engine_t::out_event()
...
@@ -165,10 +166,23 @@ void zmq::udp_engine_t::out_event()
size_t
body_size
=
body_msg
.
size
();
size_t
body_size
=
body_msg
.
size
();
size_t
size
=
group_size
+
body_size
+
1
;
size_t
size
=
group_size
+
body_size
+
1
;
// TODO: check if larger than maximum size
struct
sockaddr
*
out_address
=
(
struct
sockaddr
*
)
address
->
resolved
.
udp_addr
->
dest_addr
();
out_buffer
[
0
]
=
(
unsigned
char
)
group_size
;
socklen_t
out_addrlen
=
address
->
resolved
.
udp_addr
->
dest_addrlen
();
memcpy
(
out_buffer
+
1
,
group_msg
.
data
(),
group_size
);
memcpy
(
out_buffer
+
1
+
group_size
,
body_msg
.
data
(),
body_size
);
if
(
options
.
raw_socket
)
{
if
(
group_size
>
0
)
{
out_address
=
(
struct
sockaddr
*
)
group_msg
.
data
();
out_addrlen
=
group_size
;
size
=
body_size
;
}
memcpy
(
out_buffer
,
body_msg
.
data
(),
body_size
);
}
else
{
// TODO: check if larger than maximum size
out_buffer
[
0
]
=
(
unsigned
char
)
group_size
;
memcpy
(
out_buffer
+
1
,
group_msg
.
data
(),
group_size
);
memcpy
(
out_buffer
+
1
+
group_size
,
body_msg
.
data
(),
body_size
);
}
rc
=
group_msg
.
close
();
rc
=
group_msg
.
close
();
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
...
@@ -182,9 +196,7 @@ void zmq::udp_engine_t::out_event()
...
@@ -182,9 +196,7 @@ void zmq::udp_engine_t::out_event()
(
int
)
address
->
resolved
.
udp_addr
->
dest_addrlen
());
(
int
)
address
->
resolved
.
udp_addr
->
dest_addrlen
());
wsa_assert
(
rc
!=
SOCKET_ERROR
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#else
#else
rc
=
sendto
(
fd
,
out_buffer
,
size
,
0
,
rc
=
sendto
(
fd
,
out_buffer
,
size
,
0
,
out_address
,
out_addrlen
);
address
->
resolved
.
udp_addr
->
dest_addr
(),
address
->
resolved
.
udp_addr
->
dest_addrlen
());
errno_assert
(
rc
!=
-
1
);
errno_assert
(
rc
!=
-
1
);
#endif
#endif
}
}
...
@@ -208,8 +220,10 @@ void zmq::udp_engine_t::restart_output()
...
@@ -208,8 +220,10 @@ void zmq::udp_engine_t::restart_output()
void
zmq
::
udp_engine_t
::
in_event
()
void
zmq
::
udp_engine_t
::
in_event
()
{
{
struct
sockaddr_in
in_address
;
socklen_t
in_addrlen
;
#ifdef ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_WINDOWS
int
nbytes
=
recv
(
fd
,
(
char
*
)
in_buffer
,
MAX_UDP_MSG
,
0
);
int
nbytes
=
recv
from
(
fd
,
(
char
*
)
in_buffer
,
MAX_UDP_MSG
,
0
,
(
sockaddr
*
)
&
address
,
&
addrlen
);
const
int
last_error
=
WSAGetLastError
();
const
int
last_error
=
WSAGetLastError
();
if
(
nbytes
==
SOCKET_ERROR
)
{
if
(
nbytes
==
SOCKET_ERROR
)
{
wsa_assert
(
wsa_assert
(
...
@@ -219,7 +233,7 @@ void zmq::udp_engine_t::in_event()
...
@@ -219,7 +233,7 @@ void zmq::udp_engine_t::in_event()
return
;
return
;
}
}
#else
#else
int
nbytes
=
recv
(
fd
,
in_buffer
,
MAX_UDP_MSG
,
0
);
int
nbytes
=
recv
from
(
fd
,
in_buffer
,
MAX_UDP_MSG
,
0
,
(
sockaddr
*
)
&
in_address
,
&
in_addrlen
);
if
(
nbytes
==
-
1
)
{
if
(
nbytes
==
-
1
)
{
errno_assert
(
errno
!=
EBADF
errno_assert
(
errno
!=
EBADF
&&
errno
!=
EFAULT
&&
errno
!=
EFAULT
...
@@ -229,20 +243,33 @@ void zmq::udp_engine_t::in_event()
...
@@ -229,20 +243,33 @@ void zmq::udp_engine_t::in_event()
}
}
#endif
#endif
int
group_size
=
in_buffer
[
0
];
void
*
group_buffer
;
int
group_size
;
// This doesn't fit, just ingore
int
body_size
;
if
(
nbytes
-
1
<
group_size
)
msg_t
msg
;
return
;
if
(
options
.
raw_socket
)
{
group_buffer
=
(
void
*
)
&
(
in_address
);
group_size
=
in_addrlen
;
body_size
=
nbytes
-
1
;
}
else
{
group_buffer
=
in_buffer
+
1
;
group_size
=
in_buffer
[
0
];
int
body_size
=
nbytes
-
1
-
group_size
;
// This doesn't fit, just ingore
if
(
nbytes
-
1
<
group_size
)
return
;
msg_t
msg
;
body_size
=
nbytes
-
1
-
group_size
;
}
int
rc
=
msg
.
init_size
(
group_size
);
int
rc
=
msg
.
init_size
(
group_size
);
errno_assert
(
rc
==
0
);
errno_assert
(
rc
==
0
);
msg
.
set_flags
(
msg_t
::
more
);
msg
.
set_flags
(
msg_t
::
more
);
memcpy
(
msg
.
data
(),
in_buffer
+
1
,
group_size
);
memcpy
(
msg
.
data
(),
group_buffer
,
group_size
);
rc
=
session
->
push_msg
(
&
msg
);
rc
=
session
->
push_msg
(
&
msg
);
errno_assert
(
rc
==
0
||
(
rc
==
-
1
&&
errno
==
EAGAIN
));
errno_assert
(
rc
==
0
||
(
rc
==
-
1
&&
errno
==
EAGAIN
));
...
...
src/udp_engine.hpp
View file @
365c8eda
...
@@ -17,7 +17,7 @@ namespace zmq
...
@@ -17,7 +17,7 @@ namespace zmq
class
udp_engine_t
:
public
io_object_t
,
public
i_engine
class
udp_engine_t
:
public
io_object_t
,
public
i_engine
{
{
public
:
public
:
udp_engine_t
();
udp_engine_t
(
const
options_t
&
options_
);
~
udp_engine_t
();
~
udp_engine_t
();
int
init
(
address_t
*
address_
,
bool
send_
,
bool
recv_
);
int
init
(
address_t
*
address_
,
bool
send_
,
bool
recv_
);
...
@@ -51,6 +51,8 @@ namespace zmq
...
@@ -51,6 +51,8 @@ namespace zmq
session_base_t
*
session
;
session_base_t
*
session
;
handle_t
handle
;
handle_t
handle
;
address_t
*
address
;
address_t
*
address
;
options_t
options
;
unsigned
char
out_buffer
[
MAX_UDP_MSG
];
unsigned
char
out_buffer
[
MAX_UDP_MSG
];
unsigned
char
in_buffer
[
MAX_UDP_MSG
];
unsigned
char
in_buffer
[
MAX_UDP_MSG
];
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment