Commit d9f615b7 authored by Sandeep's avatar Sandeep Committed by Robert Kimball

enable hybrid test with graph splits (#1960)

* size_t for placement in node

* enable hybrid backend test

* style

* cp placement functions

* placement size_t based functions

* placement based on backends

* add placement based on size_t

* backend size_t based placement

* call

* update

* resolve bug

* format

* revert cmake changes

* address PR comments

* ci error

* pr comments
parent 6b3f3a0a
......@@ -355,6 +355,35 @@ pair<shared_ptr<op::Result>, shared_ptr<op::Parameter>>
return make_pair(res_node, par_node);
}
// Suffix *_size as a part of function name is temporary, this suffix
// will be removed when the backends move to the latest Hybrid backend
pair<shared_ptr<op::Result>, shared_ptr<op::Parameter>>
ngraph::insert_result_parameter_split_size(const shared_ptr<Node>& src_node,
const shared_ptr<Node>& dst_node)
{
if (src_node->get_output_size() != 1)
{
throw ngraph_error("Multiple output per op not supported in graph partition yet.");
}
// Make parameter node
shared_ptr<op::Parameter> par_node = make_shared<op::Parameter>(
src_node->get_output_element_type(0), src_node->get_output_shape(0));
par_node->set_placement(dst_node->get_placement_size());
// Fix input / output among src, dst and par
descriptor::Input* dst_input = dst_node->get_input_from(src_node);
descriptor::Output* src_output = src_node->get_output_to(dst_node);
src_output->remove_input(dst_input); // Remove [0]
dst_input->replace_output(par_node, 0); // Remove [0] (again), add [8], remove [1], add [9]
// Add res node
shared_ptr<op::Result> res_node = make_shared<op::Result>(src_node); // Add [4], [5], [6], [7]
res_node->set_placement(src_node->get_placement_size());
return make_pair(res_node, par_node);
}
// Insert unary node between two nodes like S->D => S->N->D
// Before: | After:
// +-----+---+ +---+-----+ | +-----+---+ +---+-----+---+ +---+-----+
......@@ -429,6 +458,31 @@ Placement ngraph::get_colocated_function_placement(shared_ptr<Function> func)
return function_placement;
}
// Suffix *_size as a part of function name is temporary, this suffix
// will be removed when the backends move to the latest Hybrid backend
// Assert that nodes in the function is colocated and return that placement
size_t ngraph::get_colocated_function_placement_size(shared_ptr<Function> func)
{
auto ops = func->get_ops();
//it's okay to not do Placement::DEFAULT check; the same node will be checked in the loop below
size_t function_placement = ops.front()->get_placement_size();
for (auto op : ops)
{
size_t node_placement = op->get_placement_size();
if (node_placement == 0)
{
throw ngraph_error("Node should have a device placement, not Placement::DEFAULT");
}
if (function_placement != node_placement)
{
throw ngraph_error("Function contains nodes of two different placements");
}
}
return function_placement;
}
std::shared_ptr<Node> ngraph::make_zero(const element::Type& element_type, const Shape& shape)
{
std::shared_ptr<Node> zero = op::Constant::create(element_type, Shape{}, {0.0});
......
......@@ -283,11 +283,16 @@ namespace ngraph
// Assert that nodes in the function is colocated and return that placement
Placement get_colocated_function_placement(std::shared_ptr<Function> func);
size_t get_colocated_function_placement_size(std::shared_ptr<Function> func);
std::pair<std::shared_ptr<op::Result>, std::shared_ptr<op::Parameter>>
insert_result_parameter_split(const std::shared_ptr<Node>& src_node,
const std::shared_ptr<Node>& dst_node);
std::pair<std::shared_ptr<op::Result>, std::shared_ptr<op::Parameter>>
insert_result_parameter_split_size(const std::shared_ptr<Node>& src_node,
const std::shared_ptr<Node>& dst_node);
void insert_new_node_between(const std::shared_ptr<Node>& src_node,
const std::shared_ptr<Node>& dst_node,
const std::shared_ptr<Node>& new_node);
......
......@@ -150,6 +150,16 @@ void Node::set_placement(Placement placement)
m_placement = placement;
}
size_t Node::get_placement_size() const
{
return m_placement_size;
}
void Node::set_placement(size_t placement)
{
m_placement_size = placement;
}
std::shared_ptr<Node> Node::get_argument(size_t index) const
{
for (auto& i : get_inputs())
......
......@@ -232,6 +232,12 @@ namespace ngraph
/// Set device placement
void set_placement(Placement placement);
/// Get device placement
size_t get_placement_size() const;
/// Set device placement
void set_placement(size_t placement);
/// Get input descriptor that is connected to src
descriptor::Input* get_input_from(const std::shared_ptr<Node>& src);
......@@ -257,6 +263,7 @@ namespace ngraph
std::deque<descriptor::Output> m_outputs;
std::unordered_map<Node*, autodiff::Adjoints> m_adjoint_map;
Placement m_placement = Placement::DEFAULT;
size_t m_placement_size = 0;
};
class NodeValidationError : public AssertionFailure
......
......@@ -36,7 +36,7 @@ void op::Result::validate_and_infer_types()
<< " outputs (1 expected).";
// always borrow the placement conf even the default one
set_placement(get_argument(0)->get_placement());
set_placement(get_argument(0)->get_placement_size());
set_output_type(0, get_input_element_type(0), get_input_partial_shape(0));
}
......
......@@ -15,8 +15,10 @@
//*****************************************************************************
#include "ngraph/pass/assign_placement.hpp"
#include "ngraph/log.hpp"
#include "ngraph/node.hpp"
#include "ngraph/placement.hpp"
#include "ngraph/runtime/backend.hpp"
using namespace ngraph;
using namespace std;
......@@ -26,8 +28,31 @@ pass::AssignPlacement::AssignPlacement(function<Placement(shared_ptr<Node>)> pla
{
}
pass::AssignPlacement::AssignPlacement(vector<shared_ptr<runtime::Backend>> placement_backends)
: m_placement_backends(placement_backends)
{
}
bool pass::AssignPlacement::run_on_node(shared_ptr<Node> node)
{
if (!m_placement_backends.empty())
{
size_t backend_index = 0;
for (auto backend : m_placement_backends)
{
backend_index += 1;
if (backend->is_supported(*node))
{
node->set_placement(backend_index);
return false;
}
}
}
else
{
node->set_placement(m_placement_policy(node));
}
return false;
}
......@@ -32,9 +32,14 @@ namespace ngraph
public:
// TODO: make policy a class
AssignPlacement(std::function<Placement(std::shared_ptr<Node>)> placement_policy);
AssignPlacement(
std::vector<std::shared_ptr<ngraph::runtime::Backend>> placement_backends);
private:
bool run_on_node(std::shared_ptr<Node> node) override;
std::vector<std::shared_ptr<ngraph::runtime::Backend>> m_placement_backends;
std::function<Placement(std::shared_ptr<Node>)> m_placement_policy;
};
}
......
......@@ -224,3 +224,186 @@ pair<vector<shared_ptr<Function>>, unordered_map<shared_ptr<op::Parameter>, shar
return make_pair(sub_functions, map_parameter_to_result);
}
static Node* take_independent_node_with_placement_priority_size(
map<size_t, deque<Node*>>& independent_nodes_by_placement, size_t placement)
{
Node* selected_node = nullptr;
if (independent_nodes_by_placement.find(placement) != independent_nodes_by_placement.end() &&
independent_nodes_by_placement.at(placement).size() != 0)
{
selected_node = independent_nodes_by_placement.at(placement).front();
independent_nodes_by_placement.at(placement).pop_front();
}
else
{
for (auto& it : independent_nodes_by_placement)
{
if (it.second.size() > 0)
{
selected_node = it.second.front();
it.second.pop_front();
break;
}
}
}
return selected_node;
}
static vector<unordered_set<shared_ptr<Node>>>
group_function_nodes_to_clusters_size(const shared_ptr<Function>& f)
{
// Topologically sort nodes by picking independent node with the same placement as the
// previously picked node greedily
map<size_t, deque<Node*>> independent_nodes_by_placement;
unordered_map<Node*, size_t> node_dependency_count;
unordered_map<ngraph::Node*, shared_ptr<ngraph::Node>> node_map;
for (shared_ptr<Node> node : f->get_ops())
{
size_t dependency_count = node->get_arguments().size();
node_map[node.get()] = node;
node_dependency_count[node.get()] = dependency_count;
if (dependency_count == 0)
{
independent_nodes_by_placement[node->get_placement_size()].push_back(node.get());
}
}
list<shared_ptr<Node>> sorted_nodes;
size_t previous_placement = 0; // Placement::DEFAULT
while (Node* independent_node = take_independent_node_with_placement_priority_size(
independent_nodes_by_placement, previous_placement))
{
previous_placement = independent_node->get_placement_size();
sorted_nodes.push_back(node_map.at(independent_node));
for (auto user : independent_node->get_users())
{
Node* user_node = user.get();
node_dependency_count.at(user_node) -= 1;
if (node_dependency_count.at(user_node) == 0)
{
independent_nodes_by_placement[user_node->get_placement_size()].push_back(
user_node);
}
}
}
if (sorted_nodes.size() != f->get_ops().size())
{
throw ngraph_error("sorted_nodes.size()== " + to_string(sorted_nodes.size()) +
" != f->get_ops().size()== " + to_string(f->get_ops().size()) +
". Internal error with topological sort.");
}
// Build clusters from the sorted_nodes
previous_placement = 0; // Placement::DEFAULT;
vector<unordered_set<shared_ptr<Node>>> clusters;
for (shared_ptr<Node> node : sorted_nodes)
{
size_t node_placement = node->get_placement_size();
if (node_placement != previous_placement)
{
unordered_set<shared_ptr<Node>> new_cluster;
clusters.push_back(new_cluster);
}
clusters.back().insert(node);
previous_placement = node_placement;
}
// Sanity check for node duplication and full node coverage
unordered_set<shared_ptr<Node>> cluster_nodes;
for (auto cluster : clusters)
{
for (auto node : cluster)
{
if (cluster_nodes.find(node) != cluster_nodes.end())
{
throw ngraph_error("Node " + node->get_name() + " is duplicated in clusters");
}
cluster_nodes.insert(node);
}
}
unordered_set<shared_ptr<Node>> f_nodes;
for (auto node : f->get_ordered_ops())
{
f_nodes.insert(node);
}
if (cluster_nodes != f_nodes)
{
throw ngraph_error(
"Cluster's nodes are not the same as function's nodes. cluster_nodes.size()=" +
to_string(cluster_nodes.size()) + ", f_nodes.size()=" + to_string(f_nodes.size()));
}
return clusters;
}
// Suffix *_size as a part of function name is temporary, this suffix
// will be removed when the backends move to the latest Hybrid backend
pair<vector<shared_ptr<Function>>, unordered_map<shared_ptr<op::Parameter>, shared_ptr<op::Result>>>
ngraph::split_function_by_placement_size(const shared_ptr<Function>& f)
{
// Split functions to clusters of nodes that can be computed together
vector<unordered_set<shared_ptr<Node>>> clusters = group_function_nodes_to_clusters_size(f);
// Map from (intermediate) parameter to result node, for guiding data copy among devices
unordered_map<shared_ptr<op::Parameter>, shared_ptr<op::Result>> map_parameter_to_result;
// Split neighboring nodes if they belong to different clusters
// TODO: optimization to group multiple result node from the same source,
// and to group the parameter node in the same cluster with the same result node source
unordered_map<shared_ptr<Node>, unordered_set<shared_ptr<Node>>*> map_node_to_cluster;
for (auto& cluster : clusters)
{
for (auto node : cluster)
{
map_node_to_cluster[node] = &cluster;
}
}
for (auto dst_node : f->get_ordered_ops())
{
for (auto src_node : dst_node->get_arguments())
{
auto src_cluster = map_node_to_cluster.at(src_node);
auto dst_cluster = map_node_to_cluster.at(dst_node);
if (src_cluster != dst_cluster)
{
// Split src_node and dst_node
pair<shared_ptr<op::Result>, shared_ptr<op::Parameter>> res_par_pair =
insert_result_parameter_split_size(src_node, dst_node);
shared_ptr<op::Result> res_node = res_par_pair.first;
shared_ptr<op::Parameter> par_node = res_par_pair.second;
map_parameter_to_result[par_node] = res_node;
// Insert newly created nodes into clusters
src_cluster->insert(res_node);
dst_cluster->insert(par_node);
}
}
}
// Create functions from clusters
vector<shared_ptr<Function>> sub_functions;
for (auto cluster : clusters)
{
op::ParameterVector par_vector;
ResultVector res_vector;
for (auto node : cluster)
{
if (auto res_node = dynamic_pointer_cast<op::Result>(node))
{
res_vector.push_back(res_node);
}
else if (auto par_node = dynamic_pointer_cast<op::Parameter>(node))
{
par_vector.push_back(par_node);
}
}
auto sub_function = make_shared<Function>(res_vector, par_vector);
sub_functions.push_back(sub_function);
}
return make_pair(sub_functions, map_parameter_to_result);
}
......@@ -51,4 +51,9 @@ namespace ngraph
std::pair<std::vector<std::shared_ptr<Function>>,
std::unordered_map<std::shared_ptr<op::Parameter>, std::shared_ptr<op::Result>>>
split_function_by_placement(const std::shared_ptr<Function>& f);
// Split function to function(s) with unique placement
std::pair<std::vector<std::shared_ptr<Function>>,
std::unordered_map<std::shared_ptr<op::Parameter>, std::shared_ptr<op::Result>>>
split_function_by_placement_size(const std::shared_ptr<Function>& f);
}
......@@ -49,7 +49,7 @@ static runtime::Backend* hybrid1_creator(const char* config)
return new TestBackend(backend_list);
}
TEST(DISABLED_HYBRID, abc)
TEST(HYBRID, abc)
{
const string backend_name = "HYBRID1";
runtime::BackendManager::register_backend(backend_name, hybrid1_creator);
......
......@@ -15,6 +15,9 @@
//*****************************************************************************
#include "hybrid_utils.hpp"
#include "ngraph/ngraph.hpp"
#include "ngraph/pass/assign_placement.hpp"
#include "ngraph/pass/manager.hpp"
using namespace std;
using namespace ngraph;
......@@ -43,22 +46,113 @@ shared_ptr<runtime::Tensor> TestBackend::create_tensor(const element::Type& elem
bool TestBackend::compile(shared_ptr<Function> func)
{
return m_backend_list[0]->compile(func);
if (m_function_map.find(func) == m_function_map.end())
{
// Clone function
FunctionInstance instance;
instance.m_function = clone_function(*func);
// Run placement pass
pass::Manager pass_manager;
pass_manager.register_pass<pass::AssignPlacement>(m_backend_list);
pass_manager.run_passes(instance.m_function);
// Split function to sub_functions
tie(instance.m_sub_functions, instance.m_map_parameter_to_result) =
split_function_by_placement_size(instance.m_function);
m_function_map.insert({func, instance});
// Compile subfunctions in corresponding backends
for (shared_ptr<Function>& sub_function : instance.m_sub_functions)
{
size_t placement = get_colocated_function_placement_size(sub_function);
auto backend =
m_backend_list[(placement - 1)]; // (placement-1) as 0 is default placement
backend->compile(sub_function);
}
}
return true;
}
bool TestBackend::call(shared_ptr<Function> func,
const vector<shared_ptr<runtime::Tensor>>& outputs,
const vector<shared_ptr<runtime::Tensor>>& inputs)
{
throw runtime_error("TestBackend call not supported");
// for (auto backend : m_backend_list)
// {
// if (backend->is_supported(node))
// {
// // backend supports the op
// }
// }
// return true;
// Get FunctionInstance
bool rc = true;
auto it = m_function_map.find(func);
if (it == m_function_map.end())
{
compile(func);
it = m_function_map.find(func);
}
if (it == m_function_map.end())
{
throw runtime_error("Error constructing backend.");
}
FunctionInstance& instance = it->second;
// Parameter and result node in sub_function maps to one Tensor
unordered_map<shared_ptr<Node>, shared_ptr<runtime::Tensor>> map_node_to_tensor_view;
for (size_t i = 0; i < inputs.size(); ++i)
{
map_node_to_tensor_view[instance.m_function->get_parameters()[i]] = inputs[i];
}
for (size_t i = 0; i < outputs.size(); ++i)
{
map_node_to_tensor_view[instance.m_function->get_results()[i]] = outputs[i];
}
// Call subfunctions
for (shared_ptr<Function>& sub_function : instance.m_sub_functions)
{
// Init backend
size_t placement = get_colocated_function_placement_size(sub_function);
auto backend = m_backend_list[(placement - 1)]; // (placement-1) as 0 is default placement
// Prepare parameter TensorViews
vector<shared_ptr<runtime::Tensor>> parameter_tvs;
for (auto parameter_node : sub_function->get_parameters())
{
if (map_node_to_tensor_view.find(parameter_node) != map_node_to_tensor_view.end())
{
parameter_tvs.push_back(map_node_to_tensor_view.at(parameter_node));
}
else
{
auto result_node = instance.m_map_parameter_to_result.at(parameter_node);
auto result_tv = map_node_to_tensor_view.at(result_node);
auto parameter_tv = backend->create_tensor(parameter_node->get_element_type(),
parameter_node->get_shape());
copy_data(parameter_tv, read_vector<float>(result_tv));
map_node_to_tensor_view[parameter_node] = parameter_tv;
parameter_tvs.push_back(parameter_tv);
}
}
// Prepare result TensorViews
vector<shared_ptr<runtime::Tensor>> result_tvs;
for (auto result_node : sub_function->get_results())
{
if (map_node_to_tensor_view.find(result_node) != map_node_to_tensor_view.end())
{
result_tvs.push_back(map_node_to_tensor_view.at(result_node));
}
else
{
auto result_tv = backend->create_tensor(result_node->get_element_type(),
result_node->get_shape());
map_node_to_tensor_view[result_node] = result_tv;
result_tvs.push_back(result_tv);
}
}
// Call
backend->call_with_validate(sub_function, result_tvs, parameter_tvs);
}
return rc;
}
BackendWrapper::BackendWrapper(const string& backend_name,
......
......@@ -17,6 +17,7 @@
#pragma once
#include "ngraph/ngraph.hpp"
#include "ngraph/node.hpp"
#include "ngraph/runtime/backend.hpp"
#include "ngraph/runtime/backend_manager.hpp"
#include "util/all_close.hpp"
......@@ -50,6 +51,19 @@ private:
// This list of backends is in order of priority with the first backend higher priority
// than the second.
std::vector<std::shared_ptr<ngraph::runtime::Backend>> m_backend_list;
protected:
class FunctionInstance
{
public:
std::shared_ptr<ngraph::Function> m_function;
std::vector<std::shared_ptr<ngraph::Function>> m_sub_functions;
std::unordered_map<std::shared_ptr<ngraph::op::Parameter>,
std::shared_ptr<ngraph::op::Result>>
m_map_parameter_to_result;
};
std::map<std::shared_ptr<ngraph::Function>, FunctionInstance> m_function_map;
};
class BackendWrapper : public ngraph::runtime::Backend
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment