Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
3d9984ef
Commit
3d9984ef
authored
Feb 03, 2016
by
Pieter Hintjens
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #1764 from GreatFruitOmsk/master
Make VMCI work on Windows
parents
ad286d94
538e5d47
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
478 additions
and
123 deletions
+478
-123
local_thr.cpp
perf/local_thr.cpp
+1
-1
select.cpp
src/select.cpp
+323
-81
select.hpp
src/select.hpp
+52
-19
vmci.cpp
src/vmci.cpp
+4
-4
vmci_connecter.cpp
src/vmci_connecter.cpp
+52
-13
vmci_connecter.hpp
src/vmci_connecter.hpp
+2
-2
vmci_listener.cpp
src/vmci_listener.cpp
+44
-3
No files found.
perf/local_thr.cpp
View file @
3d9984ef
...
...
@@ -117,7 +117,7 @@ int main (int argc, char *argv [])
throughput
=
(
unsigned
long
)
((
double
)
message_count
/
(
double
)
elapsed
*
1000000
);
megabits
=
(
double
)
(
throughput
*
message_size
*
8
)
/
1000000
;
megabits
=
(
(
double
)
throughput
*
message_size
*
8
)
/
1000000
;
printf
(
"message size: %d [B]
\n
"
,
(
int
)
message_size
);
printf
(
"message count: %d
\n
"
,
(
int
)
message_count
);
...
...
src/select.cpp
View file @
3d9984ef
...
...
@@ -31,6 +31,7 @@
#if defined ZMQ_USE_SELECT
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#elif defined ZMQ_HAVE_HPUX
...
...
@@ -44,23 +45,21 @@
#include <sys/select.h>
#endif
#include <string.h>
#include <algorithm>
#include "err.hpp"
#include "config.hpp"
#include "i_poll_events.hpp"
zmq
::
select_t
::
select_t
(
const
zmq
::
ctx_t
&
ctx_
)
:
ctx
(
ctx_
),
maxfd
(
retired_fd
),
ctx
(
ctx_
),
#if defined ZMQ_HAVE_WINDOWS
// Fine as long as map is not cleared.
current_family_entry_it
(
family_entries
.
end
()),
#else
retired
(
false
),
maxfd
(
retired_fd
),
#endif
stopping
(
false
)
{
// Clear file descriptor sets.
FD_ZERO
(
&
source_set_in
);
FD_ZERO
(
&
source_set_out
);
FD_ZERO
(
&
source_set_err
);
}
zmq
::
select_t
::~
select_t
()
...
...
@@ -70,22 +69,24 @@ zmq::select_t::~select_t ()
zmq
::
select_t
::
handle_t
zmq
::
select_t
::
add_fd
(
fd_t
fd_
,
i_poll_events
*
events_
)
{
// Store the file descriptor.
fd_entry_t
entry
=
{
fd_
,
events_
};
fds
.
push_back
(
entry
);
// Ensure we do not attempt to select () on more than FD_SETSIZE
// file descriptors.
zmq_assert
(
fds
.
size
()
<=
FD_SETSIZE
);
fd_entry_t
fd_entry
;
fd_entry
.
fd
=
fd_
;
fd_entry
.
events
=
events_
;
// Start polling on errors.
FD_SET
(
fd_
,
&
source_set_err
);
#if defined ZMQ_HAVE_WINDOWS
u_short
family
=
get_fd_family
(
fd_
);
wsa_assert
(
family
!=
AF_UNSPEC
);
family_entry_t
&
family_entry
=
family_entries
[
family
];
family_entry
.
fd_entries
.
push_back
(
fd_entry
);
FD_SET
(
fd_
,
&
family_entry
.
fds_set
.
error
);
#else
fd_entries
.
push_back
(
fd_entry
);
FD_SET
(
fd_
,
&
fds_set
.
error
);
// Adjust maxfd if necessary.
if
(
fd_
>
maxfd
)
maxfd
=
fd_
;
#endif
// Increase the load metric of the thread.
adjust_load
(
1
);
return
fd_
;
...
...
@@ -93,56 +94,106 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
void
zmq
::
select_t
::
rm_fd
(
handle_t
handle_
)
{
// Mark the descriptor as retired.
fd_set_t
::
iterator
it
;
for
(
it
=
fds
.
begin
();
it
!=
fds
.
end
();
++
it
)
if
(
it
->
fd
==
handle_
)
#if defined ZMQ_HAVE_WINDOWS
u_short
family
=
get_fd_family
(
handle_
);
wsa_assert
(
family
!=
AF_UNSPEC
);
family_entries_t
::
iterator
family_entry_it
=
family_entries
.
find
(
family
);
family_entry_t
&
family_entry
=
family_entry_it
->
second
;
if
(
family_entry_it
!=
current_family_entry_it
)
{
// Family is not currently being iterated and can be safely
// modified in palce. So later it can be skipped withour re-verifying
// its content.
fd_entries_t
::
iterator
fd_entry_it
;
for
(
fd_entry_it
=
family_entry
.
fd_entries
.
begin
();
fd_entry_it
!=
family_entry
.
fd_entries
.
end
();
++
fd_entry_it
)
if
(
fd_entry_it
->
fd
==
handle_
)
break
;
zmq_assert
(
fd_entry_it
!=
family_entry
.
fd_entries
.
end
());
family_entry
.
fd_entries
.
erase
(
fd_entry_it
);
family_entry
.
fds_set
.
remove_fd
(
handle_
);
}
else
{
// Otherwise mark removed entries as retired. It will be cleaned up
// at the end of the iteration. See zmq::select_t::loop
fd_entries_t
::
iterator
fd_entry_it
;
for
(
fd_entry_it
=
family_entry
.
fd_entries
.
begin
();
fd_entry_it
!=
family_entry
.
fd_entries
.
end
();
++
fd_entry_it
)
if
(
fd_entry_it
->
fd
==
handle_
)
break
;
zmq_assert
(
fd_entry_it
!=
family_entry
.
fd_entries
.
end
());
fd_entry_it
->
fd
=
retired_fd
;
family_entry
.
fds_set
.
remove_fd
(
handle_
);
family_entry
.
retired
=
true
;
}
#else
fd_entries_t
::
iterator
fd_entry_it
;
for
(
fd_entry_it
=
fd_entries
.
begin
();
fd_entry_it
!=
fd_entries
.
end
();
++
fd_entry_it
)
if
(
fd_entry_it
->
fd
==
handle_
)
break
;
zmq_assert
(
it
!=
fds
.
end
());
it
->
fd
=
retired_fd
;
retired
=
true
;
// Stop polling on the descriptor.
FD_CLR
(
handle_
,
&
source_set_in
);
FD_CLR
(
handle_
,
&
source_set_out
);
FD_CLR
(
handle_
,
&
source_set_err
);
zmq_assert
(
fd_entry_it
!=
fd_entries
.
end
());
// Discard all events generated on this file descriptor.
FD_CLR
(
handle_
,
&
readfds
);
FD_CLR
(
handle_
,
&
writefds
);
FD_CLR
(
handle_
,
&
exceptfds
);
fd_entry_it
->
fd
=
retired_fd
;
fds_set
.
remove_fd
(
handle_
);
// Adjust the maxfd attribute if we have removed the
// highest-numbered file descriptor.
if
(
handle_
==
maxfd
)
{
maxfd
=
retired_fd
;
for
(
fd_set_t
::
iterator
it
=
fds
.
begin
();
it
!=
fds
.
end
();
++
it
)
if
(
it
->
fd
>
maxfd
)
maxfd
=
it
->
fd
;
for
(
fd_entry_it
=
fd_entries
.
begin
();
fd_entry_it
!=
fd_entries
.
end
();
++
fd_entry_it
)
if
(
fd_entry_it
->
fd
>
maxfd
)
maxfd
=
fd_entry_it
->
fd
;
}
// Decrease the load metric of the thread.
retired
=
true
;
#endif
adjust_load
(
-
1
);
}
void
zmq
::
select_t
::
set_pollin
(
handle_t
handle_
)
{
FD_SET
(
handle_
,
&
source_set_in
);
#if defined ZMQ_HAVE_WINDOWS
u_short
family
=
get_fd_family
(
handle_
);
wsa_assert
(
family
!=
AF_UNSPEC
);
FD_SET
(
handle_
,
&
family_entries
[
family
].
fds_set
.
read
);
#else
FD_SET
(
handle_
,
&
fds_set
.
read
);
#endif
}
void
zmq
::
select_t
::
reset_pollin
(
handle_t
handle_
)
{
FD_CLR
(
handle_
,
&
source_set_in
);
#if defined ZMQ_HAVE_WINDOWS
u_short
family
=
get_fd_family
(
handle_
);
wsa_assert
(
family
!=
AF_UNSPEC
);
FD_CLR
(
handle_
,
&
family_entries
[
family
].
fds_set
.
read
);
#else
FD_CLR
(
handle_
,
&
fds_set
.
read
);
#endif
}
void
zmq
::
select_t
::
set_pollout
(
handle_t
handle_
)
{
FD_SET
(
handle_
,
&
source_set_out
);
#if defined ZMQ_HAVE_WINDOWS
u_short
family
=
get_fd_family
(
handle_
);
wsa_assert
(
family
!=
AF_UNSPEC
);
FD_SET
(
handle_
,
&
family_entries
[
family
].
fds_set
.
write
);
#else
FD_SET
(
handle_
,
&
fds_set
.
write
);
#endif
}
void
zmq
::
select_t
::
reset_pollout
(
handle_t
handle_
)
{
FD_CLR
(
handle_
,
&
source_set_out
);
#if defined ZMQ_HAVE_WINDOWS
u_short
family
=
get_fd_family
(
handle_
);
wsa_assert
(
family
!=
AF_UNSPEC
);
FD_CLR
(
handle_
,
&
family_entries
[
family
].
fds_set
.
write
);
#else
FD_CLR
(
handle_
,
&
fds_set
.
write
);
#endif
}
void
zmq
::
select_t
::
start
()
...
...
@@ -163,61 +214,181 @@ int zmq::select_t::max_fds ()
void
zmq
::
select_t
::
loop
()
{
while
(
!
stopping
)
{
// Execute any due timers.
int
timeout
=
(
int
)
execute_timers
();
// Intialise the pollsets.
memcpy
(
&
readfds
,
&
source_set_in
,
sizeof
source_set_in
);
memcpy
(
&
writefds
,
&
source_set_out
,
sizeof
source_set_out
);
memcpy
(
&
exceptfds
,
&
source_set_err
,
sizeof
source_set_err
);
// Wait for events.
#ifdef ZMQ_HAVE_OSX
struct
timeval
tv
=
{(
long
)
(
timeout
/
1000
),
timeout
%
1000
*
1000
};
#if defined ZMQ_HAVE_OSX
struct
timeval
tv
=
{
(
long
)
(
timeout
/
1000
),
timeout
%
1000
*
1000
};
#else
struct
timeval
tv
=
{(
long
)
(
timeout
/
1000
),
(
long
)
(
timeout
%
1000
*
1000
)};
struct
timeval
tv
=
{
(
long
)
(
timeout
/
1000
),
(
long
)
(
timeout
%
1000
*
1000
)
};
#endif
#ifdef ZMQ_HAVE_WINDOWS
int
rc
=
select
(
0
,
&
readfds
,
&
writefds
,
&
exceptfds
,
timeout
?
&
tv
:
NULL
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
int
rc
=
0
;
#if defined ZMQ_HAVE_WINDOWS
/*
On Windows select does not allow to mix descriptors from different
service providers. It seems to work for AF_INET and AF_INET6,
but fails for AF_INET and VMCI. The workaround is to use
WSAEventSelect and WSAWaitForMultipleEvents to wait, then use
select to find out what actually changed. WSAWaitForMultipleEvents
cannot be used alone, because it does not support more than 64 events
which is not enough.
To reduce unncessary overhead, WSA is only used when there are more
than one family. Moreover, AF_INET and AF_INET6 are considered the same
family because Windows seems to handle them properly.
See get_fd_family for details.
*/
wsa_events_t
wsa_events
;
// If there is just one family, there is no reason to use WSA events.
if
(
family_entries
.
size
()
>
1
)
{
for
(
family_entries_t
::
iterator
family_entry_it
=
family_entries
.
begin
();
family_entry_it
!=
family_entries
.
end
();
++
family_entry_it
)
{
family_entry_t
&
family_entry
=
family_entry_it
->
second
;
for
(
fd_entries_t
::
iterator
fd_entry_it
=
family_entry
.
fd_entries
.
begin
();
fd_entry_it
!=
family_entry
.
fd_entries
.
end
();
++
fd_entry_it
)
{
fd_t
fd
=
fd_entry_it
->
fd
;
// http://stackoverflow.com/q/35043420/188530
if
(
FD_ISSET
(
fd
,
&
family_entry
.
fds_set
.
read
)
&&
FD_ISSET
(
fd
,
&
family_entry
.
fds_set
.
write
))
rc
=
WSAEventSelect
(
fd
,
wsa_events
.
events
[
3
],
FD_READ
|
FD_ACCEPT
|
FD_CLOSE
|
FD_WRITE
|
FD_CONNECT
|
FD_OOB
);
else
if
(
FD_ISSET
(
fd
,
&
family_entry
.
fds_set
.
read
))
rc
=
WSAEventSelect
(
fd
,
wsa_events
.
events
[
0
],
FD_READ
|
FD_ACCEPT
|
FD_CLOSE
|
FD_OOB
);
else
if
(
FD_ISSET
(
fd
,
&
family_entry
.
fds_set
.
write
))
rc
=
WSAEventSelect
(
fd
,
wsa_events
.
events
[
1
],
FD_WRITE
|
FD_CONNECT
|
FD_OOB
);
else
if
(
FD_ISSET
(
fd
,
&
family_entry
.
fds_set
.
error
))
rc
=
WSAEventSelect
(
fd
,
wsa_events
.
events
[
2
],
FD_OOB
);
else
rc
=
0
;
wsa_assert
(
rc
!=
SOCKET_ERROR
);
}
}
}
#endif
#if defined ZMQ_HAVE_WINDOWS
if
(
family_entries
.
size
()
>
1
)
{
rc
=
WSAWaitForMultipleEvents
(
4
,
wsa_events
.
events
,
FALSE
,
timeout
?
timeout
:
INFINITE
,
FALSE
);
wsa_assert
(
rc
!=
WSA_WAIT_FAILED
);
zmq_assert
(
rc
!=
WSA_WAIT_IO_COMPLETION
);
if
(
rc
==
WSA_WAIT_TIMEOUT
)
continue
;
}
for
(
current_family_entry_it
=
family_entries
.
begin
();
current_family_entry_it
!=
family_entries
.
end
();
++
current_family_entry_it
)
{
family_entry_t
&
family_entry
=
current_family_entry_it
->
second
;
// select will fail when run with empty sets.
if
(
family_entry
.
fd_entries
.
empty
())
continue
;
fds_set_t
local_fds_set
=
family_entry
.
fds_set
;
if
(
family_entries
.
size
()
>
1
)
{
// There is no reason to wait again after WSAWaitForMultipleEvents.
// Simply collect what is ready.
struct
timeval
tv_nodelay
=
{
0
,
0
};
rc
=
select
(
0
,
&
local_fds_set
.
read
,
&
local_fds_set
.
write
,
&
local_fds_set
.
error
,
&
tv_nodelay
);
}
else
rc
=
select
(
0
,
&
local_fds_set
.
read
,
&
local_fds_set
.
write
,
&
local_fds_set
.
error
,
timeout
>
0
?
&
tv
:
NULL
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
// Size is cached to avoid iteration through recently added descriptors.
for
(
fd_entries_t
::
size_type
i
=
0
,
size
=
family_entry
.
fd_entries
.
size
();
i
<
size
&&
rc
>
0
;
++
i
)
{
fd_entry_t
&
fd_entry
=
family_entry
.
fd_entries
[
i
];
if
(
fd_entry
.
fd
==
retired_fd
)
continue
;
if
(
FD_ISSET
(
fd_entry
.
fd
,
&
local_fds_set
.
read
))
{
fd_entry
.
events
->
in_event
();
--
rc
;
}
if
(
fd_entry
.
fd
==
retired_fd
||
rc
==
0
)
continue
;
if
(
FD_ISSET
(
fd_entry
.
fd
,
&
local_fds_set
.
write
))
{
fd_entry
.
events
->
out_event
();
--
rc
;
}
if
(
fd_entry
.
fd
==
retired_fd
||
rc
==
0
)
continue
;
if
(
FD_ISSET
(
fd_entry
.
fd
,
&
local_fds_set
.
error
))
{
fd_entry
.
events
->
in_event
();
--
rc
;
}
}
if
(
family_entry
.
retired
)
{
family_entry
.
retired
=
false
;
family_entry
.
fd_entries
.
erase
(
std
::
remove_if
(
family_entry
.
fd_entries
.
begin
(),
family_entry
.
fd_entries
.
end
(),
is_retired_fd
),
family_entry
.
fd_entries
.
end
());
}
}
#else
int
rc
=
select
(
maxfd
+
1
,
&
readfds
,
&
writefds
,
&
exceptfds
,
timeout
?
&
tv
:
NULL
);
fds_set_t
local_fds_set
=
fds_set
;
rc
=
select
(
maxfd
+
1
,
&
local_fds_set
.
read
,
&
local_fds_set
.
write
,
&
local_fds_set
.
error
,
timeout
?
&
tv
:
NULL
);
if
(
rc
==
-
1
)
{
errno_assert
(
errno
==
EINTR
);
continue
;
}
#endif
// If there are no events (i.e. it's a timeout) there's no point
// in checking the pollset.
if
(
rc
==
0
)
continue
;
// Size is cached to avoid iteration through just added descriptors.
for
(
fd_entries_t
::
size_type
i
=
0
,
size
=
fd_entries
.
size
();
i
<
size
&&
rc
>
0
;
++
i
)
{
fd_entry_t
&
fd_entry
=
fd_entries
[
i
];
for
(
fd_set_t
::
size_type
i
=
0
;
i
<
fds
.
size
();
i
++
)
{
if
(
fds
[
i
].
fd
==
retired_fd
)
if
(
fd_entry
.
fd
==
retired_fd
)
continue
;
if
(
FD_ISSET
(
fds
[
i
].
fd
,
&
exceptfds
))
fds
[
i
].
events
->
in_event
();
if
(
fds
[
i
].
fd
==
retired_fd
)
if
(
FD_ISSET
(
fd_entry
.
fd
,
&
local_fds_set
.
read
))
{
fd_entry
.
events
->
in_event
();
--
rc
;
}
if
(
fd_entry
.
fd
==
retired_fd
||
rc
==
0
)
continue
;
if
(
FD_ISSET
(
fds
[
i
].
fd
,
&
writefds
))
fds
[
i
].
events
->
out_event
();
if
(
fds
[
i
].
fd
==
retired_fd
)
if
(
FD_ISSET
(
fd_entry
.
fd
,
&
local_fds_set
.
write
))
{
fd_entry
.
events
->
out_event
();
--
rc
;
}
if
(
fd_entry
.
fd
==
retired_fd
||
rc
==
0
)
continue
;
if
(
FD_ISSET
(
fds
[
i
].
fd
,
&
readfds
))
fds
[
i
].
events
->
in_event
();
if
(
FD_ISSET
(
fd_entry
.
fd
,
&
local_fds_set
.
error
))
{
fd_entry
.
events
->
in_event
();
--
rc
;
}
}
// Destroy retired event sources.
if
(
retired
)
{
fds
.
erase
(
std
::
remove_if
(
fds
.
begin
(),
fds
.
end
(),
zmq
::
select_t
::
is_retired_fd
),
fds
.
end
());
retired
=
false
;
fd_entries
.
erase
(
std
::
remove_if
(
fd_entries
.
begin
(),
fd_entries
.
end
(),
is_retired_fd
),
fd_entries
.
end
());
}
#endif
}
}
...
...
@@ -226,9 +397,80 @@ void zmq::select_t::worker_routine (void *arg_)
((
select_t
*
)
arg_
)
->
loop
();
}
zmq
::
select_t
::
fds_set_t
::
fds_set_t
()
{
FD_ZERO
(
&
read
);
FD_ZERO
(
&
write
);
FD_ZERO
(
&
error
);
}
zmq
::
select_t
::
fds_set_t
::
fds_set_t
(
const
fds_set_t
&
other_
)
{
memcpy
(
&
read
,
&
other_
.
read
,
sizeof
other_
.
read
);
memcpy
(
&
write
,
&
other_
.
write
,
sizeof
other_
.
write
);
memcpy
(
&
error
,
&
other_
.
error
,
sizeof
other_
.
error
);
}
zmq
::
select_t
::
fds_set_t
&
zmq
::
select_t
::
fds_set_t
::
operator
=
(
const
fds_set_t
&
other_
)
{
memcpy
(
&
read
,
&
other_
.
read
,
sizeof
other_
.
read
);
memcpy
(
&
write
,
&
other_
.
write
,
sizeof
other_
.
write
);
memcpy
(
&
error
,
&
other_
.
error
,
sizeof
other_
.
error
);
return
*
this
;
}
void
zmq
::
select_t
::
fds_set_t
::
remove_fd
(
const
fd_t
&
fd_
)
{
FD_CLR
(
fd_
,
&
read
);
FD_CLR
(
fd_
,
&
write
);
FD_CLR
(
fd_
,
&
error
);
}
bool
zmq
::
select_t
::
is_retired_fd
(
const
fd_entry_t
&
entry
)
{
return
(
entry
.
fd
==
retired_fd
);
}
#if defined ZMQ_HAVE_WINDOWS
u_short
zmq
::
select_t
::
get_fd_family
(
fd_t
fd_
)
{
sockaddr
addr
{
0
};
int
addr_size
=
sizeof
addr
;
int
rc
=
getsockname
(
fd_
,
&
addr
,
&
addr_size
);
// AF_INET and AF_INET6 can be mixed in select
// TODO: If proven otherwise, should simply return addr.sa_family
if
(
rc
!=
SOCKET_ERROR
)
return
addr
.
sa_family
==
AF_INET6
?
AF_INET
:
addr
.
sa_family
;
else
return
AF_UNSPEC
;
}
zmq
::
select_t
::
family_entry_t
::
family_entry_t
()
:
retired
(
false
)
{
}
zmq
::
select_t
::
wsa_events_t
::
wsa_events_t
()
{
events
[
0
]
=
WSACreateEvent
();
wsa_assert
(
events
[
0
]
!=
WSA_INVALID_EVENT
);
events
[
1
]
=
WSACreateEvent
();
wsa_assert
(
events
[
1
]
!=
WSA_INVALID_EVENT
);
events
[
2
]
=
WSACreateEvent
();
wsa_assert
(
events
[
2
]
!=
WSA_INVALID_EVENT
);
events
[
3
]
=
WSACreateEvent
();
wsa_assert
(
events
[
3
]
!=
WSA_INVALID_EVENT
);
}
zmq
::
select_t
::
wsa_events_t
::~
wsa_events_t
()
{
wsa_assert
(
WSACloseEvent
(
events
[
0
]));
wsa_assert
(
WSACloseEvent
(
events
[
1
]));
wsa_assert
(
WSACloseEvent
(
events
[
2
]));
wsa_assert
(
WSACloseEvent
(
events
[
3
]));
}
#endif
#endif
src/select.hpp
View file @
3d9984ef
...
...
@@ -38,9 +38,10 @@
#include <stddef.h>
#include <vector>
#include <map>
#if
def
ZMQ_HAVE_WINDOWS
#include
<winsock2.h>
#if
defined
ZMQ_HAVE_WINDOWS
#include
"windows.hpp"
#elif defined ZMQ_HAVE_OPENVMS
#include <sys/types.h>
#include <sys/time.h>
...
...
@@ -90,36 +91,68 @@ namespace zmq
// Main event loop.
void
loop
();
// Reference to ZMQ context.
//
Reference to ZMQ context.
const
ctx_t
&
ctx
;
// Internal state.
struct
fds_set_t
{
fds_set_t
();
fds_set_t
(
const
fds_set_t
&
other_
);
fds_set_t
&
operator
=
(
const
fds_set_t
&
other_
);
// Convinient method to descriptor from all sets.
void
remove_fd
(
const
fd_t
&
fd_
);
fd_set
read
;
fd_set
write
;
fd_set
error
;
};
struct
fd_entry_t
{
fd_t
fd
;
zmq
::
i_poll_events
*
events
;
zmq
::
i_poll_events
*
events
;
};
typedef
std
::
vector
<
fd_entry_t
>
fd_entries_t
;
// Checks if an fd_entry_t is retired.
static
bool
is_retired_fd
(
const
fd_entry_t
&
entry
);
#if defined ZMQ_HAVE_WINDOWS
struct
family_entry_t
{
family_entry_t
();
// Set of file descriptors that are used to retrieve
// information for fd_set.
typedef
std
::
vector
<
fd_entry_t
>
fd_set_t
;
fd_set_t
fds
;
fd_entries_t
fd_entries
;
fds_set_t
fds_set
;
bool
retired
;
};
typedef
std
::
map
<
u_short
,
family_entry_t
>
family_entries_t
;
fd_set
source_set_in
;
fd_set
source_set_out
;
fd_set
source_set_err
;
struct
wsa_events_t
{
wsa_events_t
();
~
wsa_events_t
();
fd_set
readfds
;
fd_set
writefds
;
fd_set
exceptfds
;
// read, write, error and readwrite
WSAEVENT
events
[
4
];
};
#endif
// Maximum file descriptor.
#if defined ZMQ_HAVE_WINDOWS
family_entries_t
family_entries
;
// See loop for details.
family_entries_t
::
iterator
current_family_entry_it
;
#else
fd_entries_t
fd_entries
;
fds_set_t
fds_set
;
fd_t
maxfd
;
// If true, at least one file descriptor has retired.
bool
retired
;
#endif
#if defined ZMQ_HAVE_WINDOWS
// Socket's family or AF_UNSPEC on error.
static
u_short
get_fd_family
(
fd_t
fd_
);
#endif
// Checks if an fd_entry_t is retired.
static
bool
is_retired_fd
(
const
fd_entry_t
&
entry
);
// If true, thread is shutting down.
bool
stopping
;
...
...
src/vmci.cpp
View file @
3d9984ef
...
...
@@ -40,7 +40,7 @@ void zmq::tune_vmci_buffer_size (ctx_t *context_, fd_t sockfd_, uint64_t default
assert
(
family
!=
-
1
);
if
(
default_size_
!=
0
)
{
int
rc
=
setsockopt
(
sockfd_
,
family
,
SO_VMCI_BUFFER_SIZE
,
&
default_size_
,
sizeof
default_size_
);
int
rc
=
setsockopt
(
sockfd_
,
family
,
SO_VMCI_BUFFER_SIZE
,
(
char
*
)
&
default_size_
,
sizeof
default_size_
);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#else
...
...
@@ -49,7 +49,7 @@ void zmq::tune_vmci_buffer_size (ctx_t *context_, fd_t sockfd_, uint64_t default
}
if
(
min_size_
!=
0
)
{
int
rc
=
setsockopt
(
sockfd_
,
family
,
SO_VMCI_BUFFER_SIZE
,
&
min_size_
,
sizeof
min_size_
);
int
rc
=
setsockopt
(
sockfd_
,
family
,
SO_VMCI_BUFFER_SIZE
,
(
char
*
)
&
min_size_
,
sizeof
min_size_
);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#else
...
...
@@ -58,7 +58,7 @@ void zmq::tune_vmci_buffer_size (ctx_t *context_, fd_t sockfd_, uint64_t default
}
if
(
max_size_
!=
0
)
{
int
rc
=
setsockopt
(
sockfd_
,
family
,
SO_VMCI_BUFFER_SIZE
,
&
max_size_
,
sizeof
max_size_
);
int
rc
=
setsockopt
(
sockfd_
,
family
,
SO_VMCI_BUFFER_SIZE
,
(
char
*
)
&
max_size_
,
sizeof
max_size_
);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#else
...
...
@@ -76,7 +76,7 @@ void zmq::tune_vmci_connect_timeout (ctx_t *context_, fd_t sockfd_, struct timev
int
family
=
context_
->
get_vmci_socket_family
();
assert
(
family
!=
-
1
);
int
rc
=
setsockopt
(
sockfd_
,
family
,
SO_VMCI_CONNECT_TIMEOUT
,
&
timeout_
,
sizeof
timeout_
);
int
rc
=
setsockopt
(
sockfd_
,
family
,
SO_VMCI_CONNECT_TIMEOUT
,
(
char
*
)
&
timeout_
,
sizeof
timeout_
);
#if defined ZMQ_HAVE_WINDOWS
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#else
...
...
src/vmci_connecter.cpp
View file @
3d9984ef
...
...
@@ -32,7 +32,6 @@
#if defined ZMQ_HAVE_VMCI
#include <new>
#include <string>
#include "stream_engine.hpp"
#include "io_thread.hpp"
...
...
@@ -118,7 +117,8 @@ void zmq::vmci_connecter_t::out_event ()
return
;
}
tune_vmci_buffer_size
(
this
->
get_ctx
(),
fd
,
options
.
vmci_buffer_size
,
options
.
vmci_buffer_min_size
,
options
.
vmci_buffer_max_size
);
tune_vmci_buffer_size
(
this
->
get_ctx
(),
fd
,
options
.
vmci_buffer_size
,
options
.
vmci_buffer_min_size
,
options
.
vmci_buffer_max_size
);
if
(
options
.
vmci_connect_timeout
>
0
)
{
...
...
@@ -218,8 +218,15 @@ int zmq::vmci_connecter_t::open ()
// Create the socket.
s
=
open_socket
(
family
,
SOCK_STREAM
,
0
);
#ifdef ZMQ_HAVE_WINDOWS
if
(
s
==
INVALID_SOCKET
)
{
errno
=
wsa_error_to_errno
(
WSAGetLastError
());
return
-
1
;
}
#else
if
(
s
==
-
1
)
return
-
1
;
#endif
// Set the non-blocking flag.
unblock_socket
(
s
);
...
...
@@ -233,12 +240,18 @@ int zmq::vmci_connecter_t::open ()
if
(
rc
==
0
)
return
0
;
// Translate
other
error codes indicating asynchronous connect has been
// Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
if
(
rc
==
-
1
&&
errno
==
EINTR
)
{
#ifdef ZMQ_HAVE_WINDOWS
const
int
error_code
=
WSAGetLastError
();
if
(
error_code
==
WSAEINPROGRESS
||
error_code
==
WSAEWOULDBLOCK
)
errno
=
EINPROGRESS
;
return
-
1
;
}
else
errno
=
wsa_error_to_errno
(
error_code
);
#else
if
(
errno
==
EINTR
)
errno
=
EINPROGRESS
;
#endif
// Forward the error.
return
-
1
;
...
...
@@ -269,19 +282,45 @@ zmq::fd_t zmq::vmci_connecter_t::connect ()
socklen_t
len
=
sizeof
(
err
);
#endif
int
rc
=
getsockopt
(
s
,
SOL_SOCKET
,
SO_ERROR
,
(
char
*
)
&
err
,
&
len
);
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert
(
rc
==
0
);
if
(
err
!=
0
)
{
if
(
err
!=
WSAECONNREFUSED
&&
err
!=
WSAETIMEDOUT
&&
err
!=
WSAECONNABORTED
&&
err
!=
WSAEHOSTUNREACH
&&
err
!=
WSAENETUNREACH
&&
err
!=
WSAENETDOWN
&&
err
!=
WSAEACCES
&&
err
!=
WSAEINVAL
&&
err
!=
WSAEADDRINUSE
&&
err
!=
WSAECONNRESET
)
{
wsa_assert_no
(
err
);
}
return
retired_fd
;
}
#else
// Following code should handle both Berkeley-derived socket
// implementations and Solaris.
if
(
rc
==
-
1
)
err
=
errno
;
if
(
err
!=
0
)
{
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
errno
=
err
;
errno_assert
(
errno
==
ECONNREFUSED
||
errno
==
ECONNRESET
||
errno
==
ETIMEDOUT
||
errno
==
EHOSTUNREACH
||
errno
==
ENETUNREACH
||
errno
==
ENETDOWN
);
errno_assert
(
errno
==
ECONNREFUSED
||
errno
==
ECONNRESET
||
errno
==
ETIMEDOUT
||
errno
==
EHOSTUNREACH
||
errno
==
ENETUNREACH
||
errno
==
ENETDOWN
||
errno
==
EINVAL
);
return
retired_fd
;
}
#endif
fd_t
result
=
s
;
s
=
retired_fd
;
...
...
src/vmci_connecter.hpp
View file @
3d9984ef
...
...
@@ -27,8 +27,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __VMCI_CONNECTER_HPP_INCLUDED__
#define __VMCI_CONNECTER_HPP_INCLUDED__
#ifndef __
ZMQ_
VMCI_CONNECTER_HPP_INCLUDED__
#define __
ZMQ_
VMCI_CONNECTER_HPP_INCLUDED__
#include "platform.hpp"
...
...
src/vmci_listener.cpp
View file @
3d9984ef
...
...
@@ -33,8 +33,6 @@
#include <new>
#include <string.h>
#include "stream_engine.hpp"
#include "vmci_address.hpp"
#include "io_thread.hpp"
...
...
@@ -153,20 +151,46 @@ int zmq::vmci_listener_t::set_address (const char *addr_)
// Create a listening socket.
s
=
open_socket
(
this
->
get_ctx
()
->
get_vmci_socket_family
(),
SOCK_STREAM
,
0
);
#ifdef ZMQ_HAVE_WINDOWS
if
(
s
==
INVALID_SOCKET
)
{
errno
=
wsa_error_to_errno
(
WSAGetLastError
());
return
-
1
;
}
#if !defined _WIN32_WCE
// On Windows, preventing sockets to be inherited by child processes.
BOOL
brc
=
SetHandleInformation
((
HANDLE
)
s
,
HANDLE_FLAG_INHERIT
,
0
);
win_assert
(
brc
);
#endif
#else
if
(
s
==
-
1
)
return
-
1
;
#endif
address
.
to_string
(
endpoint
);
// Bind the socket.
rc
=
bind
(
s
,
address
.
addr
(),
address
.
addrlen
());
#ifdef ZMQ_HAVE_WINDOWS
if
(
rc
==
SOCKET_ERROR
)
{
errno
=
wsa_error_to_errno
(
WSAGetLastError
());
goto
error
;
}
#else
if
(
rc
!=
0
)
goto
error
;
#endif
// Listen for incoming connections.
rc
=
listen
(
s
,
options
.
backlog
);
#ifdef ZMQ_HAVE_WINDOWS
if
(
rc
==
SOCKET_ERROR
)
{
errno
=
wsa_error_to_errno
(
WSAGetLastError
());
goto
error
;
}
#else
if
(
rc
!=
0
)
goto
error
;
#endif
socket
->
event_listening
(
endpoint
,
s
);
return
0
;
...
...
@@ -199,12 +223,29 @@ zmq::fd_t zmq::vmci_listener_t::accept ()
// resources is considered valid and treated by ignoring the connection.
zmq_assert
(
s
!=
retired_fd
);
fd_t
sock
=
::
accept
(
s
,
NULL
,
NULL
);
#ifdef ZMQ_HAVE_WINDOWS
if
(
sock
==
INVALID_SOCKET
)
{
wsa_assert
(
WSAGetLastError
()
==
WSAEWOULDBLOCK
||
WSAGetLastError
()
==
WSAECONNRESET
||
WSAGetLastError
()
==
WSAEMFILE
||
WSAGetLastError
()
==
WSAENOBUFS
);
return
retired_fd
;
}
#if !defined _WIN32_WCE
// On Windows, preventing sockets to be inherited by child processes.
BOOL
brc
=
SetHandleInformation
((
HANDLE
)
sock
,
HANDLE_FLAG_INHERIT
,
0
);
win_assert
(
brc
);
#endif
#else
if
(
sock
==
-
1
)
{
errno_assert
(
errno
==
EAGAIN
||
errno
==
EWOULDBLOCK
||
errno_assert
(
errno
==
EAGAIN
||
errno
==
EWOULDBLOCK
||
errno
==
EINTR
||
errno
==
ECONNABORTED
||
errno
==
EPROTO
||
errno
==
ENOBUFS
||
errno
==
ENOMEM
||
errno
==
EMFILE
||
errno
==
ENFILE
);
return
retired_fd
;
}
#endif
// Race condition can cause socket not to be closed (if fork happens
// between accept and this point).
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment