Commit 41f23962 authored by Robert Kimball

wip

parent b3b29563
......@@ -20,6 +20,7 @@
#include "ngraph/runtime/backend.hpp"
#include "ngraph/runtime/backend_manager.hpp"
#include "ngraph/runtime/dynamic/dynamic_backend.hpp"
#include "ngraph/runtime/executable.hpp"
#include "ngraph/util.hpp"
using namespace std;
......@@ -100,39 +101,42 @@ std::shared_ptr<runtime::Executable> runtime::Backend::load(istream& input_strea
}
runtime::Backend::AsyncEvent::AsyncEvent(Type type,
size_t buffer_number,
const shared_ptr<Tensor>& tensor,
void* p,
size_t size_in_bytes)
size_t size_in_bytes,
size_t buffer_number)
: m_type{type}
, m_buffer_number{buffer_number}
, m_data{p}
, m_size_in_bytes{size_in_bytes}
, m_executable{nullptr}
, m_tensor{tensor}
, m_outputs{nullptr}
, m_inputs{nullptr}
{
(void)m_buffer_number;
}
runtime::Backend::AsyncEvent::AsyncEvent(size_t buffer_number,
const shared_ptr<Executable>& executable,
runtime::Backend::AsyncEvent::AsyncEvent(const shared_ptr<Executable>& executable,
const vector<shared_ptr<runtime::Tensor>>& outputs,
const vector<shared_ptr<runtime::Tensor>>& inputs)
: m_type{Type::EXECUTE}
, m_buffer_number{buffer_number}
, m_buffer_number{0}
, m_data{nullptr}
, m_size_in_bytes{0}
, m_executable{executable}
, m_outputs{&outputs}
, m_inputs{&inputs}
, m_tensor{nullptr}
, m_outputs{outputs}
, m_inputs{inputs}
{
(void)m_buffer_number;
}
future<void>
runtime::Backend::post_async_read_event(void* p, size_t size_in_bytes, size_t buffer_number)
future<void> runtime::Backend::post_async_read_event(const shared_ptr<Tensor>& tensor,
void* p,
size_t size_in_bytes,
size_t buffer_number)
{
auto event = make_shared<AsyncEvent>(AsyncEvent::Type::READ, buffer_number, p, size_in_bytes);
auto event =
make_shared<AsyncEvent>(AsyncEvent::Type::READ, tensor, p, size_in_bytes, buffer_number);
unique_lock<std::mutex> lock(m_event_queue_mutex);
m_event_queue.push_back(event);
NGRAPH_INFO << "read";
......@@ -141,10 +145,13 @@ future<void>
return event->get_future();
}
future<void>
runtime::Backend::post_async_write_event(void* p, size_t size_in_bytes, size_t buffer_number)
future<void> runtime::Backend::post_async_write_event(const shared_ptr<Tensor>& tensor,
const void* p,
size_t size_in_bytes,
size_t buffer_number)
{
auto event = make_shared<AsyncEvent>(AsyncEvent::Type::WRITE, buffer_number, p, size_in_bytes);
auto event = make_shared<AsyncEvent>(
AsyncEvent::Type::WRITE, tensor, const_cast<void*>(p), size_in_bytes, buffer_number);
unique_lock<std::mutex> lock(m_event_queue_mutex);
m_event_queue.push_back(event);
NGRAPH_INFO << "write";
......@@ -156,10 +163,9 @@ future<void>
future<void> runtime::Backend::post_async_execute_event(
const std::shared_ptr<Executable>& executable,
const std::vector<std::shared_ptr<runtime::Tensor>>& outputs,
const std::vector<std::shared_ptr<runtime::Tensor>>& inputs,
size_t buffer_number)
const std::vector<std::shared_ptr<runtime::Tensor>>& inputs)
{
auto event = make_shared<AsyncEvent>(buffer_number, executable, outputs, inputs);
auto event = make_shared<AsyncEvent>(executable, outputs, inputs);
unique_lock<std::mutex> lock(m_event_queue_mutex);
m_event_queue.push_back(event);
NGRAPH_INFO << "execute";
......@@ -193,7 +199,21 @@ void runtime::Backend::async_thread_stop()
void runtime::Backend::async_thread_process(const shared_ptr<AsyncEvent>& event)
{
NGRAPH_INFO << "process";
switch (event->get_type())
{
case AsyncEvent::Type::READ:
NGRAPH_INFO << "process read";
event->get_tensor()->read(event->get_data(), 0, event->get_size_in_bytes());
break;
case AsyncEvent::Type::WRITE:
NGRAPH_INFO << "process write";
event->get_tensor()->write(event->get_data(), 0, event->get_size_in_bytes());
break;
case AsyncEvent::Type::EXECUTE:
NGRAPH_INFO << "process execute";
event->get_executable()->call(event->get_outputs(), event->get_inputs());
break;
}
}
void runtime::Backend::async_thread_entry()
......
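The mechanism these backend.cpp changes build is a promise-backed work queue: each AsyncEvent owns a std::promise<void>, posting an event appends it to a mutex-protected queue, and a worker thread pops events, dispatches on their type, and fulfills the promise so the caller's future becomes ready. A minimal standalone sketch of that pattern (illustrative names, not nGraph's actual classes):

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>

// One queued unit of work plus the promise that signals its completion.
struct Event
{
    std::function<void()> work;
    std::promise<void> done;
};

class EventQueue
{
public:
    EventQueue()
        : m_worker([this] { run(); })
    {
    }
    ~EventQueue()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_cv.notify_one();
        m_worker.join();
    }
    // Analogous to post_async_*: enqueue the event and hand back its future.
    std::future<void> post(std::function<void()> work)
    {
        auto event = std::make_shared<Event>();
        event->work = std::move(work);
        std::future<void> f = event->done.get_future();
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push_back(event);
        }
        m_cv.notify_one();
        return f;
    }

private:
    // Analogous to the async thread: pop events, run them, fulfill the promise.
    void run()
    {
        for (;;)
        {
            std::shared_ptr<Event> event;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_cv.wait(lock, [this] { return m_stop || !m_queue.empty(); });
                if (m_queue.empty())
                {
                    return; // only reachable once m_stop is set
                }
                event = m_queue.front();
                m_queue.pop_front();
            }
            event->work();           // e.g. a tensor read/write or an executable call
            event->done.set_value(); // the caller's future becomes ready here
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::deque<std::shared_ptr<Event>> m_queue;
    bool m_stop = false;
    std::thread m_worker;
};
```

In the diff, AsyncEvent plays the role of Event: READ/WRITE events carry a tensor, a raw buffer pointer, and a byte count, while EXECUTE events carry an executable plus its output and input tensors.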
......@@ -144,6 +144,7 @@ public:
protected:
friend class ngraph::runtime::Tensor;
friend class ngraph::runtime::Executable;
class AsyncEvent
{
......@@ -154,20 +155,25 @@ protected:
WRITE,
EXECUTE
};
AsyncEvent(Type type, size_t buffer_number, void* p, size_t size_in_bytes);
AsyncEvent(size_t buffer_number,
const std::shared_ptr<Executable>& m_executable,
AsyncEvent(Type type,
const std::shared_ptr<Tensor>& tensor,
void* p,
size_t size_in_bytes,
size_t buffer_number);
AsyncEvent(const std::shared_ptr<Executable>& m_executable,
const std::vector<std::shared_ptr<runtime::Tensor>>& m_outputs,
const std::vector<std::shared_ptr<runtime::Tensor>>& m_inputs);
void* get_data() const { return m_data; }
size_t get_size_in_bytes() const { return m_size_in_bytes; }
Type get_type() const { return m_type; }
size_t get_buffer_number() const { return m_buffer_number; }
std::shared_ptr<Executable> get_executable() const { return m_executable; }
const std::vector<std::shared_ptr<runtime::Tensor>>* get_outputs() const
std::shared_ptr<Tensor> get_tensor() const { return m_tensor; }
const std::vector<std::shared_ptr<runtime::Tensor>>& get_outputs() const
{
return m_outputs;
}
const std::vector<std::shared_ptr<runtime::Tensor>>* get_inputs() const { return m_inputs; }
const std::vector<std::shared_ptr<runtime::Tensor>>& get_inputs() const { return m_inputs; }
std::future<void> get_future() { return m_promise.get_future(); }
private:
const Type m_type;
......@@ -175,18 +181,24 @@ protected:
void* m_data;
const size_t m_size_in_bytes;
std::shared_ptr<Executable> m_executable;
const std::vector<std::shared_ptr<runtime::Tensor>>* m_outputs;
const std::vector<std::shared_ptr<runtime::Tensor>>* m_inputs;
std::shared_ptr<Tensor> m_tensor;
std::vector<std::shared_ptr<runtime::Tensor>> m_outputs;
std::vector<std::shared_ptr<runtime::Tensor>> m_inputs;
std::promise<void> m_promise;
};
std::future<void> post_async_read_event(void* p, size_t size_in_bytes, size_t buffer_number);
std::future<void> post_async_write_event(void* p, size_t size_in_bytes, size_t buffer_number);
std::future<void> post_async_read_event(const std::shared_ptr<Tensor>& tensor,
void* p,
size_t size_in_bytes,
size_t buffer_number);
std::future<void> post_async_write_event(const std::shared_ptr<Tensor>& tensor,
const void* p,
size_t size_in_bytes,
size_t buffer_number);
std::future<void>
post_async_execute_event(const std::shared_ptr<Executable>& executable,
const std::vector<std::shared_ptr<runtime::Tensor>>& outputs,
const std::vector<std::shared_ptr<runtime::Tensor>>& inputs,
size_t buffer_number);
const std::vector<std::shared_ptr<runtime::Tensor>>& inputs);
void async_thread_start();
void async_thread_stop();
......
......@@ -17,6 +17,7 @@
#include <sstream>
#include "ngraph/file_util.hpp"
#include "ngraph/runtime/backend.hpp"
#include "ngraph/runtime/executable.hpp"
#include "ngraph/runtime/tensor.hpp"
#include "ngraph/util.hpp"
......@@ -28,6 +29,11 @@ runtime::Executable::Executable()
{
}
runtime::Executable::Executable(const shared_ptr<Backend>& backend)
: m_backend{backend}
{
}
runtime::Executable::~Executable()
{
}
......@@ -139,10 +145,15 @@ bool runtime::Executable::begin_execute_helper(const vector<shared_ptr<runtime::
return rc;
}
future<bool> runtime::Executable::begin_execute(const vector<shared_ptr<runtime::Tensor>>& outputs,
future<void> runtime::Executable::begin_execute(const vector<shared_ptr<runtime::Tensor>>& outputs,
const vector<shared_ptr<runtime::Tensor>>& inputs)
{
using namespace std::placeholders;
auto bound_f = bind(&Executable::begin_execute_helper, this, _1, _2);
return async(bound_f, outputs, inputs);
if (m_backend)
{
return m_backend->post_async_execute_event(shared_from_this(), outputs, inputs);
}
else
{
throw runtime_error("Async operations not supported for this backend");
}
}
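With this change, begin_execute no longer runs the call on an ad-hoc std::async task; it posts an EXECUTE event to the owning backend and returns that event's future. A hedged caller-side sketch (setup elided; f, a, b and result are placeholders mirroring the test at the bottom of the diff):

```cpp
// Illustrative only: assumes the backend supports the async path added in this commit.
auto backend = ngraph::runtime::Backend::create("INTERPRETER");
auto handle = backend->compile(f); // std::shared_ptr<runtime::Executable>

// Nothing has necessarily executed yet; the call is queued on the backend's thread.
std::future<void> done = handle->begin_execute({result}, {a, b});

done.get(); // blocks until the worker thread has run the executable
```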
......@@ -30,13 +30,15 @@ namespace ngraph
{
class Tensor;
class Executable;
class Backend;
}
}
class ngraph::runtime::Executable
class ngraph::runtime::Executable : public std::enable_shared_from_this<Executable>
{
public:
Executable();
Executable(const std::shared_ptr<Backend>& backend);
virtual ~Executable();
/// \param outputs vector of runtime::Tensor used as outputs
......@@ -58,7 +60,7 @@ public:
/// \param inputs vector of runtime::Tensor used as inputs
/// \returns a valid std::future to monitor the execution. Use future.get() to get the results
/// or future.wait*() to wait for completion.
virtual std::future<bool>
virtual std::future<void>
begin_execute(const std::vector<std::shared_ptr<runtime::Tensor>>& outputs,
const std::vector<std::shared_ptr<runtime::Tensor>>& inputs);
......@@ -96,4 +98,5 @@ protected:
private:
ngraph::ParameterVector m_parameters;
ngraph::ResultVector m_results;
std::shared_ptr<Backend> m_backend;
};
......@@ -67,7 +67,7 @@ shared_ptr<runtime::Executable>
runtime::interpreter::INTBackend::compile(shared_ptr<Function> function,
bool enable_performance_collection)
{
return make_shared<INTExecutable>(function, enable_performance_collection);
return make_shared<INTExecutable>(shared_from_this(), function, enable_performance_collection);
}
bool runtime::interpreter::INTBackend::is_supported(const Node& node) const
......
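compile now hands shared_from_this() to the executable so it can post events back to its backend. That call is only well-defined when the backend object is already owned by a std::shared_ptr and its class derives (directly or via a base) from std::enable_shared_from_this; otherwise it throws std::bad_weak_ptr in C++17 and is undefined behaviour before that. A minimal illustration of the rule, using a placeholder type rather than nGraph's class:

```cpp
#include <memory>

struct MyBackend : std::enable_shared_from_this<MyBackend>
{
    std::shared_ptr<MyBackend> self() { return shared_from_this(); }
};

int main()
{
    auto owned = std::make_shared<MyBackend>();
    auto ok = owned->self(); // fine: a shared_ptr already owns the object
    (void)ok;

    MyBackend on_stack;
    // on_stack.self(); // not owned by any shared_ptr: bad_weak_ptr / undefined behaviour
    return 0;
}
```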
......@@ -37,9 +37,11 @@ using namespace ngraph;
using descriptor::layout::DenseTensorLayout;
runtime::interpreter::INTExecutable::INTExecutable(const shared_ptr<Function>& function,
runtime::interpreter::INTExecutable::INTExecutable(const shared_ptr<runtime::Backend>& backend,
const shared_ptr<Function>& function,
bool enable_performance_collection)
: m_is_compiled{true}
: Executable{backend}
, m_is_compiled{true}
, m_performance_counters_enabled{enable_performance_collection}
{
m_function = clone_function(*function);
......
......@@ -62,6 +62,7 @@
#include "ngraph/op/topk.hpp"
#include "ngraph/runtime/aligned_buffer.hpp"
#include "ngraph/runtime/backend.hpp"
#include "ngraph/runtime/executable.hpp"
#include "ngraph/runtime/host_tensor.hpp"
#include "ngraph/runtime/hybrid/op/function_call.hpp"
#include "ngraph/runtime/interpreter/node_wrapper.hpp"
......@@ -163,7 +164,8 @@ class ngraph::runtime::interpreter::INTExecutable : public Executable
friend class INTBackend;
public:
INTExecutable(const std::shared_ptr<Function>& function,
INTExecutable(const std::shared_ptr<runtime::Backend>& backend,
const std::shared_ptr<Function>& function,
bool enable_performance_collection = false);
bool call(const std::vector<std::shared_ptr<Tensor>>& outputs,
......
......@@ -104,7 +104,7 @@ future<void> runtime::Tensor::begin_write(const void* p, size_t size_in_bytes, s
{
// auto f = m_promise.get_future();
return m_backend->post_async_write_event(
const_cast<void*>(p), size_in_bytes, buffer_number);
shared_from_this(), p, size_in_bytes, buffer_number);
}
else
{
......@@ -122,7 +122,8 @@ future<void> runtime::Tensor::begin_read(void* p, size_t size_in_bytes, size_t b
if (m_backend)
{
// auto f = m_promise.get_future();
return m_backend->post_async_read_event(p, size_in_bytes, buffer_number);
return m_backend->post_async_read_event(
shared_from_this(), p, size_in_bytes, buffer_number);
}
else
{
......
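On the tensor side, begin_write and begin_read now capture the tensor itself via shared_from_this(), so the worker thread can call Tensor::write/Tensor::read later. The caller's buffer is passed as a raw pointer and is not copied, so it has to stay alive until the returned future is ready. A hedged usage sketch, with a standing in for a backend-created tensor of 100000 floats (matching the test's Shape{100000}):

```cpp
std::vector<float> data(100000, 1.0f);

// The event keeps the tensor alive (shared_from_this), but only stores a raw
// pointer to data, so data must outlive the returned future.
std::future<void> wrote = a->begin_write(data.data(), data.size() * sizeof(float), 0);
wrote.get(); // only now is it safe to reuse or destroy data
```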
......@@ -36,7 +36,7 @@ namespace ngraph
namespace runtime
{
class Tensor
class Tensor : public std::enable_shared_from_this<Tensor>
{
friend class Executable;
......
......@@ -54,6 +54,7 @@ TEST(async, execute)
TEST(async, tensor_read_write)
{
chrono::milliseconds ten_ms(100);
Shape shape{100000};
auto A = make_shared<op::Parameter>(element::f32, shape);
auto B = make_shared<op::Parameter>(element::f32, shape);
......@@ -72,17 +73,17 @@ TEST(async, tensor_read_write)
auto future_a = a->begin_write(data.data(), data.size() * sizeof(float), 0);
auto future_b = b->begin_write(data.data(), data.size() * sizeof(float), 0);
auto future_r = r->begin_read(data_r.data(), data_r.size() * sizeof(float), 0);
ASSERT_TRUE(future_a.valid());
ASSERT_TRUE(future_b.valid());
ASSERT_TRUE(future_r.valid());
chrono::milliseconds ten_ms(100);
EXPECT_EQ(future_r.wait_for(ten_ms), future_status::timeout);
auto future = handle->begin_execute({r}, {a, b});
// get() waits for the result to be ready
future.get();
auto future_r = r->begin_read(data_r.data(), data_r.size() * sizeof(float), 0);
ASSERT_TRUE(future_r.valid());
EXPECT_EQ(future_a.wait_for(ten_ms), future_status::ready);
EXPECT_EQ(future_b.wait_for(ten_ms), future_status::ready);
EXPECT_EQ(future_r.wait_for(ten_ms), future_status::ready);
......
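One plausible reading of the updated test flow (illustrative; the exact ordering in the commit may differ): queue both writes, queue the execute and wait for it, then read the result and check that every future completed.

```cpp
chrono::milliseconds ten_ms(100);

auto future_a = a->begin_write(data.data(), data.size() * sizeof(float), 0);
auto future_b = b->begin_write(data.data(), data.size() * sizeof(float), 0);
ASSERT_TRUE(future_a.valid());
ASSERT_TRUE(future_b.valid());

auto future = handle->begin_execute({r}, {a, b});
future.get(); // blocks until the queued execute has run

auto future_r = r->begin_read(data_r.data(), data_r.size() * sizeof(float), 0);
ASSERT_TRUE(future_r.valid());

EXPECT_EQ(future_a.wait_for(ten_ms), future_status::ready);
EXPECT_EQ(future_b.wait_for(ten_ms), future_status::ready);
EXPECT_EQ(future_r.wait_for(ten_ms), future_status::ready);
```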