Commit f5a24919 authored by Robert Kimball's avatar Robert Kimball

wip

parent 8927ccbf
......@@ -122,15 +122,23 @@ runtime::Backend::AsyncEvent::AsyncEvent(size_t buffer_number,
future<void>
runtime::Backend::post_async_read_event(void* p, size_t size_in_bytes, size_t buffer_number)
{
async_thread_start();
auto event = make_shared<AsyncEvent>(AsyncEvent::Type::READ, buffer_number, p, size_in_bytes);
m_event_queue.post_event(event);
unique_lock<std::mutex> lock(m_event_queue_mutex);
m_event_queue.push_back(event);
m_event_queue_condition.notify_one();
return event->get_future();
}
future<void> runtime::Backend::post_async_write_event(const void* p,
size_t size_in_bytes,
size_t buffer_number)
future<void>
runtime::Backend::post_async_write_event(void* p, size_t size_in_bytes, size_t buffer_number)
{
async_thread_start();
auto event = make_shared<AsyncEvent>(AsyncEvent::Type::WRITE, buffer_number, p, size_in_bytes);
unique_lock<std::mutex> lock(m_event_queue_mutex);
m_event_queue.push_back(event);
m_event_queue_condition.notify_one();
return event->get_future();
}
future<void> runtime::Backend::post_async_execute_event(
......@@ -139,4 +147,53 @@ future<void> runtime::Backend::post_async_execute_event(
const std::vector<std::shared_ptr<runtime::Tensor>>& inputs,
size_t buffer_number)
{
async_thread_start();
auto event = make_shared<AsyncEvent>(buffer_number, executable, outputs, inputs);
unique_lock<std::mutex> lock(m_event_queue_mutex);
m_event_queue.push_back(event);
m_event_queue_condition.notify_one();
return event->get_future();
}
void runtime::Backend::async_thread_start()
{
if (!m_event_queue_active)
{
m_event_queue_active = true;
m_event_queue_thread =
unique_ptr<thread>(new thread(&runtime::Backend::async_thread_entry, this));
}
}
void runtime::Backend::async_thread_stop()
{
if (m_event_queue_active)
{
{
unique_lock<std::mutex> lk(m_event_queue_mutex);
m_event_queue_active = false;
m_event_queue_condition.notify_one();
}
m_event_queue_thread->join();
}
}
void runtime::Backend::async_thread_process(const shared_ptr<AsyncEvent>& event)
{
NGRAPH_INFO << "process";
}
void runtime::Backend::async_thread_entry()
{
NGRAPH_INFO << "******************** inside thread";
unique_lock<std::mutex> lk(m_event_queue_mutex);
while (m_event_queue_active)
{
m_event_queue_condition.wait(lk);
while (!m_event_queue.empty())
{
async_thread_process(m_event_queue.front());
m_event_queue.pop_front();
}
}
}
......@@ -180,11 +180,21 @@ protected:
};
std::future<void> post_async_read_event(void* p, size_t size_in_bytes, size_t buffer_number);
std::future<void>
post_async_write_event(const void* p, size_t size_in_bytes, size_t buffer_number);
std::future<void> post_async_write_event(void* p, size_t size_in_bytes, size_t buffer_number);
std::future<void>
post_async_execute_event(const std::shared_ptr<Executable>& executable,
const std::vector<std::shared_ptr<runtime::Tensor>>& outputs,
const std::vector<std::shared_ptr<runtime::Tensor>>& inputs,
size_t buffer_number);
void async_thread_start();
void async_thread_stop();
void async_thread_process(const std::shared_ptr<AsyncEvent>& event);
void async_thread_entry();
std::deque<std::shared_ptr<AsyncEvent>> m_event_queue;
std::mutex m_event_queue_mutex;
std::condition_variable m_event_queue_condition;
std::unique_ptr<std::thread> m_event_queue_thread;
bool m_event_queue_active = false;
};
......@@ -103,7 +103,8 @@ future<void> runtime::Tensor::begin_write(const void* p, size_t size_in_bytes, s
if (m_backend)
{
// auto f = m_promise.get_future();
return m_backend->post_async_write_event(p, size_in_bytes, buffer_number);
return m_backend->post_async_write_event(
const_cast<void*>(p), size_in_bytes, buffer_number);
}
else
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment