Commit ef893111 authored by Robert Kimball

threads running

parent d3326687
......@@ -38,6 +38,8 @@ using namespace ngraph;
using descriptor::layout::DenseTensorLayout;
atomic<size_t> runtime::interpreter::INTExecutable::m_next_group_id;
runtime::interpreter::INTExecutable::INTExecutable(const shared_ptr<Function>& function,
bool enable_performance_collection)
: m_is_compiled{true}
......@@ -304,3 +306,86 @@ void runtime::interpreter::INTExecutable::save(ostream& out)
string model = serialize(m_function, 0);
writer.write("model", model.data(), model.size());
}
shared_ptr<ngraph::op::Parameter>
runtime::interpreter::INTExecutable::get_parameter(size_t index) const
{
const ParameterVector& parameters = get_parameters();
NGRAPH_CHECK(index < parameters.size(), "create_tensor for input out of bounds");
return parameters[index];
}
shared_ptr<ngraph::op::Result> runtime::interpreter::INTExecutable::get_result(size_t index) const
{
const ResultVector& results = get_results();
NGRAPH_CHECK(index < results.size(), "create_tensor for output out of bounds");
return results[index];
}
shared_ptr<runtime::Tensor>
runtime::interpreter::INTExecutable::create_input_tensor(size_t input_index)
{
shared_ptr<op::Parameter> parameter = get_parameter(input_index);
return make_shared<runtime::HostTensor>(parameter->get_element_type(), parameter->get_shape());
}
shared_ptr<runtime::Tensor>
runtime::interpreter::INTExecutable::create_output_tensor(size_t output_index)
{
shared_ptr<op::Result> result = get_result(output_index);
return make_shared<runtime::HostTensor>(result->get_element_type(), result->get_shape());
}
vector<shared_ptr<runtime::Tensor>>
runtime::interpreter::INTExecutable::create_input_tensor(size_t input_index,
size_t pipeline_depth)
{
vector<shared_ptr<runtime::HostTensor>> tensors;
shared_ptr<op::Parameter> parameter = get_parameter(input_index);
size_t group_id = m_next_group_id.fetch_add(1);
for (size_t i = 0; i < pipeline_depth; i++)
{
shared_ptr<runtime::HostTensor> tensor;
auto t =
make_shared<runtime::HostTensor>(parameter->get_element_type(), parameter->get_shape());
tensor = static_pointer_cast<runtime::HostTensor>(t);
// tensor->m_group_id = group_id;
// tensor->m_buffer_id = i;
// tensor->m_backend = m_backend;
// tensor->m_pipeline_depth = pipeline_depth;
tensors.push_back(tensor);
}
vector<shared_ptr<runtime::Tensor>> result_tensors;
for (const shared_ptr<runtime::HostTensor>& tensor : tensors)
{
// tensor->m_group_tensors = tensors;
result_tensors.push_back(tensor);
}
return result_tensors;
}
vector<shared_ptr<runtime::Tensor>>
runtime::interpreter::INTExecutable::create_output_tensor(size_t output_index,
size_t pipeline_depth)
{
vector<shared_ptr<runtime::HostTensor>> tensors;
shared_ptr<op::Result> result = get_result(output_index);
size_t group_id = m_next_group_id.fetch_add(1);
for (size_t i = 0; i < pipeline_depth; i++)
{
shared_ptr<runtime::HostTensor> tensor;
auto t = make_shared<runtime::HostTensor>(result->get_element_type(), result->get_shape());
tensor = static_pointer_cast<runtime::HostTensor>(t);
// tensor->m_group_id = group_id;
// tensor->m_buffer_id = i;
// tensor->m_backend = m_backend;
// tensor->m_pipeline_depth = pipeline_depth;
tensors.push_back(tensor);
}
vector<shared_ptr<runtime::Tensor>> result_tensors;
for (const shared_ptr<runtime::HostTensor>& tensor : tensors)
{
// tensor->m_group_tensors = tensors;
result_tensors.push_back(tensor);
}
return result_tensors;
}
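
For reference, a minimal caller-side sketch of how these pipelined factories could be used (hypothetical names: backend, f, and stage_inputs are assumed, not part of this commit); element i of each returned vector is the buffer belonging to pipeline stage i:

// Hypothetical usage sketch, not part of this commit.
constexpr size_t pipeline_depth = 2;
auto exec = backend->compile(f);
vector<vector<shared_ptr<runtime::Tensor>>> stage_inputs(pipeline_depth);
for (size_t input_index = 0; input_index < f->get_parameters().size(); input_index++)
{
    auto set = exec->create_input_tensor(input_index, pipeline_depth);
    for (size_t stage = 0; stage < pipeline_depth; stage++)
    {
        stage_inputs[stage].push_back(set[stage]);
    }
}
// Output tensors are grouped the same way via create_output_tensor(output_index, pipeline_depth).
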
......@@ -186,9 +186,21 @@ public:
std::vector<PerformanceCounter> get_performance_data() const override;
std::shared_ptr<runtime::Tensor> create_input_tensor(size_t input_index) override;
std::shared_ptr<runtime::Tensor> create_output_tensor(size_t output_index) override;
std::vector<std::shared_ptr<runtime::Tensor>>
create_input_tensor(size_t input_index, size_t pipeline_depth) override;
std::vector<std::shared_ptr<runtime::Tensor>>
create_output_tensor(size_t output_index, size_t pipeline_depth) override;
private:
INTExecutable(const std::string& model_string);
std::shared_ptr<ngraph::op::Parameter> get_parameter(size_t index) const;
std::shared_ptr<ngraph::op::Result> get_result(size_t index) const;
int get_alignment() const { return 64; }
bool m_is_compiled = false;
bool m_nan_check_enabled = false;
......@@ -198,6 +210,7 @@ private:
std::vector<NodeWrapper> m_wrapped_nodes;
std::unordered_map<const Node*, std::shared_ptr<RNGState>> m_states;
std::set<std::string> m_unsupported_op_name_list;
static std::atomic<size_t> m_next_group_id;
static void perform_nan_check(const std::vector<std::shared_ptr<HostTensor>>&,
const Node* op = nullptr);
......
......@@ -14,6 +14,10 @@
// limitations under the License.
//*****************************************************************************
#include <condition_variable>
#include <mutex>
#include <thread>
#include "benchmark.hpp"
#include "benchmark_utils.hpp"
#include "ngraph/file_util.hpp"
......@@ -37,6 +41,29 @@ public:
private:
};
static mutex s_mutex;
static condition_variable s_condition;
static size_t current_iteration = 0;
static void
thread_entry(runtime::Executable* exec, const TensorCollection& tensors, size_t pipeline_stage)
// static void thread_entry(size_t pipeline_stage)
{
NGRAPH_INFO;
unique_lock<mutex> lock(s_mutex);
if ((current_iteration & 1) != pipeline_stage)
{
s_condition.wait(lock);
}
else
{
// our turn to run
NGRAPH_INFO << "stage " << pipeline_stage << " for iteration " << current_iteration;
current_iteration++;
s_condition.notify_all();
}
}
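
As written above, a stage that loses the initial race waits once and returns without running its turn, and a notify that fires before the wait begins is missed; a predicate-based wait makes the hand-off robust against both. A minimal sketch of that variant (same file-level statics, still only logging, and not claimed to be this backend's final pipelining logic):

// Sketch only: predicate form of the even/odd hand-off between the two stages.
static void thread_entry_sketch(runtime::Executable* /*exec*/,
                                const TensorCollection& /*tensors*/,
                                size_t pipeline_stage)
{
    unique_lock<mutex> lock(s_mutex);
    // Re-checked on every wakeup, so spurious wakeups and early notifies are harmless.
    s_condition.wait(lock,
                     [pipeline_stage] { return (current_iteration & 1) == pipeline_stage; });
    NGRAPH_INFO << "stage " << pipeline_stage << " for iteration " << current_iteration;
    current_iteration++;
    s_condition.notify_all();
}
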
vector<runtime::PerformanceCounter> run_benchmark_pipelined(shared_ptr<Function> f,
const string& backend_name,
size_t iterations,
......@@ -44,6 +71,7 @@ vector<runtime::PerformanceCounter> run_benchmark_pipelined(shared_ptr<Function>
int warmup_iterations,
bool copy_data)
{
NGRAPH_INFO;
constexpr size_t pipeline_depth = 2;
array<TensorCollection, pipeline_depth> tensor_collections;
stopwatch timer;
......@@ -59,15 +87,13 @@ vector<runtime::PerformanceCounter> run_benchmark_pipelined(shared_ptr<Function>
array<vector<shared_ptr<runtime::HostTensor>>, pipeline_depth> parameters_data_set;
for (size_t i = 0; i < pipeline_depth; i++)
{
vector<shared_ptr<runtime::HostTensor>> parameters_data;
for (shared_ptr<op::Parameter> param : f->get_parameters())
{
auto tensor_data =
make_shared<runtime::HostTensor>(param->get_element_type(), param->get_shape());
random_init(tensor_data);
parameters_data.push_back(tensor_data);
tensor_collections[i].parameter_data.push_back(tensor_data);
}
parameters_data_set[i] = parameters_data;
}
// Create input tensors for all Parameters
......@@ -78,7 +104,7 @@ vector<runtime::PerformanceCounter> run_benchmark_pipelined(shared_ptr<Function>
auto input_tensors = exec->create_input_tensor(input_index++, pipeline_depth);
for (size_t i = 0; i < pipeline_depth; i++)
{
input_tensors_array[i].push_back(input_tensors[i]);
tensor_collections[i].input_tensors.push_back(input_tensors[i]);
}
}
......@@ -90,12 +116,25 @@ vector<runtime::PerformanceCounter> run_benchmark_pipelined(shared_ptr<Function>
auto output_tensors = exec->create_output_tensor(output_index++, pipeline_depth);
for (size_t i = 0; i < pipeline_depth; i++)
{
output_tensors_array[i].push_back(output_tensors[i]);
tensor_collections[i].output_tensors.push_back(output_tensors[i]);
}
}
stopwatch t1;
size_t current_iteration = 0;
thread threads[pipeline_depth];
for (size_t i = 0; i < pipeline_depth; i++)
{
threads[i] = thread(thread_entry, exec.get(), tensor_collections[i], i);
// threads[i] = thread(thread_entry, i);
}
for (size_t i = 0; i < pipeline_depth; i++)
{
threads[i].join();
}
// // Before we start we write the first iteration's data
// size_t buffer_number = 0;
// auto args = input_tensors_array[buffer_number];
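
The commented-out lines above hint at writing the first iteration's data before the loop starts. A rough sketch of what each stage's iteration body might eventually look like once the hand-off drives real work (hypothetical: this commit only exercises the thread synchronization, and it assumes TensorCollection's members are the tensor vectors populated above):

// Hypothetical per-stage iteration body, not part of this commit: refresh the
// stage's input tensors, then run the compiled function on that stage's own buffers.
static void run_stage_iteration(runtime::Executable* exec, const TensorCollection& tensors)
{
    // When copy_data is enabled, the bytes from tensors.parameter_data (HostTensors)
    // would be written into tensors.input_tensors here via runtime::Tensor::write.
    exec->call(tensors.output_tensors, tensors.input_tensors);
}
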
......