Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
f9d7eea6
Commit
f9d7eea6
authored
Oct 26, 2017
by
sigiesec
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Problem: code duplication
Solution: unified Windows & non-Windows code further
parent
e7817ad3
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
147 additions
and
151 deletions
+147
-151
select.cpp
src/select.cpp
+52
-57
select.hpp
src/select.hpp
+95
-94
No files found.
src/select.cpp
View file @
f9d7eea6
...
...
@@ -54,7 +54,6 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) :
current_family_entry_it
(
family_entries
.
end
()),
#else
maxfd
(
retired_fd
),
retired
(
false
),
#endif
stopping
(
false
)
{
...
...
@@ -79,12 +78,11 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
u_short
family
=
get_fd_family
(
fd_
);
wsa_assert
(
family
!=
AF_UNSPEC
);
family_entry_t
&
family_entry
=
family_entries
[
family
];
#endif
family_entry
.
fd_entries
.
push_back
(
fd_entry
);
FD_SET
(
fd_
,
&
family_entry
.
fds_set
.
error
);
#else
fd_entries
.
push_back
(
fd_entry
);
FD_SET
(
fd_
,
&
fds_set
.
error
);
#if !defined ZMQ_HAVE_WINDOWS
if
(
fd_
>
maxfd
)
maxfd
=
fd_
;
#endif
...
...
@@ -147,6 +145,7 @@ void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_,
}
}
#if defined ZMQ_HAVE_WINDOWS
bool
zmq
::
select_t
::
try_remove_fd_entry
(
family_entries_t
::
iterator
family_entry_it
,
zmq
::
fd_t
&
handle_
)
{
...
...
@@ -170,6 +169,7 @@ bool zmq::select_t::try_remove_fd_entry (
family_entry
.
fds_set
.
remove_fd
(
handle_
);
return
true
;
}
#endif
void
zmq
::
select_t
::
rm_fd
(
handle_t
handle_
)
{
...
...
@@ -195,21 +195,21 @@ void zmq::select_t::rm_fd (handle_t handle_)
}
#else
fd_entries_t
::
iterator
fd_entry_it
=
find_fd_entry_by_handle
(
fd_entries
,
handle_
);
find_fd_entry_by_handle
(
f
amily_entry
.
f
d_entries
,
handle_
);
assert
(
fd_entry_it
!=
fd_entries
.
end
());
fd_entry_it
->
fd
=
retired_fd
;
fds_set
.
remove_fd
(
handle_
);
f
amily_entry
.
f
ds_set
.
remove_fd
(
handle_
);
if
(
handle_
==
maxfd
)
{
maxfd
=
retired_fd
;
for
(
fd_entry_it
=
fd_entries
.
begin
();
fd_entry_it
!=
fd_entries
.
end
();
++
fd_entry_it
)
for
(
fd_entry_it
=
f
amily_entry
.
f
d_entries
.
begin
();
fd_entry_it
!=
f
amily_entry
.
f
d_entries
.
end
();
++
fd_entry_it
)
if
(
fd_entry_it
->
fd
>
maxfd
)
maxfd
=
fd_entry_it
->
fd
;
}
retired
=
true
;
family_entry
.
retired
=
true
;
#endif
adjust_load
(
-
1
);
}
...
...
@@ -219,10 +219,9 @@ void zmq::select_t::set_pollin (handle_t handle_)
#if defined ZMQ_HAVE_WINDOWS
u_short
family
=
get_fd_family
(
handle_
);
wsa_assert
(
family
!=
AF_UNSPEC
);
FD_SET
(
handle_
,
&
family_entries
[
family
].
fds_set
.
read
);
#else
FD_SET
(
handle_
,
&
fds_set
.
read
);
family_entry_t
&
family_entry
=
family_entries
[
family
];
#endif
FD_SET
(
handle_
,
&
family_entry
.
fds_set
.
read
);
}
void
zmq
::
select_t
::
reset_pollin
(
handle_t
handle_
)
...
...
@@ -230,10 +229,9 @@ void zmq::select_t::reset_pollin (handle_t handle_)
#if defined ZMQ_HAVE_WINDOWS
u_short
family
=
get_fd_family
(
handle_
);
wsa_assert
(
family
!=
AF_UNSPEC
);
FD_CLR
(
handle_
,
&
family_entries
[
family
].
fds_set
.
read
);
#else
FD_CLR
(
handle_
,
&
fds_set
.
read
);
family_entry_t
&
family_entry
=
family_entries
[
family
];
#endif
FD_CLR
(
handle_
,
&
family_entry
.
fds_set
.
read
);
}
void
zmq
::
select_t
::
set_pollout
(
handle_t
handle_
)
...
...
@@ -241,10 +239,9 @@ void zmq::select_t::set_pollout (handle_t handle_)
#if defined ZMQ_HAVE_WINDOWS
u_short
family
=
get_fd_family
(
handle_
);
wsa_assert
(
family
!=
AF_UNSPEC
);
FD_SET
(
handle_
,
&
family_entries
[
family
].
fds_set
.
write
);
#else
FD_SET
(
handle_
,
&
fds_set
.
write
);
family_entry_t
&
family_entry
=
family_entries
[
family
];
#endif
FD_SET
(
handle_
,
&
family_entry
.
fds_set
.
write
);
}
void
zmq
::
select_t
::
reset_pollout
(
handle_t
handle_
)
...
...
@@ -252,10 +249,9 @@ void zmq::select_t::reset_pollout (handle_t handle_)
#if defined ZMQ_HAVE_WINDOWS
u_short
family
=
get_fd_family
(
handle_
);
wsa_assert
(
family
!=
AF_UNSPEC
);
FD_CLR
(
handle_
,
&
family_entries
[
family
].
fds_set
.
write
);
#else
FD_CLR
(
handle_
,
&
fds_set
.
write
);
family_entry_t
&
family_entry
=
family_entries
[
family
];
#endif
FD_CLR
(
handle_
,
&
family_entry
.
fds_set
.
write
);
}
void
zmq
::
select_t
::
start
()
...
...
@@ -364,54 +360,53 @@ void zmq::select_t::loop ()
++
current_family_entry_it
)
{
family_entry_t
&
family_entry
=
current_family_entry_it
->
second
;
// select will fail when run with empty sets.
fd_entries_t
&
fd_entries
=
family_entry
.
fd_entries
;
if
(
fd_entries
.
empty
())
continue
;
fds_set_t
local_fds_set
=
family_entry
.
fds_set
;
if
(
use_wsa_events
)
{
// There is no reason to wait again after WSAWaitForMultipleEvents.
// Simply collect what is ready.
struct
timeval
tv_nodelay
=
{
0
,
0
};
rc
=
select
(
0
,
&
local_fds_set
.
read
,
&
local_fds_set
.
write
,
&
local_fds_set
.
error
,
&
tv_nodelay
);
}
else
rc
=
select
(
0
,
&
local_fds_set
.
read
,
&
local_fds_set
.
write
,
&
local_fds_set
.
error
,
timeout
>
0
?
&
tv
:
NULL
);
wsa_assert
(
rc
!=
SOCKET_ERROR
);
trigger_events
(
fd_entries
,
local_fds_set
,
rc
);
if
(
family_entry
.
retired
)
{
family_entry
.
retired
=
false
;
family_entry
.
fd_entries
.
erase
(
std
::
remove_if
(
fd_entries
.
begin
(),
fd_entries
.
end
(),
is_retired_fd
),
family_entry
.
fd_entries
.
end
());
select_family_entry
(
family_entry
,
0
,
true
,
tv_nodelay
);
}
else
{
select_family_entry
(
family_entry
,
0
,
timeout
>
0
,
tv
);
}
}
#else
fds_set_t
local_fds_set
=
fds_set
;
rc
=
select
(
maxfd
+
1
,
&
local_fds_set
.
read
,
&
local_fds_set
.
write
,
&
local_fds_set
.
error
,
timeout
?
&
tv
:
NULL
);
select_family_entry
(
family_entry
,
maxfd
,
timeout
>
0
,
tv
);
#endif
}
}
if
(
rc
==
-
1
)
{
errno_assert
(
errno
==
EINTR
);
continue
;
}
void
zmq
::
select_t
::
select_family_entry
(
family_entry_t
&
family_entry_
,
const
int
max_fd_
,
const
bool
use_timeout_
,
struct
timeval
&
tv_
)
{
// select will fail when run with empty sets.
fd_entries_t
&
fd_entries
=
family_entry_
.
fd_entries
;
if
(
fd_entries
.
empty
())
return
;
trigger_events
(
fd_entries
,
local_fds_set
,
rc
);
fds_set_t
local_fds_set
=
family_entry_
.
fds_set
;
int
rc
=
select
(
max_fd_
,
&
local_fds_set
.
read
,
&
local_fds_set
.
write
,
&
local_fds_set
.
error
,
use_timeout_
?
&
tv_
:
NULL
);
if
(
retired
)
{
retired
=
false
;
fd_entries
.
erase
(
std
::
remove_if
(
fd_entries
.
begin
(),
fd_entries
.
end
(),
is_retired_fd
),
fd_entries
.
end
());
}
#if defined ZMQ_HAVE_WINDOWS
wsa_assert
(
rc
!=
SOCKET_ERROR
);
#else
if
(
rc
==
-
1
)
{
errno_assert
(
errno
==
EINTR
);
return
;
}
#endif
trigger_events
(
fd_entries
,
local_fds_set
,
rc
);
if
(
family_entry_
.
retired
)
{
family_entry_
.
retired
=
false
;
family_entry_
.
fd_entries
.
erase
(
std
::
remove_if
(
fd_entries
.
begin
(),
fd_entries
.
end
(),
is_retired_fd
),
family_entry_
.
fd_entries
.
end
());
}
}
...
...
src/select.hpp
View file @
f9d7eea6
...
...
@@ -53,129 +53,130 @@
namespace
zmq
{
struct
i_poll_events
;
struct
i_poll_events
;
// Implements socket polling mechanism using POSIX.1-2001 select()
// function.
// Implements socket polling mechanism using POSIX.1-2001 select()
// function.
class
select_t
:
public
poller_base_t
{
public
:
typedef
fd_t
handle_t
;
class
select_t
:
public
poller_base_t
{
public
:
select_t
(
const
ctx_t
&
ctx_
);
~
select_t
();
typedef
fd_t
handle_t
;
// "poller" concept.
handle_t
add_fd
(
fd_t
fd_
,
zmq
::
i_poll_events
*
events_
);
void
rm_fd
(
handle_t
handle_
);
void
set_pollin
(
handle_t
handle_
);
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
start
();
void
stop
();
select_t
(
const
ctx_t
&
ctx_
);
~
select_t
();
static
int
max_fds
();
// "poller" concept.
handle_t
add_fd
(
fd_t
fd_
,
zmq
::
i_poll_events
*
events_
);
void
rm_fd
(
handle_t
handle_
);
void
set_pollin
(
handle_t
handle_
);
void
reset_pollin
(
handle_t
handle_
);
void
set_pollout
(
handle_t
handle_
);
void
reset_pollout
(
handle_t
handle_
);
void
start
();
void
stop
();
private
:
// Main worker thread routine.
static
void
worker_routine
(
void
*
arg_
);
static
int
max_fds
();
// Main event loop.
void
loop
();
private
:
// Reference to ZMQ context.
const
ctx_t
&
ctx
;
// Main worker thread routine.
static
void
worker_routine
(
void
*
arg_
);
// Internal state.
struct
fds_set_t
{
fds_set_t
();
fds_set_t
(
const
fds_set_t
&
other_
);
fds_set_t
&
operator
=
(
const
fds_set_t
&
other_
);
// Convenience method to descriptor from all sets.
void
remove_fd
(
const
fd_t
&
fd_
);
fd_set
read
;
fd_set
write
;
fd_set
error
;
};
// Main event loop.
void
loop
();
struct
fd_entry_t
{
fd_t
fd
;
zmq
::
i_poll_events
*
events
;
};
typedef
std
::
vector
<
fd_entry_t
>
fd_entries_t
;
// Reference to ZMQ context.
const
ctx_t
&
ctx
;
void
trigger_events
(
const
fd_entries_t
&
fd_entries_
,
const
fds_set_t
&
local_fds_set_
,
int
event_count_
);
// Internal state.
struct
fds_set_t
{
fds_set_t
();
fds_set_t
(
const
fds_set_t
&
other_
);
fds_set_t
&
operator
=
(
const
fds_set_t
&
other_
);
// Convenience method to descriptor from all sets.
void
remove_fd
(
const
fd_t
&
fd_
);
struct
family_entry_t
{
family_entry_t
();
fd_set
read
;
fd_set
write
;
fd_set
error
;
};
fd_entries_t
fd_entries
;
fds_set_t
fds_set
;
bool
retired
;
};
struct
fd_entry_t
{
fd_t
fd
;
zmq
::
i_poll_events
*
events
;
};
typedef
std
::
vector
<
fd_entry_t
>
fd_entries_t
;
void
select_family_entry
(
family_entry_t
&
family_entry_
,
int
max_fd_
,
bool
use_timeout_
,
struct
timeval
&
tv_
);
#if defined ZMQ_HAVE_WINDOWS
struct
family_entry_t
{
family_entry_t
();
fd_entries_t
fd_entries
;
fds_set_t
fds_set
;
bool
retired
;
};
typedef
std
::
map
<
u_short
,
family_entry_t
>
family_entries_t
;
struct
wsa_events_t
{
wsa_events_t
();
~
wsa_events_t
();
// read, write, error and readwrite
WSAEVENT
events
[
4
];
};
#endif
typedef
std
::
map
<
u_short
,
family_entry_t
>
family_entries_t
;
void
trigger_events
(
const
fd_entries_t
&
fd_entries_
,
const
fds_set_t
&
local_fds_set_
,
int
event_count_
);
struct
wsa_events_t
{
wsa_events_t
();
~
wsa_events_t
();
#if defined ZMQ_HAVE_WINDOWS
family_entries_t
family_entries
;
// See loop for details.
family_entries_t
::
iterator
current_family_entry_it
;
// read, write, error and readwrite
WSAEVENT
events
[
4
];
};
bool
try_remove_fd_entry
(
family_entries_t
::
iterator
family_entry_it
,
zmq
::
fd_t
&
handle_
);
#else
fd_entries_t
fd_entries
;
fds_set_t
fds_set
;
fd_t
maxfd
;
bool
retired
;
#endif
family_entries_t
family_entries
;
// See loop for details.
family_entries_t
::
iterator
current_family_entry_it
;
#if defined ZMQ_HAVE_WINDOWS
static
const
size_t
fd_family_cache_size
=
8
;
std
::
pair
<
fd_t
,
u_short
>
fd_family_cache
[
fd_family_cache_size
];
bool
try_remove_fd_entry
(
family_entries_t
::
iterator
family_entry_it
,
zmq
::
fd_t
&
handle_
);
u_short
get_fd_family
(
fd_t
fd_
);
static
const
size_t
fd_family_cache_size
=
8
;
std
::
pair
<
fd_t
,
u_short
>
fd_family_cache
[
fd_family_cache_size
];
// Socket's family or AF_UNSPEC on error.
static
u_short
determine_fd_family
(
fd_t
fd_
);
u_short
get_fd_family
(
fd_t
fd_
);
// Socket's family or AF_UNSPEC on error.
static
u_short
determine_fd_family
(
fd_t
fd_
);
#else
// on non-Windows, we can treat all fds as one family
family_entry_t
family_entry
;
fd_t
maxfd
;
bool
retired
;
#endif
// Checks if an fd_entry_t is retired.
static
bool
is_retired_fd
(
const
fd_entry_t
&
entry
);
static
fd_entries_t
::
iterator
find_fd_entry_by_handle
(
fd_entries_t
&
fd_entries
,
handle_t
handle_
);
// Checks if an fd_entry_t is retired.
static
bool
is_retired_fd
(
const
fd_entry_t
&
entry
);
// If true, thread is shutting down.
bool
stopping
;
static
fd_entries_t
::
iterator
find_fd_entry_by_handle
(
fd_entries_t
&
fd_entries
,
handle_t
handle_
)
;
// Handle of the physical thread doing the I/O work
.
thread_t
worker
;
// If true, thread is shutting down
.
bool
stopping
;
select_t
(
const
select_t
&
);
const
select_t
&
operator
=
(
const
select_t
&
);
};
// Handle of the physical thread doing the I/O work.
thread_t
worker
;
typedef
select_t
poller_t
;
select_t
(
const
select_t
&
);
const
select_t
&
operator
=
(
const
select_t
&
);
};
typedef
select_t
poller_t
;
}
#endif
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment