Commit dbf3703a authored by Aleksey Marchuk, committed by Scott Cyphers

Integration of MLSL library (#1520)

parent 18034315
...
@@ -290,6 +290,10 @@ else()
 endif()
 include(cmake/external_tbb.cmake)
+if (NGRAPH_DISTRIBUTED_ENABLE)
+    include(cmake/external_mlsl.cmake)
+endif()
 if (NGRAPH_HALIDE)
     message(WARNING "Halide build system integration is currently using an older LLVM release \
...
# ******************************************************************************
# Copyright 2017-2018 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
# Enable ExternalProject CMake module
include(ExternalProject)
#------------------------------------------------------------------------------
# Download MLSL
#------------------------------------------------------------------------------
set(MLSL_GIT_URL https://github.com/intel/MLSL)
set(MLSL_GIT_TAG d1bcc74cccdd86cae8841dab67723c811ddbd592)
find_program(MAKE_EXE NAMES gmake nmake make)
ExternalProject_Add(
    MLSL
    PREFIX MLSL
    GIT_REPOSITORY ${MLSL_GIT_URL}
    GIT_TAG ${MLSL_GIT_TAG}
    UPDATE_COMMAND ""
    CONFIGURE_COMMAND ""
    BUILD_COMMAND ${MAKE_EXE} -j 1 ENABLE_INTERNAL_ENV_UPDATE=1
    INSTALL_COMMAND ${MAKE_EXE} install PREFIX=${EXTERNAL_PROJECTS_ROOT}/MLSL/install
    BUILD_IN_SOURCE TRUE
    TMP_DIR "${EXTERNAL_PROJECTS_ROOT}/MLSL/tmp"
    STAMP_DIR "${EXTERNAL_PROJECTS_ROOT}/MLSL/stamp"
    SOURCE_DIR "${EXTERNAL_PROJECTS_ROOT}/MLSL/src"
    INSTALL_DIR "${EXTERNAL_PROJECTS_ROOT}/MLSL/install"
    EXCLUDE_FROM_ALL TRUE
    )
ExternalProject_Get_Property(MLSL SOURCE_DIR)
ExternalProject_Get_Property(MLSL INSTALL_DIR)
add_library(libmlsl INTERFACE)
target_include_directories(libmlsl SYSTEM INTERFACE ${SOURCE_DIR}/include)
target_link_libraries(libmlsl INTERFACE ${INSTALL_DIR}/intel64/lib/thread/libmlsl.so)
link_directories(${INSTALL_DIR}/intel64/lib/thread)
add_dependencies(libmlsl MLSL)
# installation
# mlsl & mpi & fabric libraries
install(DIRECTORY "${INSTALL_DIR}/intel64/lib/thread/"
        DESTINATION ${NGRAPH_INSTALL_LIB})
# install mpi binaries
install(DIRECTORY "${INSTALL_DIR}/intel64/bin/thread/"
        USE_SOURCE_PERMISSIONS
        DESTINATION ${NGRAPH_INSTALL_BIN})
# install mpi tuning data
install(DIRECTORY "${INSTALL_DIR}/intel64/etc/"
        DESTINATION ${CMAKE_INSTALL_PREFIX}/etc)
# mlsl header
install(FILES ${SOURCE_DIR}/include/mlsl.hpp
        DESTINATION ${NGRAPH_INSTALL_INCLUDE}/ngraph)
...
@@ -19,6 +19,7 @@ add_dependencies(mnist_mlp ngraph cpu_backend)
 target_link_libraries(mnist_mlp ngraph cpu_backend)
 if (NGRAPH_DISTRIBUTED_ENABLE)
     add_executable(dist_mnist_mlp mnist_loader.cpp dist_mnist_mlp.cpp)
-    add_dependencies(dist_mnist_mlp ngraph cpu_backend)
-    target_link_libraries(dist_mnist_mlp ngraph cpu_backend)
+    target_compile_definitions(dist_mnist_mlp PRIVATE NGRAPH_DISTRIBUTED)
+    target_include_directories(dist_mnist_mlp SYSTEM PRIVATE libmlsl)
+    target_link_libraries(dist_mnist_mlp ngraph cpu_backend libmlsl)
 endif()
...
@@ -106,7 +106,7 @@ float test_accuracy(MNistDataLoader& loader,
            static_cast<float>(sample_count);
 }
 
-int main(int argc, const char* argv[])
+int main(int argc, char* argv[])
 {
     ngraph::Distributed dist;
...
...
@@ -83,8 +83,8 @@ void MNistLoader::open()
     m_file = fopen(m_filename.c_str(), "rb");
     if (m_file == nullptr)
     {
-        throw std::runtime_error(std::string("File") + m_filename +
-                                 "couldn't be opened. Make sure the file "
+        throw std::runtime_error(std::string("File ") + m_filename +
+                                 " couldn't be opened. Make sure the file "
                                  "exists in the current directory");
     }
     read_header();
...
...
@@ -28,7 +28,7 @@ How? (Generic frameworks)
 To synchronize gradients across all workers, the essential operation for data
 parallel training, due to its simplicity and scalability over parameter servers,
-is “allreduce”. The AllReduce op is one of the nGraph Library’s core ops. To
+is ``allreduce``. The AllReduce op is one of the nGraph Library’s core ops. To
 enable gradient synchronization for a network, we simply inject the AllReduce op
 into the computation graph, connecting the graph for the autodiff computation
 and optimizer update (which then becomes part of the nGraph graph). The
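
To make the injection concrete: a minimal sketch of a graph whose output is the allreduced value of its input, using the same op::AllReduce constructor exercised by the unit test later in this commit (the parameter standing in for a gradient tensor is illustrative):

    #include <memory>
    #include "ngraph/ngraph.hpp"

    using namespace ngraph;

    // Build a trivial function whose output is the element-wise sum of
    // "grad" across all workers; in real training, "grad" would be the
    // autodiff output feeding the optimizer update.
    std::shared_ptr<Function> make_allreduce_graph()
    {
        auto grad = std::make_shared<op::Parameter>(element::f32, Shape{2, 2});
        auto summed = std::make_shared<op::AllReduce>(grad);
        return std::make_shared<Function>(summed, ParameterVector{grad});
    }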
...
@@ -64,7 +64,7 @@ MXNet
 We implemented a KVStore in MXNet\* (KVStore is unique to MXNet) to modify
 the SGD update op so the nGraph graph will contain the allreduce op and generate
 corresponding collective communication kernels for different backends. We are
-using OpenMPI for CPU backends and plan to integrate `Intel MLSL`_ in future.
+using `Intel MLSL`_ for CPU backends.
 The figure below shows a bar chart with preliminary results from a Resnet-50
 I1K training in MXNet 1, 2, 4, (and 8 if available) nodes, x-axis is the number
...
...
@@ -8,10 +8,10 @@ In the :doc:`previous section <../howto/derive-for-training>`, we described the
 steps needed to create a "trainable" nGraph model. Here we demonstrate how to
 train a data parallel model by distributing the graph across devices.
 
-To use this mode of training, first install a supported version of `OpenMPI`_
-(1.10 or newer).
-Next, create an nGraph build with the cmake flag ``-DNGRAPH_DISTRIBUTED_ENABLE=TRUE``.
+To use this mode of training, create an nGraph build with the cmake flag
+``-DNGRAPH_DISTRIBUTED_ENABLE=TRUE``.
 
 To deploy data-parallel training on backends supported by nGraph API, the
 ``AllReduce`` op should be added after the steps needed to complete the
...
@@ -25,7 +25,8 @@ To deploy data-parallel training on backends supported by nGraph API, the
 We need to initialize and finalize distributed training with ``Distributed`` object;
 see the `full raw code`_.
 
-Finally, to run the training using two nGraph devices, invoke :command:`mpirun`.
+Finally, to run the training using two nGraph devices, invoke :command:`mpirun`,
+which is distributed with the `Intel MLSL`_ library.
 This will launch two nGraph CPU backends.
...
@@ -34,5 +35,5 @@ This will launch two nGraph CPU backends.
 
    $ mpirun -np 2 dist_mnist_mlp
 
-.. _OpenMPI: https://www.open-mpi.org/software/ompi/v3.1
+.. _Intel MLSL: https://github.com/intel/MLSL/releases
 .. _full raw code: https://github.com/NervanaSystems/ngraph/blob/master/doc/examples/mnist_mlp/dist_mnist_mlp.cpp
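
A minimal sketch of the driver structure the docs describe, patterned on dist_mnist_mlp.cpp from this commit (the training body is elided; assumes a build with -DNGRAPH_DISTRIBUTED_ENABLE=TRUE):

    #include "ngraph/distributed.hpp"

    int main(int argc, char* argv[])
    {
        // Constructing this object initializes MLSL if it is not already
        // initialized; its destructor finalizes it, so the whole training
        // run is bracketed by init/finalize.
        ngraph::Distributed dist;

        // ... build the graph with AllReduce ops, compile, and train ...

        return 0;
    }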
...
@@ -181,10 +181,10 @@ endif()
 add_library(ngraph SHARED ${SRC})
 
 if(NGRAPH_DISTRIBUTED_ENABLE)
-    find_package(MPI REQUIRED)
+    target_sources(ngraph PRIVATE distributed.cpp)
     target_compile_definitions(ngraph PRIVATE NGRAPH_DISTRIBUTED)
-    target_include_directories(ngraph SYSTEM PRIVATE ${MPI_C_INCLUDE_PATH} ${MPI_CXX_INCLUDE_PATH})
-    target_link_libraries(ngraph PRIVATE ${MPI_C_LIBRARIES} ${MPI_CXX_LIBRARIES})
+    target_include_directories(ngraph SYSTEM PRIVATE libmlsl)
+    target_link_libraries(ngraph PRIVATE libmlsl)
 endif()
 
 add_subdirectory(frontend)
...
...
@@ -48,9 +48,9 @@ if (NGRAPH_GPU_ENABLE OR (NGRAPH_CPU_ENABLE AND NOT NGRAPH_DEX_ONLY))
     list(APPEND HEADER_SEARCH_DEFINES NGRAPH_HEADERS_PATH="${NGRAPH_INCLUDE_PATH}")
 
     if(NGRAPH_DISTRIBUTED_ENABLE)
-        find_package(MPI REQUIRED)
-        target_compile_definitions(codegen PRIVATE NGRAPH_DISTRIBUTED)
-        add_definitions(-DMPI_HEADER_PATH="${MPI_PATH}")
+        get_target_property(MLSL_INCLUDE_DIR libmlsl INTERFACE_INCLUDE_DIRECTORIES)
+        list(APPEND HEADER_SEARCH_DEFINES MLSL_HEADER_PATH="${MLSL_INCLUDE_DIR}")
+        add_definitions(-DNGRAPH_DISTRIBUTED)
     endif()
 
     if(NGRAPH_GPU_ENABLE)
...
...
@@ -473,7 +473,7 @@ void codegen::CompilerCore::configure_search_path()
 #endif
 
 #ifdef NGRAPH_DISTRIBUTED
-    add_header_search_path(MPI_HEADER_PATH);
+    add_header_search_path(MLSL_HEADER_PATH);
 #endif
 }
...
...
@@ -16,37 +16,35 @@
 #ifdef NGRAPH_DISTRIBUTED
 
+#include <mlsl.hpp>
+
 #include "ngraph/distributed.hpp"
-#include <mpi.h>
 
 using namespace ngraph;
 
 ngraph::Distributed::Distributed()
 {
-    int flag = 0;
-    MPI_Initialized(&flag);
-    if (!flag)
+    if (!MLSL::Environment::GetEnv().IsInitialized())
     {
-        MPI_Init(NULL, NULL);
+        MLSL::Environment::GetEnv().Init(nullptr, nullptr);
     }
 }
 
 ngraph::Distributed::~Distributed()
 {
-    MPI_Finalize();
+    if (MLSL::Environment::GetEnv().IsInitialized())
+    {
+        MLSL::Environment::GetEnv().Finalize();
+    }
 }
 
-int ngraph::Distributed::get_size() const
+size_t ngraph::Distributed::get_size() const
 {
-    int size;
-    MPI_Comm_size(MPI_COMM_WORLD, &size);
-    return size;
+    return MLSL::Environment::GetEnv().GetProcessCount();
 }
 
-int ngraph::Distributed::get_rank() const
+size_t ngraph::Distributed::get_rank() const
 {
-    int rank;
-    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
-    return rank;
+    return MLSL::Environment::GetEnv().GetProcessIdx();
 }
 #endif
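
A small usage sketch of this wrapper, assuming a distributed-enabled build (both accessors map directly onto the MLSL calls above):

    #include <iostream>
    #include "ngraph/distributed.hpp"

    int main()
    {
        ngraph::Distributed dist; // first construction initializes MLSL
        std::cout << "process " << dist.get_rank() << " of "
                  << dist.get_size() << std::endl;
        return 0; // ~Distributed() finalizes MLSL
    }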
...
@@ -16,6 +16,8 @@
 #pragma once
 
+#include <cstddef>
+
 namespace ngraph
 {
     class Distributed
...
@@ -23,7 +25,7 @@ namespace ngraph
     public:
         Distributed();
        ~Distributed();
-        int get_size() const;
-        int get_rank() const;
+        size_t get_size() const;
+        size_t get_rank() const;
     };
 }
...
@@ -190,13 +190,9 @@ if (NGRAPH_CPU_ENABLE)
     endif()
 
     if(NGRAPH_DISTRIBUTED_ENABLE)
-        find_package(MPI REQUIRED)
-        target_compile_definitions(cpu_backend
-            PRIVATE NGRAPH_DISTRIBUTED)
-        target_include_directories(cpu_backend
-            SYSTEM PRIVATE ${MPI_C_INCLUDE_PATH} ${MPI_CXX_INCLUDE_PATH})
-        target_link_libraries(cpu_backend
-            PRIVATE ${MPI_C_LIBRARIES} ${MPI_CXX_LIBRARIES})
+        target_compile_definitions(cpu_backend PRIVATE NGRAPH_DISTRIBUTED)
+        target_include_directories(cpu_backend SYSTEM PRIVATE libmlsl)
+        target_link_libraries(cpu_backend PRIVATE libmlsl)
     endif()
 
     add_dependencies(cpu_backend ext_mkldnn ext_eigen)
...
...
@@ -15,8 +15,9 @@
 //*****************************************************************************
 
 #ifdef NGRAPH_DISTRIBUTED
+#include <mlsl.hpp>
+
 #include "ngraph/op/allreduce.hpp"
-#include <mpi.h>
 #include "ngraph/runtime/cpu/cpu_builder.hpp"
 
 using namespace std;
...
@@ -36,21 +37,22 @@ namespace ngraph
                 auto& arg_tensor = external_function->get_tensor_data(args[0].get_name());
                 auto& out_tensor = external_function->get_tensor_data(out[0].get_name());
                 auto count = static_cast<int>(out[0].get_size());
-                auto data_type = MPI_FLOAT;
+                auto data_type = MLSL::DT_FLOAT;
 
                 if (args[0].get_element_type() == element::f32)
                 {
-                    data_type = MPI_FLOAT;
+                    data_type = MLSL::DT_FLOAT;
                 }
                 else if (args[0].get_element_type() == element::f64)
                 {
-                    data_type = MPI_DOUBLE;
+                    data_type = MLSL::DT_DOUBLE;
                 }
 
                 auto functor = [&, count, data_type](CPURuntimeContext* ctx,
                                                      CPUExecutionContext* ectx) {
-                    MPI_Allreduce(
-                        arg_tensor, out_tensor, count, data_type, MPI_SUM, MPI_COMM_WORLD);
+                    MLSL::CommReq* req = ctx->mlsl_dist->AllReduce(
+                        arg_tensor, out_tensor, count, data_type, MLSL::RT_SUM, MLSL::GT_DATA);
+                    ctx->mlsl_env->Wait(req);
                 };
                 functors.emplace_back(functor);
...
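
The functor relies on MLSL's nonblocking collectives: AllReduce returns a MLSL::CommReq* handle and Wait blocks until the reduction completes. The same pattern, sketched outside the nGraph runtime context (buffer names are illustrative; the distribution spans all processes, as in setup_runtime_context below):

    #include <mlsl.hpp>

    void sum_across_workers(float* send_buf, float* recv_buf, size_t count)
    {
        MLSL::Environment& env = MLSL::Environment::GetEnv();
        MLSL::Distribution* dist = env.CreateDistribution(env.GetProcessCount(), 1);

        // Start a nonblocking element-wise sum over the data group ...
        MLSL::CommReq* req = dist->AllReduce(
            send_buf, recv_buf, count, MLSL::DT_FLOAT, MLSL::RT_SUM, MLSL::GT_DATA);
        env.Wait(req); // ... and block until it has completed.

        env.DeleteDistribution(dist);
    }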
...
@@ -103,11 +103,6 @@
 #include "ngraph/type/element_type.hpp"
 #include "ngraph/util.hpp"
 
-#ifdef NGRAPH_DISTRIBUTED
-#include <mpi.h>
-#include "ngraph/op/allreduce.hpp"
-#endif
-
 using namespace std;
 using namespace ngraph;
...
...
@@ -22,6 +22,10 @@
 #include "ngraph/runtime/cpu/cpu_tensor_view.hpp"
 #include "ngraph/runtime/cpu/cpu_tracing.hpp"
 
+#ifdef NGRAPH_DISTRIBUTED
+#include <mlsl.hpp>
+#endif
+
 using namespace std;
 using namespace ngraph;
...
@@ -139,6 +143,12 @@ void runtime::cpu::CPU_CallFrame::setup_runtime_context()
        const auto parallelism = envParallelism == nullptr ? 1 : std::atoi(envParallelism);
        ctx->c = new tbb::global_control(tbb::global_control::max_allowed_parallelism, parallelism);
     }
+
+#ifdef NGRAPH_DISTRIBUTED
+    NGRAPH_ASSERT(MLSL::Environment::GetEnv().IsInitialized());
+    ctx->mlsl_env = &MLSL::Environment::GetEnv();
+    ctx->mlsl_dist = ctx->mlsl_env->CreateDistribution(ctx->mlsl_env->GetProcessCount(), 1);
+#endif
 }
 
 void runtime::cpu::CPU_CallFrame::cleanup_runtime_context()
...
@@ -165,5 +175,11 @@ void runtime::cpu::CPU_CallFrame::cleanup_runtime_context()
        }
        delete ctx->c;
     }
+
+#ifdef NGRAPH_DISTRIBUTED
+    if (MLSL::Environment::GetEnv().IsInitialized() && ctx->mlsl_dist != nullptr)
+    {
+        ctx->mlsl_env->DeleteDistribution(ctx->mlsl_dist);
+    }
+#endif
     delete ctx;
 }
...
@@ -126,7 +126,8 @@
 #include "ngraph/util.hpp"
 
 #ifdef NGRAPH_DISTRIBUTED
-#include <mpi.h>
+#include <mlsl.hpp>
+
 #include "ngraph/op/allreduce.hpp"
 #endif
...
@@ -221,21 +222,22 @@ namespace ngraph
            void CPU_Emitter::EMITTER_DECL(ngraph::op::AllReduce)
            {
                const element::Type& element_type = args[0].get_element_type();
-                auto data_type = "MPI_FLOAT";
+                auto data_type = "MLSL::DT_FLOAT";
 
                if (element_type == element::f32)
                {
-                    data_type = "MPI_FLOAT";
+                    data_type = "MLSL::DT_FLOAT";
                }
                else if (element_type == element::f64)
                {
-                    data_type = "MPI_DOUBLE";
+                    data_type = "MLSL::DT_DOUBLE";
                }
 
                writer.block_begin();
-                writer << "MPI_Allreduce(" << args[0].get_name() << ", " << out[0].get_name()
-                       << ", " << out[0].get_size() << ", " << data_type
-                       << ", MPI_SUM, MPI_COMM_WORLD);\n";
+                writer << "MLSL::CommReq* req = ctx->mlsl_dist->AllReduce(" << args[0].get_name()
+                       << ", " << out[0].get_name() << ", " << out[0].get_size() << ", "
+                       << data_type << ", MLSL::RT_SUM, MLSL::GT_DATA);\n";
+                writer << "ctx->mlsl_env->Wait(req);\n";
                writer.block_end();
            }
 #endif
...
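
For an f32 output with four elements, the emitter above writes generated code along these lines (arg0 and out0 stand in for the emitted tensor names):

    MLSL::CommReq* req = ctx->mlsl_dist->AllReduce(
        arg0, out0, 4, MLSL::DT_FLOAT, MLSL::RT_SUM, MLSL::GT_DATA);
    ctx->mlsl_env->Wait(req);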
...
@@ -476,6 +476,11 @@ void runtime::cpu::CPU_ExternalFunction::compile()
        writer << "#include <tbb/flow_graph.h>";
     }
 
+#ifdef NGRAPH_DISTRIBUTED
+    writer << "#include <mlsl.hpp>\n";
+    writer << "#define NGRAPH_DISTRIBUTED\n";
+#endif
+
     writer +=
         R"(
 #include <cmath>
...
@@ -529,10 +534,6 @@ using namespace ngraph::runtime;
 )";
 
-#ifdef NGRAPH_DISTRIBUTED
-    writer << "#include <mpi.h>\n\n";
-#endif
-
     string pch_header_source = writer.get_code();
 
     // The "dso_handle" symbol is required by __cxa_atexit()
...
...
@@ -26,6 +26,10 @@
 #include <tbb/global_control.h>
 #include <tbb/task_scheduler_init.h>
 
+#ifdef NGRAPH_DISTRIBUTED
+#include <mlsl.hpp>
+#endif
+
 namespace mkldnn
 {
     class primitive;
...
@@ -65,6 +69,10 @@ namespace ngraph
                State* const* states;
                std::set<size_t> breakpoints;
                size_t pc;
+#ifdef NGRAPH_DISTRIBUTED
+                MLSL::Environment* mlsl_env;
+                MLSL::Distribution* mlsl_dist;
+#endif
            };
        }
...
...
@@ -24,18 +24,14 @@ if (NGRAPH_INTERPRETER_ENABLE)
     target_link_libraries(interpreter_backend PUBLIC ngraph)
     set_target_properties(interpreter_backend PROPERTIES LIBRARY_OUTPUT_DIRECTORY ${NGRAPH_BUILD_DIR})
 
+    if(NGRAPH_DISTRIBUTED_ENABLE)
+        target_compile_definitions(interpreter_backend PRIVATE NGRAPH_DISTRIBUTED)
+        target_include_directories(interpreter_backend SYSTEM PRIVATE libmlsl)
+        target_link_libraries(interpreter_backend PRIVATE libmlsl)
+    endif()
+
     install(TARGETS interpreter_backend
             LIBRARY DESTINATION "${NGRAPH_INSTALL_LIB}"
             ARCHIVE DESTINATION "${NGRAPH_INSTALL_LIB}"
     )
-
-    if(NGRAPH_DISTRIBUTED_ENABLE)
-        find_package(MPI REQUIRED)
-        target_compile_definitions(interpreter_backend
-            PRIVATE NGRAPH_DISTRIBUTED)
-        target_include_directories(interpreter_backend
-            SYSTEM PRIVATE ${MPI_C_INCLUDE_PATH} ${MPI_CXX_INCLUDE_PATH})
-        target_link_libraries(interpreter_backend
-            PRIVATE ${MPI_C_LIBRARIES} ${MPI_CXX_LIBRARIES})
-    endif()
 endif()
...
@@ -254,7 +254,7 @@ private:
        }
        case OP_TYPEID::AllReduce:
        {
 #ifdef NGRAPH_DISTRIBUTED
-            reference::allreduce<T>(static_cast<const T*>(args[0]),
+            reference::allreduce<T>(static_cast<T*>(const_cast<void*>(args[0])),
                                    static_cast<T*>(out[0]),
                                    node.get_input_element_type(0),
                                    static_cast<int>(shape_size(node.get_input_shape(0))));
...
...
@@ -18,7 +18,8 @@
 #ifdef NGRAPH_DISTRIBUTED
 
-#include <mpi.h>
+#include <mlsl.hpp>
+
 #include "ngraph/type/element_type.hpp"
 
 namespace ngraph
...
@@ -28,20 +29,29 @@ namespace ngraph
     namespace reference
     {
        template <typename T>
-        void allreduce(const T* arg, T* out, const element::Type element_type, int count)
+        void allreduce(T* arg, T* out, const element::Type element_type, int count)
        {
-            auto data_type = MPI_FLOAT;
+            auto data_type = MLSL::DT_FLOAT;
 
            if (element_type == element::f32)
            {
-                data_type = MPI_FLOAT;
+                data_type = MLSL::DT_FLOAT;
            }
            else if (element_type == element::f64)
            {
-                data_type = MPI_DOUBLE;
+                data_type = MLSL::DT_DOUBLE;
            }
+            else
+            {
+                throw std::runtime_error("AllReduce op supports only f32 and f64 types");
+            }
 
-            MPI_Allreduce(arg, out, count, data_type, MPI_SUM, MPI_COMM_WORLD);
+            MLSL::Environment& env = MLSL::Environment::GetEnv();
+            MLSL::Distribution* distribution = env.CreateDistribution(env.GetProcessCount(), 1);
+            MLSL::CommReq* req = distribution->AllReduce(
+                arg, out, count, data_type, MLSL::RT_SUM, MLSL::GT_DATA);
+            env.Wait(req);
+            env.DeleteDistribution(distribution);
        }
     }
 }
...
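
A hedged usage sketch of the reference kernel above (the include path follows the repository layout; note the kernel creates and deletes an MLSL distribution on every call, which keeps it self-contained at the cost of per-call setup):

    #include <vector>
    #include "ngraph/runtime/reference/allreduce.hpp"

    void reduce_gradients(std::vector<float>& grads)
    {
        // In-place operation is not assumed; write the reduced values
        // into a separate buffer, then swap.
        std::vector<float> reduced(grads.size());
        ngraph::runtime::reference::allreduce<float>(
            grads.data(), reduced.data(), ngraph::element::f32,
            static_cast<int>(grads.size()));
        grads.swap(reduced);
    }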
...
@@ -46,4 +46,9 @@ if (NGRAPH_PLAIDML_ENABLE)
     target_link_libraries(nbench plaidml_backend)
 endif()
 
+if (NGRAPH_DISTRIBUTED_ENABLE)
+    target_compile_definitions(nbench PRIVATE NGRAPH_DISTRIBUTED)
+    target_link_libraries(nbench libmlsl)
+endif()
+
 install(TARGETS nbench RUNTIME DESTINATION ${NGRAPH_INSTALL_BIN})
...
@@ -33,6 +33,10 @@
 #include "ngraph/serializer.hpp"
 #include "ngraph/util.hpp"
 
+#ifdef NGRAPH_DISTRIBUTED
+#include "ngraph/distributed.hpp"
+#endif
+
 using namespace std;
 using namespace ngraph;
...
@@ -294,6 +298,10 @@ OPTIONS
        return 1;
     }
 
+#ifdef NGRAPH_DISTRIBUTED
+    ngraph::Distributed dist;
+#endif
+
     vector<string> models;
     if (!directory.empty())
     {
...
...
@@ -178,11 +178,8 @@ if(NGRAPH_ADDRESS_SANITIZER)
 endif()
 
 if(NGRAPH_DISTRIBUTED_ENABLE)
-    find_package(MPI REQUIRED)
     target_compile_definitions(unit-test PRIVATE NGRAPH_DISTRIBUTED)
-    target_include_directories(unit-test
-        SYSTEM PRIVATE ${MPI_C_INCLUDE_PATH} ${MPI_CXX_INCLUDE_PATH})
-    target_link_libraries(unit-test PRIVATE ${MPI_C_LIBRARIES} ${MPI_CXX_LIBRARIES})
+    target_link_libraries(unit-test PRIVATE libmlsl)
 endif()
 
 target_link_libraries(unit-test PRIVATE ngraph_test_util)
...
...
@@ -17,7 +17,7 @@
 #include <fstream>
 #include <sstream>
 
-#include <mpi.h>
+#include <mlsl.hpp>
 
 #include "gtest/gtest.h"
...
@@ -36,9 +36,7 @@ TEST(distributed_${BACKEND_NAME}, allreduce)
     auto f = make_shared<Function>(make_shared<op::AllReduce>(A), ParameterVector{A});
 
     auto backend = runtime::Backend::create("${BACKEND_NAME}");
 
-    int comm_size;
-    MPI_Comm_size(MPI_COMM_WORLD, &comm_size);
+    auto comm_size = MLSL::Environment::GetEnv().GetProcessCount();
 
     auto v = vector<float>{1, 2, 3, 4};
     auto a = backend->create_tensor(element::f32, shape);
...