Commit b3b29563 authored by Robert Kimball's avatar Robert Kimball

async thread working

parent d687c08f
...@@ -25,9 +25,14 @@ ...@@ -25,9 +25,14 @@
using namespace std; using namespace std;
using namespace ngraph; using namespace ngraph;
runtime::Backend::Backend()
{
async_thread_start();
}
runtime::Backend::~Backend() runtime::Backend::~Backend()
{ {
NGRAPH_INFO; NGRAPH_INFO << "m_event_queue_condition " << &m_event_queue_condition;
async_thread_stop(); async_thread_stop();
NGRAPH_INFO; NGRAPH_INFO;
} }
...@@ -127,22 +132,24 @@ runtime::Backend::AsyncEvent::AsyncEvent(size_t buffer_number, ...@@ -127,22 +132,24 @@ runtime::Backend::AsyncEvent::AsyncEvent(size_t buffer_number,
future<void> future<void>
runtime::Backend::post_async_read_event(void* p, size_t size_in_bytes, size_t buffer_number) runtime::Backend::post_async_read_event(void* p, size_t size_in_bytes, size_t buffer_number)
{ {
async_thread_start();
auto event = make_shared<AsyncEvent>(AsyncEvent::Type::READ, buffer_number, p, size_in_bytes); auto event = make_shared<AsyncEvent>(AsyncEvent::Type::READ, buffer_number, p, size_in_bytes);
unique_lock<std::mutex> lock(m_event_queue_mutex); unique_lock<std::mutex> lock(m_event_queue_mutex);
m_event_queue.push_back(event); m_event_queue.push_back(event);
m_event_queue_condition.notify_one(); NGRAPH_INFO << "read";
NGRAPH_INFO << "m_event_queue_condition " << &m_event_queue_condition;
m_event_queue_condition.notify_all();
return event->get_future(); return event->get_future();
} }
future<void> future<void>
runtime::Backend::post_async_write_event(void* p, size_t size_in_bytes, size_t buffer_number) runtime::Backend::post_async_write_event(void* p, size_t size_in_bytes, size_t buffer_number)
{ {
async_thread_start();
auto event = make_shared<AsyncEvent>(AsyncEvent::Type::WRITE, buffer_number, p, size_in_bytes); auto event = make_shared<AsyncEvent>(AsyncEvent::Type::WRITE, buffer_number, p, size_in_bytes);
unique_lock<std::mutex> lock(m_event_queue_mutex); unique_lock<std::mutex> lock(m_event_queue_mutex);
m_event_queue.push_back(event); m_event_queue.push_back(event);
m_event_queue_condition.notify_one(); NGRAPH_INFO << "write";
NGRAPH_INFO << "m_event_queue_condition " << &m_event_queue_condition;
m_event_queue_condition.notify_all();
return event->get_future(); return event->get_future();
} }
...@@ -152,11 +159,12 @@ future<void> runtime::Backend::post_async_execute_event( ...@@ -152,11 +159,12 @@ future<void> runtime::Backend::post_async_execute_event(
const std::vector<std::shared_ptr<runtime::Tensor>>& inputs, const std::vector<std::shared_ptr<runtime::Tensor>>& inputs,
size_t buffer_number) size_t buffer_number)
{ {
async_thread_start();
auto event = make_shared<AsyncEvent>(buffer_number, executable, outputs, inputs); auto event = make_shared<AsyncEvent>(buffer_number, executable, outputs, inputs);
unique_lock<std::mutex> lock(m_event_queue_mutex); unique_lock<std::mutex> lock(m_event_queue_mutex);
m_event_queue.push_back(event); m_event_queue.push_back(event);
m_event_queue_condition.notify_one(); NGRAPH_INFO << "execute";
NGRAPH_INFO << "m_event_queue_condition " << &m_event_queue_condition;
m_event_queue_condition.notify_all();
return event->get_future(); return event->get_future();
} }
...@@ -175,9 +183,9 @@ void runtime::Backend::async_thread_stop() ...@@ -175,9 +183,9 @@ void runtime::Backend::async_thread_stop()
if (m_event_queue_active) if (m_event_queue_active)
{ {
{ {
unique_lock<std::mutex> lk(m_event_queue_mutex); unique_lock<std::mutex> lock(m_event_queue_mutex);
m_event_queue_active = false; m_event_queue_active = false;
m_event_queue_condition.notify_one(); m_event_queue_condition.notify_all();
} }
m_event_queue_thread->join(); m_event_queue_thread->join();
} }
...@@ -191,10 +199,10 @@ void runtime::Backend::async_thread_process(const shared_ptr<AsyncEvent>& event) ...@@ -191,10 +199,10 @@ void runtime::Backend::async_thread_process(const shared_ptr<AsyncEvent>& event)
void runtime::Backend::async_thread_entry() void runtime::Backend::async_thread_entry()
{ {
NGRAPH_INFO << "******************** inside thread"; NGRAPH_INFO << "******************** inside thread";
unique_lock<std::mutex> lk(m_event_queue_mutex); unique_lock<std::mutex> lock(m_event_queue_mutex);
while (m_event_queue_active) while (m_event_queue_active)
{ {
m_event_queue_condition.wait(lk); m_event_queue_condition.wait(lock);
while (!m_event_queue.empty()) while (!m_event_queue.empty())
{ {
async_thread_process(m_event_queue.front()); async_thread_process(m_event_queue.front());
......
...@@ -41,6 +41,7 @@ namespace ngraph ...@@ -41,6 +41,7 @@ namespace ngraph
class ngraph::runtime::Backend class ngraph::runtime::Backend
{ {
public: public:
Backend();
virtual ~Backend(); virtual ~Backend();
/// \brief Create a new Backend object /// \brief Create a new Backend object
/// \param type The name of a registered backend, such as "CPU" or "GPU". /// \param type The name of a registered backend, such as "CPU" or "GPU".
......
...@@ -31,7 +31,6 @@ set(SRC ...@@ -31,7 +31,6 @@ set(SRC
aligned_buffer.cpp aligned_buffer.cpp
all_close_f.cpp all_close_f.cpp
assertion.cpp assertion.cpp
async.cpp
bfloat16.cpp bfloat16.cpp
build_graph.cpp build_graph.cpp
builder_autobroadcast.cpp builder_autobroadcast.cpp
...@@ -86,6 +85,7 @@ set_source_files_properties(includes.cpp PROPERTIES COMPILE_DEFINITIONS ...@@ -86,6 +85,7 @@ set_source_files_properties(includes.cpp PROPERTIES COMPILE_DEFINITIONS
if (NGRAPH_INTERPRETER_ENABLE) if (NGRAPH_INTERPRETER_ENABLE)
list(APPEND SRC list(APPEND SRC
async.cpp
backend_debug_api.cpp backend_debug_api.cpp
builder.cpp builder.cpp
backend_api.cpp) backend_api.cpp)
......
...@@ -77,15 +77,7 @@ TEST(async, tensor_read_write) ...@@ -77,15 +77,7 @@ TEST(async, tensor_read_write)
ASSERT_TRUE(future_b.valid()); ASSERT_TRUE(future_b.valid());
ASSERT_TRUE(future_r.valid()); ASSERT_TRUE(future_r.valid());
chrono::milliseconds ten_ms(10); chrono::milliseconds ten_ms(100);
EXPECT_EQ(future_a.wait_for(ten_ms), future_status::timeout);
EXPECT_EQ(future_b.wait_for(ten_ms), future_status::timeout);
EXPECT_EQ(future_r.wait_for(ten_ms), future_status::timeout);
this_thread::sleep_for(chrono::milliseconds(500));
EXPECT_EQ(future_a.wait_for(ten_ms), future_status::timeout);
EXPECT_EQ(future_b.wait_for(ten_ms), future_status::timeout);
EXPECT_EQ(future_r.wait_for(ten_ms), future_status::timeout); EXPECT_EQ(future_r.wait_for(ten_ms), future_status::timeout);
auto future = handle->begin_execute({r}, {a, b}); auto future = handle->begin_execute({r}, {a, b});
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment