Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
S
spdlog
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
spdlog
Commits
d409e536
Commit
d409e536
authored
Jul 10, 2018
by
gabime
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Revert
d5468e50
parent
d5468e50
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
170 additions
and
168 deletions
+170
-168
thread_pool.h
include/spdlog/details/thread_pool.h
+170
-168
No files found.
include/spdlog/details/thread_pool.h
View file @
d409e536
...
@@ -10,209 +10,210 @@
...
@@ -10,209 +10,210 @@
#include <vector>
#include <vector>
namespace
spdlog
{
namespace
spdlog
{
namespace
details
{
namespace
details
{
using
async_logger_ptr
=
std
::
shared_ptr
<
spdlog
::
async_logger
>
;
using
async_logger_ptr
=
std
::
shared_ptr
<
spdlog
::
async_logger
>
;
enum
class
async_msg_type
enum
class
async_msg_type
{
{
log
,
log
,
flush
,
flush
,
terminate
terminate
};
};
// Async msg to move to/from the queue
// Async msg to move to/from the queue
// Movable only. should never be copied
// Movable only. should never be copied
struct
async_msg
struct
async_msg
{
{
async_msg_type
msg_type
;
async_msg_type
msg_type
;
level
::
level_enum
level
;
level
::
level_enum
level
;
log_clock
::
time_point
time
;
log_clock
::
time_point
time
;
size_t
thread_id
;
size_t
thread_id
;
fmt
::
basic_memory_buffer
<
char
,
176
>
raw
;
fmt
::
basic_memory_buffer
<
char
,
176
>
raw
;
size_t
msg_id
;
async_logger_ptr
worker_ptr
;
size_t
msg_id
;
async_logger_ptr
worker_ptr
;
async_msg
()
=
default
;
~
async_msg
()
=
default
;
async_msg
()
=
default
;
~
async_msg
()
=
default
;
// should only be moved in or out of the queue..
async_msg
(
const
async_msg
&
)
=
delete
;
// should only be moved in or out of the queue..
async_msg
(
const
async_msg
&
)
=
delete
;
async_msg
(
async_msg
&&
other
)
SPDLOG_NOEXCEPT
:
msg_type
(
other
.
msg_type
),
#if defined(_MSC_VER) && _MSC_VER <= 1800 // support for vs2013 move
async_msg
(
async_msg
&&
other
)
SPDLOG_NOEXCEPT
:
msg_type
(
other
.
msg_type
),
level
(
other
.
level
),
level
(
other
.
level
),
time
(
other
.
time
),
time
(
other
.
time
),
thread_id
(
other
.
thread_id
),
thread_id
(
other
.
thread_id
),
raw
(
move
(
other
.
raw
)),
msg_id
(
other
.
msg_id
),
msg_id
(
other
.
msg_id
),
worker_ptr
(
std
::
move
(
other
.
worker_ptr
))
worker_ptr
(
std
::
move
(
other
.
worker_ptr
))
{
{
fmt_helper
::
append_buf
(
other
.
raw
,
raw
);
other
.
raw
.
resize
(
0
);
}
}
async_msg
&
operator
=
(
async_msg
&&
other
)
SPDLOG_NOEXCEPT
async_msg
&
operator
=
(
async_msg
&&
other
)
SPDLOG_NOEXCEPT
{
{
if
(
this
==
&
other
)
return
*
this
;
msg_type
=
other
.
msg_type
;
msg_type
=
other
.
msg_type
;
level
=
other
.
level
;
level
=
other
.
level
;
time
=
other
.
time
;
time
=
other
.
time
;
thread_id
=
other
.
thread_id
;
thread_id
=
other
.
thread_id
;
raw
.
resize
(
0
);
raw
=
std
::
move
(
other
.
raw
);
fmt_helper
::
append_buf
(
other
.
raw
,
raw
);
msg_id
=
other
.
msg_id
;
msg_id
=
other
.
msg_id
;
worker_ptr
=
std
::
move
(
other
.
worker_ptr
);
worker_ptr
=
std
::
move
(
other
.
worker_ptr
);
return
*
this
;
return
*
this
;
}
// construct from log_msg with given type
async_msg
(
async_logger_ptr
&&
worker
,
async_msg_type
the_type
,
details
::
log_msg
&&
m
)
:
msg_type
(
the_type
)
,
level
(
m
.
level
)
,
time
(
m
.
time
)
,
thread_id
(
m
.
thread_id
)
,
msg_id
(
m
.
msg_id
)
,
worker_ptr
(
std
::
forward
<
async_logger_ptr
>
(
worker
))
{
fmt_helper
::
append_buf
(
m
.
raw
,
raw
);
}
async_msg
(
async_logger_ptr
&&
worker
,
async_msg_type
the_type
)
:
async_msg
(
std
::
forward
<
async_logger_ptr
>
(
worker
),
the_type
,
details
::
log_msg
())
{
}
}
#else
async_msg
(
async_msg
&&
other
)
=
default
;
async_msg
&
operator
=
(
async_msg
&&
other
)
=
default
;
#endif
// construct from log_msg with given type
async_msg
(
async_logger_ptr
&&
worker
,
async_msg_type
the_type
,
details
::
log_msg
&&
m
)
:
msg_type
(
the_type
)
,
level
(
m
.
level
)
,
time
(
m
.
time
)
,
thread_id
(
m
.
thread_id
)
,
msg_id
(
m
.
msg_id
)
,
worker_ptr
(
std
::
forward
<
async_logger_ptr
>
(
worker
))
{
fmt_helper
::
append_buf
(
m
.
raw
,
raw
);
}
async_msg
(
async_msg_type
the_type
)
async_msg
(
async_logger_ptr
&&
worker
,
async_msg_type
the_type
)
:
async_msg
(
nullptr
,
the_type
,
details
::
log_msg
())
:
async_msg
(
std
::
forward
<
async_logger_ptr
>
(
worker
)
,
the_type
,
details
::
log_msg
())
{
{
}
}
// copy into log_msg
async_msg
(
async_msg_type
the_type
)
void
to_log_msg
(
log_msg
&
msg
)
:
async_msg
(
nullptr
,
the_type
,
details
::
log_msg
())
{
{
msg
.
logger_name
=
&
worker_ptr
->
name
();
}
msg
.
level
=
level
;
msg
.
time
=
time
;
msg
.
thread_id
=
thread_id
;
fmt_helper
::
append_buf
(
raw
,
msg
.
raw
);
msg
.
msg_id
=
msg_id
;
msg
.
color_range_start
=
0
;
msg
.
color_range_end
=
0
;
}
};
class
thread_pool
// copy into log_msg
{
void
to_log_msg
(
log_msg
&
msg
)
public
:
{
using
item_type
=
async_msg
;
msg
.
logger_name
=
&
worker_ptr
->
name
();
using
q_type
=
details
::
mpmc_blocking_queue
<
item_type
>
;
msg
.
level
=
level
;
using
clock_type
=
std
::
chrono
::
steady_clock
;
msg
.
time
=
time
;
msg
.
thread_id
=
thread_id
;
fmt_helper
::
append_buf
(
raw
,
msg
.
raw
);
msg
.
msg_id
=
msg_id
;
msg
.
color_range_start
=
0
;
msg
.
color_range_end
=
0
;
}
};
thread_pool
(
size_t
q_max_items
,
size_t
threads_n
)
class
thread_pool
:
q_
(
q_max_items
)
{
// std::cout << "thread_pool() q_size_bytes: " << q_size_bytes << "\tthreads_n: " << threads_n << std::endl;
if
(
threads_n
==
0
||
threads_n
>
1000
)
{
{
throw
spdlog_ex
(
"spdlog::thread_pool(): invalid threads_n param (valid range is 1-1000)"
);
public
:
}
using
item_type
=
async_msg
;
for
(
size_t
i
=
0
;
i
<
threads_n
;
i
++
)
using
q_type
=
details
::
mpmc_blocking_queue
<
item_type
>
;
{
using
clock_type
=
std
::
chrono
::
steady_clock
;
threads_
.
emplace_back
(
std
::
bind
(
&
thread_pool
::
worker_loop_
,
this
));
}
}
// message all threads to terminate gracefully join them
thread_pool
(
size_t
q_max_items
,
size_t
threads_n
)
~
thread_pool
()
:
q_
(
q_max_items
)
{
try
{
for
(
size_t
i
=
0
;
i
<
threads_
.
size
();
i
++
)
{
{
post_async_msg_
(
async_msg
(
async_msg_type
::
terminate
),
async_overflow_policy
::
block
);
// std::cout << "thread_pool() q_size_bytes: " << q_size_bytes << "\tthreads_n: " << threads_n << std::endl;
if
(
threads_n
==
0
||
threads_n
>
1000
)
{
throw
spdlog_ex
(
"spdlog::thread_pool(): invalid threads_n param (valid range is 1-1000)"
);
}
for
(
size_t
i
=
0
;
i
<
threads_n
;
i
++
)
{
threads_
.
emplace_back
(
std
::
bind
(
&
thread_pool
::
worker_loop_
,
this
));
}
}
}
for
(
auto
&
t
:
threads_
)
// message all threads to terminate gracefully join them
~
thread_pool
()
{
{
t
.
join
();
try
{
for
(
size_t
i
=
0
;
i
<
threads_
.
size
();
i
++
)
{
post_async_msg_
(
async_msg
(
async_msg_type
::
terminate
),
async_overflow_policy
::
block
);
}
for
(
auto
&
t
:
threads_
)
{
t
.
join
();
}
}
catch
(...)
{
}
}
}
}
catch
(...)
{
}
}
void
post_log
(
async_logger_ptr
&&
worker_ptr
,
details
::
log_msg
&&
msg
,
async_overflow_policy
overflow_policy
)
void
post_log
(
async_logger_ptr
&&
worker_ptr
,
details
::
log_msg
&&
msg
,
async_overflow_policy
overflow_policy
)
{
{
async_msg
async_m
(
std
::
forward
<
async_logger_ptr
>
(
worker_ptr
),
async_msg_type
::
log
,
std
::
forward
<
log_msg
>
(
msg
));
async_msg
async_m
(
std
::
forward
<
async_logger_ptr
>
(
worker_ptr
),
async_msg_type
::
log
,
std
::
forward
<
log_msg
>
(
msg
));
post_async_msg_
(
std
::
move
(
async_m
),
overflow_policy
);
post_async_msg_
(
std
::
move
(
async_m
),
overflow_policy
);
}
}
void
post_flush
(
async_logger_ptr
&&
worker_ptr
,
async_overflow_policy
overflow_policy
)
{
post_async_msg_
(
async_msg
(
std
::
move
(
worker_ptr
),
async_msg_type
::
flush
),
overflow_policy
);
}
private
:
q_type
q_
;
std
::
vector
<
std
::
thread
>
threads_
;
void
post_flush
(
async_logger_ptr
&&
worker_ptr
,
async_overflow_policy
overflow_policy
)
{
post_async_msg_
(
async_msg
(
std
::
move
(
worker_ptr
),
async_msg_type
::
flush
),
overflow_policy
);
}
void
post_async_msg_
(
async_msg
&&
new_msg
,
async_overflow_policy
overflow_policy
)
private
:
{
q_type
q_
;
if
(
overflow_policy
==
async_overflow_policy
::
block
)
{
q_
.
enqueue
(
std
::
move
(
new_msg
));
}
else
{
q_
.
enqueue_nowait
(
std
::
move
(
new_msg
));
}
}
void
worker_loop_
()
std
::
vector
<
std
::
thread
>
threads_
;
{
while
(
process_next_msg_
())
{
};
}
// process next message in the queue
void
post_async_msg_
(
async_msg
&&
new_msg
,
async_overflow_policy
overflow_policy
)
// return true if this thread should still be active (while no terminate msg was received)
{
bool
process_next_msg_
()
if
(
overflow_policy
==
async_overflow_policy
::
block
)
{
{
async_msg
incoming_async_msg
;
q_
.
enqueue
(
std
::
move
(
new_msg
));
bool
dequeued
=
q_
.
dequeue_for
(
incoming_async_msg
,
std
::
chrono
::
seconds
(
10
));
}
if
(
!
dequeued
)
else
{
{
return
true
;
q_
.
enqueue_nowait
(
std
::
move
(
new_msg
));
}
}
}
switch
(
incoming_async_msg
.
msg_type
)
{
case
async_msg_type
:
:
flush
:
{
incoming_async_msg
.
worker_ptr
->
backend_flush_
();
return
true
;
}
case
async_msg_type
:
:
terminate
:
void
worker_loop_
()
{
{
return
false
;
while
(
process_next_msg_
())
}
{
};
}
default:
// process next message in the queue
{
// return true if this thread should still be active (while no terminate msg was received)
log_msg
msg
;
bool
process_next_msg_
()
incoming_async_msg
.
to_log_msg
(
msg
);
{
incoming_async_msg
.
worker_ptr
->
backend_log_
(
msg
);
async_msg
incoming_async_msg
;
return
true
;
bool
dequeued
=
q_
.
dequeue_for
(
incoming_async_msg
,
std
::
chrono
::
seconds
(
10
));
}
if
(
!
dequeued
)
}
{
assert
(
false
);
return
true
;
return
true
;
// should not be reached
}
}
};
switch
(
incoming_async_msg
.
msg_type
)
{
case
async_msg_type
:
:
flush
:
{
incoming_async_msg
.
worker_ptr
->
backend_flush_
();
return
true
;
}
case
async_msg_type
:
:
terminate
:
{
return
false
;
}
default:
{
log_msg
msg
;
incoming_async_msg
.
to_log_msg
(
msg
);
incoming_async_msg
.
worker_ptr
->
backend_log_
(
msg
);
return
true
;
}
}
assert
(
false
);
return
true
;
// should not be reached
}
};
}
// namespace details
}
// namespace details
}
// namespace spdlog
}
// namespace spdlog
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment