Unverified commit 363cfafa authored by Robert Kimball, committed by GitHub

Remove distributed implementations (#4380)

* Remove distributed

* Remove mlsl as external project
Co-authored-by: Sang Ik Lee <sang.ik.lee@intel.com>
Co-authored-by: raramer01 <rebecca.a.ramer@gmail.com>
parent 77941168
@@ -133,7 +133,6 @@ option(NGRAPH_CODE_COVERAGE_ENABLE "Enable code coverage data collection" FALSE)
option(NGRAPH_LIB_VERSIONING_ENABLE "Enable shared library versioning" FALSE)
option(NGRAPH_PYTHON_BUILD_ENABLE "Enable build nGraph python package wheel" FALSE)
option(NGRAPH_PLAIDML_ENABLE "Enable the PlaidML backend" ${PLAIDML_FOUND})
option(NGRAPH_DISTRIBUTED_ENABLE "Enable distributed training using MLSL/OpenMPI" OFF)
option(NGRAPH_FAST_MATH_ENABLE "Enable fast math" ON)
option(NGRAPH_JSON_ENABLE "Enable JSON based serialization and tracing features" TRUE)
option(NGRAPH_STATIC_LIB_ENABLE "Enable build nGraph as a static library" FALSE)
@@ -164,24 +163,6 @@ if (NGRAPH_MLIR_ENABLE AND (NOT NGRAPH_DEX_ONLY))
"Use -DNGRAPH_DEX_ONLY=ON and try again. \n")
endif()
if (NGRAPH_DISTRIBUTED_ENABLE)
if ("${NGRAPH_DISTRIBUTED_ENABLE}" STREQUAL "MLSL")
if (NGRAPH_INTEL_CPU_ONLY_ENABLE)
set(NGRAPH_DISTRIBUTED_MLSL_ENABLE TRUE)
else()
message(FATAL_ERROR
"-DNGRAPH_DISTRIBUTED_ENABLE=MLSL to be used, if Intel CPU is the only backend enabled.\n"
"Use -DNGRAPH_DISTRIBUTED_ENABLE=OMPI for all other situations.\n")
endif()
elseif("${NGRAPH_DISTRIBUTED_ENABLE}" STREQUAL "OMPI")
set(NGRAPH_DISTRIBUTED_OMPI_ENABLE TRUE)
else()
message(FATAL_ERROR
"Invalid arguments passed to NGRAPH_DISTRIBUTED_ENABLE, must select one of MLSL, OMPI or OFF.\n"
"If using Intel CPU only backend, recommend Intel MLSL by setting -DNGRAPH_DISTRIBUTED_ENABLE=MLSL .\n")
endif()
endif()
if (NGRAPH_ONNX_IMPORT_ENABLE)
option(NGRAPH_USE_SYSTEM_PROTOBUF "Use system provided Protobuf shared object" FALSE)
endif()
@@ -246,7 +227,6 @@ message(STATUS "NGRAPH_CPU_STATIC_LIB_ENABLE: ${NGRAPH_CPU_STATIC_LIB_ENABLE}")
message(STATUS "NGRAPH_DEBUG_ENABLE: ${NGRAPH_DEBUG_ENABLE}")
message(STATUS "NGRAPH_DEPRECATED_ENABLE: ${NGRAPH_DEPRECATED_ENABLE}")
message(STATUS "NGRAPH_DEX_ONLY: ${NGRAPH_DEX_ONLY}")
message(STATUS "NGRAPH_DISTRIBUTED_ENABLE: ${NGRAPH_DISTRIBUTED_ENABLE}")
message(STATUS "NGRAPH_DOC_BUILD_ENABLE: ${NGRAPH_DOC_BUILD_ENABLE}") message(STATUS "NGRAPH_DOC_BUILD_ENABLE: ${NGRAPH_DOC_BUILD_ENABLE}")
message(STATUS "NGRAPH_DYNAMIC_COMPONENTS_ENABLE: ${NGRAPH_DYNAMIC_COMPONENTS_ENABLE}") message(STATUS "NGRAPH_DYNAMIC_COMPONENTS_ENABLE: ${NGRAPH_DYNAMIC_COMPONENTS_ENABLE}")
message(STATUS "NGRAPH_ENABLE_CPU_CONV_AUTO: ${NGRAPH_ENABLE_CPU_CONV_AUTO}") message(STATUS "NGRAPH_ENABLE_CPU_CONV_AUTO: ${NGRAPH_ENABLE_CPU_CONV_AUTO}")
...@@ -479,15 +459,6 @@ if (NGRAPH_PLAIDML_ENABLE) ...@@ -479,15 +459,6 @@ if (NGRAPH_PLAIDML_ENABLE)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DNGRAPH_PlaidML_ENABLE") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DNGRAPH_PlaidML_ENABLE")
endif() endif()
if (NGRAPH_DISTRIBUTED_ENABLE)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DNGRAPH_DISTRIBUTED_ENABLE")
if (NGRAPH_DISTRIBUTED_MLSL_ENABLE)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DNGRAPH_DISTRIBUTED_MLSL_ENABLE")
elseif (NGRAPH_DISTRIBUTED_OMPI_ENABLE)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DNGRAPH_DISTRIBUTED_OMPI_ENABLE")
endif()
endif()
if (NOT DEFINED NGRAPH_TBB_ENABLE)
set(NGRAPH_TBB_ENABLE ${NGRAPH_CPU_ENABLE})
endif()
@@ -642,10 +613,6 @@ if(NGRAPH_TBB_ENABLE)
endif()
endif()
if (NGRAPH_DISTRIBUTED_MLSL_ENABLE)
include(cmake/external_mlsl.cmake)
endif()
add_subdirectory(src)
if (NGRAPH_TEST_UTIL_ENABLE)
# ******************************************************************************
# Copyright 2017-2020 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
# Enable ExternalProject CMake module
include(ExternalProject)
#------------------------------------------------------------------------------
# Download MLSL
#------------------------------------------------------------------------------
set(MLSL_GIT_URL https://github.com/intel/MLSL)
set(MLSL_GIT_TAG 98a683cb861514259480aff2e54c8fce4bec67e5)
find_program(MAKE_EXE NAMES gmake nmake make)
ExternalProject_Add(
MLSL
PREFIX MLSL
GIT_REPOSITORY ${MLSL_GIT_URL}
GIT_TAG ${MLSL_GIT_TAG}
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
BUILD_COMMAND ${MAKE_EXE} -j 1 ENABLE_INTERNAL_ENV_UPDATE=1
INSTALL_COMMAND ${MAKE_EXE} install PREFIX=${EXTERNAL_PROJECTS_ROOT}/MLSL/install
BUILD_IN_SOURCE TRUE
TMP_DIR "${EXTERNAL_PROJECTS_ROOT}/MLSL/tmp"
STAMP_DIR "${EXTERNAL_PROJECTS_ROOT}/MLSL/stamp"
SOURCE_DIR "${EXTERNAL_PROJECTS_ROOT}/MLSL/src"
INSTALL_DIR "${EXTERNAL_PROJECTS_ROOT}/MLSL/install"
EXCLUDE_FROM_ALL TRUE
)
add_library(libmlsl INTERFACE)
ExternalProject_Get_Property(MLSL SOURCE_DIR)
ExternalProject_Get_Property(MLSL INSTALL_DIR)
set(MLSL_LIB_DIR ${INSTALL_DIR}/intel64/lib/thread)
set(MLSL_LIB ${CMAKE_SHARED_LIBRARY_PREFIX}mlsl${CMAKE_SHARED_LIBRARY_SUFFIX})
set(MPI_LIB ${CMAKE_SHARED_LIBRARY_PREFIX}mpi${CMAKE_SHARED_LIBRARY_SUFFIX})
set(FABRIC_LIB ${CMAKE_SHARED_LIBRARY_PREFIX}fabric${CMAKE_SHARED_LIBRARY_SUFFIX})
ExternalProject_Add_Step(
MLSL
CopyMLSL
COMMAND ${CMAKE_COMMAND} -E copy_directory ${MLSL_LIB_DIR} ${NGRAPH_LIBRARY_OUTPUT_DIRECTORY}
COMMENT "Copy mlsl runtime libraries to ngraph build directory."
DEPENDEES install
)
target_include_directories(libmlsl SYSTEM INTERFACE ${SOURCE_DIR}/include)
set(MLSL_LINK_LIBRARIES
${NGRAPH_LIBRARY_OUTPUT_DIRECTORY}/${MLSL_LIB}
${NGRAPH_LIBRARY_OUTPUT_DIRECTORY}/${MPI_LIB}
${NGRAPH_LIBRARY_OUTPUT_DIRECTORY}/${FABRIC_LIB})
target_link_libraries(libmlsl PRIVATE INTERFACE ${MLSL_LINK_LIBRARIES})
add_dependencies(libmlsl MLSL)
#installation
#mlsl & mpi & fabric libraries
install(DIRECTORY "${INSTALL_DIR}/intel64/lib/thread/"
DESTINATION ${NGRAPH_INSTALL_LIB})
#install mpi binaries
install(DIRECTORY "${INSTALL_DIR}/intel64/bin/thread/"
USE_SOURCE_PERMISSIONS
DESTINATION ${NGRAPH_INSTALL_BIN})
#install mpi tuning data
install(DIRECTORY "${INSTALL_DIR}/intel64/etc/"
DESTINATION ${CMAKE_INSTALL_PREFIX}/etc)
#mlsl header
install(FILES ${SOURCE_DIR}/include/mlsl.hpp
DESTINATION ${NGRAPH_INSTALL_INCLUDE}/ngraph)
...@@ -58,19 +58,6 @@ if [ "$(echo ${CMD_TO_RUN} | grep build | wc -l)" != "0" ] ; then ...@@ -58,19 +58,6 @@ if [ "$(echo ${CMD_TO_RUN} | grep build | wc -l)" != "0" ] ; then
chmod ug+rwx ${OUTPUT_DIR} chmod ug+rwx ${OUTPUT_DIR}
fi fi
#TODO: add openmpi dependency to enable building with -DNGRAPH_DISTRIBUTED_ENABLE=TRUE
#
#if [ -z ${NGRAPH_DISTRIBUTED_ENABLE} ] ; then
# NGRAPH_DISTRIBUTED_ENABLE=false
#fi
#if $NGRAPH_DISTRIBUTED_ENABLE; then
# source /home/environment-openmpi-ci.source
# which mpirun
# mpirun --version
# export CMAKE_OPTIONS_EXTRA="-DNGRAPH_DISTRIBUTED_ENABLE=$NGRAPH_DISTRIBUTED_ENABLE"
#fi
GCC_VERSION=` gcc --version | grep gcc | cut -f 2 -d ')' | cut -f 2 -d ' ' | cut -f 1,2 -d '.'`
# Set the -DNGRAPH_USE_PREBUILT_LLVM=TRUE for appropriate build environments
@@ -19,6 +19,5 @@ add_dependencies(mnist_mlp ngraph cpu_backend)
target_link_libraries(mnist_mlp ngraph cpu_backend)
add_executable(dist_mnist_mlp mnist_loader.cpp dist_mnist_mlp.cpp)
target_compile_definitions(dist_mnist_mlp PRIVATE NGRAPH_DISTRIBUTED_ENABLE)
add_dependencies(dist_mnist_mlp ngraph cpu_backend)
target_link_libraries(dist_mnist_mlp ngraph cpu_backend)
@@ -25,8 +25,6 @@ Compile Flags
``NGRAPH_DEBUG_ENABLE``, Enable output for ``NGRAPH_DEBUG`` statements, ``FALSE``
``NGRAPH_DEPRECATED_ENABLE``, Enable compiler deprecation pragmas for deprecated APIs (recommended only for development use), ``FALSE``
``NGRAPH_DEX_ONLY``, Build CPU DEX without codegen, ``FALSE``
``NGRAPH_DISTRIBUTED_ENABLE``, Enable distributed training using MLSL/OpenMPI, ``OFF``
``NGRAPH_DISTRIBUTED_MLSL_ENABLE``, Use MLSL, ``OFF``
``NGRAPH_DOC_BUILD_ENABLE``, Automatically build documentation, ``OFF``
``NGRAPH_FAST_MATH_ENABLE``, Enable fast math, ``ON``
``NGRAPH_HALIDE``, ,``OFF``
@@ -668,19 +668,6 @@ if(NGRAPH_CPU_STATIC_LIB_ENABLE)
target_compile_definitions(ngraph PUBLIC NGRAPH_CPU_STATIC_LIB_ENABLE)
endif()
if(NGRAPH_DISTRIBUTED_ENABLE)
if(NGRAPH_DISTRIBUTED_MLSL_ENABLE)
target_include_directories(ngraph SYSTEM PRIVATE libmlsl)
target_link_libraries(ngraph PRIVATE libmlsl)
elseif(NGRAPH_DISTRIBUTED_OMPI_ENABLE)
find_package(MPI REQUIRED)
target_include_directories(ngraph SYSTEM PRIVATE ${MPI_C_INCLUDE_PATH} ${MPI_CXX_INCLUDE_PATH})
target_link_libraries(ngraph PRIVATE ${MPI_C_LIBRARIES} ${MPI_CXX_LIBRARIES})
else()
message(FATAL_ERROR "Distributed Library not supported/mentioned")
endif()
endif()
add_subdirectory(frontend)
find_package(Graphviz QUIET)
@@ -15,9 +15,7 @@
//*****************************************************************************
#include "ngraph/distributed.hpp"
#include "ngraph/distributed/mlsl.hpp"
#include "ngraph/distributed/null.hpp" #include "ngraph/distributed/null.hpp"
#include "ngraph/distributed/open_mpi.hpp"
#include "ngraph/log.hpp" #include "ngraph/log.hpp"
#include "ngraph/type.hpp" #include "ngraph/type.hpp"
...@@ -56,16 +54,8 @@ DistributedInterface* ngraph::get_distributed_interface() ...@@ -56,16 +54,8 @@ DistributedInterface* ngraph::get_distributed_interface()
{ {
if (nullptr == s_distributed_interface) if (nullptr == s_distributed_interface)
{ {
#ifdef NGRAPH_DISTRIBUTED_OMPI_ENABLE
set_distributed_interface(std::unique_ptr<DistributedInterface>(
new ngraph::distributed::OpenMPIDistributedInterface()));
#elif defined(NGRAPH_DISTRIBUTED_MLSL_ENABLE)
set_distributed_interface(std::unique_ptr<DistributedInterface>(
new ngraph::distributed::MLSLDistributedInterface()));
#else
set_distributed_interface(std::unique_ptr<DistributedInterface>(
new ngraph::distributed::NullDistributedInterface()));
#endif
}
return s_distributed_interface.get();
}
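For context, a minimal sketch of how callers obtain and use the interface selected above (illustrative only, not part of this commit; it assumes the accessor and methods shown in this diff, with the null implementation now the only built-in choice):
#include <cstdio>
#include "ngraph/distributed.hpp"
// Hypothetical helper for illustration: queries the process layout through
// the DistributedInterface returned by ngraph::get_distributed_interface().
void print_process_layout()
{
    auto* dist = ngraph::get_distributed_interface();
    // With NullDistributedInterface this presumably reports a single process.
    std::printf("rank %d of %d\n", dist->get_rank(), dist->get_size());
}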
//*****************************************************************************
// Copyright 2017-2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//*****************************************************************************
#pragma once
#ifdef NGRAPH_DISTRIBUTED_MLSL_ENABLE
#include <string>
#include <mlsl.hpp>
#include "ngraph/distributed.hpp"
namespace ngraph
{
namespace distributed
{
class MLSLDistributedInterface : public DistributedInterface
{
public:
MLSLDistributedInterface(const std::string& name = "MLSL")
: m_name(name)
{
if (!MLSL::Environment::GetEnv().IsInitialized() && !m_initialized_mlsl)
{
MLSL::Environment::GetEnv().Init(nullptr, nullptr);
m_initialized_mlsl = true;
}
}
~MLSLDistributedInterface() override
{
if (MLSL::Environment::GetEnv().IsInitialized() && m_initialized_mlsl)
{
MLSL::Environment::GetEnv().Finalize();
m_initialized_mlsl = false;
}
}
const std::string& get_name() const override { return m_name; }
int get_size() override
{
return static_cast<int>(MLSL::Environment::GetEnv().GetProcessCount());
}
int get_rank() override
{
return static_cast<int>(MLSL::Environment::GetEnv().GetProcessIdx());
}
void log_print(const std::string& timestamp, const std::vector<char>& buf) override
{
std::printf("%s [MLSL RANK: %d]: %s\n", timestamp.c_str(), get_rank(), buf.data());
}
void all_reduce(void* in,
void* out,
element::Type_t element_type,
reduction::Type reduce_type,
size_t count) override
{
auto data_type = MLSL::DT_FLOAT;
if (element_type == element::Type_t::f32)
{
data_type = MLSL::DT_FLOAT;
}
else if (element_type == element::Type_t::f64)
{
data_type = MLSL::DT_DOUBLE;
}
else
{
throw std::runtime_error("AllReduce op supports only f32 and f64 types");
}
decltype(MLSL::RT_SUM) mlsl_reduce_type;
#if defined(__GNUC__) && !(__GNUC__ == 4 && __GNUC_MINOR__ == 8)
#pragma GCC diagnostic push
#pragma GCC diagnostic error "-Wswitch"
#pragma GCC diagnostic error "-Wswitch-enum"
#endif
switch (reduce_type)
{
case reduction::Type::SUM: mlsl_reduce_type = MLSL::RT_SUM; break;
case reduction::Type::PROD:
throw std::runtime_error("MLSL doesn't support allreduce prod");
break;
case reduction::Type::MIN: mlsl_reduce_type = MLSL::RT_MIN; break;
case reduction::Type::MAX: mlsl_reduce_type = MLSL::RT_MAX; break;
}
#if defined(__GNUC__) && !(__GNUC__ == 4 && __GNUC_MINOR__ == 8)
#pragma GCC diagnostic pop
#endif
MLSL::Environment& env = MLSL::Environment::GetEnv();
MLSL::Distribution* distribution = env.CreateDistribution(env.GetProcessCount(), 1);
MLSL::CommReq* req = distribution->AllReduce(
in, out, count, data_type, mlsl_reduce_type, MLSL::GT_DATA);
env.Wait(req);
env.DeleteDistribution(distribution);
}
void broadcast(void* in,
element::Type_t element_type,
size_t count,
int root_id) override
{
auto data_type = MLSL::DT_FLOAT;
if (element_type == element::Type_t::f64)
{
data_type = MLSL::DT_DOUBLE;
}
else if (element_type != element::Type_t::f32)
{
throw std::runtime_error(
"BroadcastDistributed op supports only f32 and f64 types");
}
MLSL::Environment& env = MLSL::Environment::GetEnv();
MLSL::Distribution* distribution = env.CreateDistribution(env.GetProcessCount(), 1);
MLSL::CommReq* req =
distribution->Bcast(in, count, data_type, root_id, MLSL::GT_DATA);
env.Wait(req);
env.DeleteDistribution(distribution);
}
void recv(void* /* in */,
element::Type_t /* element_type */,
size_t /* count */,
int /* src_id */) override
{
throw ngraph_error("recv not supported/mentioned in MLSL");
}
void send(const void* /* in */,
element::Type_t /* element_type */,
size_t /* count */,
int /* dest_id */) override
{
throw ngraph_error("send not supported/mentioned in MLSL");
}
protected:
std::string m_name{"MLSL"};
bool m_initialized_mlsl = false;
};
}
}
#endif
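For reference, a minimal usage sketch of the MLSL interface removed above (illustrative only; it assumes a build with NGRAPH_DISTRIBUTED_MLSL_ENABLE defined and uses the all_reduce signature shown in the header):
#include <vector>
#include "ngraph/distributed/mlsl.hpp"
// Hypothetical example: sum a small f32 buffer across all MLSL processes.
void mlsl_allreduce_example()
{
    std::vector<float> in(4, 1.0f);
    std::vector<float> out(4, 0.0f);
    ngraph::distributed::MLSLDistributedInterface iface;
    iface.all_reduce(in.data(),
                     out.data(),
                     ngraph::element::Type_t::f32,
                     ngraph::reduction::Type::SUM,
                     in.size());
    // Each element of out should now equal the number of participating processes.
}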
//*****************************************************************************
// Copyright 2017-2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//*****************************************************************************
#pragma once
#include <cstdio>
#include <iostream>
#include "ngraph/distributed.hpp"
#ifdef NGRAPH_DISTRIBUTED_OMPI_ENABLE
#include <string>
#include <mpi.h>
namespace ngraph
{
namespace distributed
{
class OpenMPIDistributedInterface : public DistributedInterface
{
public:
OpenMPIDistributedInterface(const std::string& name = "OpenMPI")
: m_name(name)
{
int flag = 0;
MPI_Initialized(&flag);
if (!flag && !m_initialized_mpi)
{
MPI_Init(NULL, NULL);
m_initialized_mpi = true;
}
}
~OpenMPIDistributedInterface() override
{
int is_mpi_finalized = 0;
MPI_Finalized(&is_mpi_finalized);
if (!is_mpi_finalized && m_initialized_mpi)
{
MPI_Finalize();
m_initialized_mpi = false;
}
}
const std::string& get_name() const override { return m_name; }
int get_size() override
{
int size;
MPI_Comm_size(MPI_COMM_WORLD, &size);
return size;
}
int get_rank() override
{
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
return rank;
}
void log_print(const std::string& timestamp, const std::vector<char>& buf) override
{
std::printf(
"%s [OpenMPI RANK: %d]: %s\n", timestamp.c_str(), get_rank(), buf.data());
}
void all_reduce(void* in,
void* out,
element::Type_t element_type,
reduction::Type reduce_type,
size_t count) override
{
auto data_type = MPI_FLOAT;
if (element_type == element::Type_t::f32)
{
data_type = MPI_FLOAT;
}
else if (element_type == element::Type_t::f64)
{
data_type = MPI_DOUBLE;
}
else
{
throw std::runtime_error("AllReduce op supports only f32 and f64 types");
}
decltype(MPI_SUM) mpi_reduce_type;
#if defined(__GNUC__) && !(__GNUC__ == 4 && __GNUC_MINOR__ == 8)
#pragma GCC diagnostic push
#pragma GCC diagnostic error "-Wswitch"
#pragma GCC diagnostic error "-Wswitch-enum"
#endif
switch (reduce_type)
{
case reduction::Type::SUM: mpi_reduce_type = MPI_SUM; break;
case reduction::Type::PROD: mpi_reduce_type = MPI_PROD; break;
case reduction::Type::MIN: mpi_reduce_type = MPI_MIN; break;
case reduction::Type::MAX: mpi_reduce_type = MPI_MAX; break;
}
#if defined(__GNUC__) && !(__GNUC__ == 4 && __GNUC_MINOR__ == 8)
#pragma GCC diagnostic pop
#endif
MPI_Allreduce(in, out, count, data_type, mpi_reduce_type, MPI_COMM_WORLD);
}
void broadcast(void* in,
element::Type_t element_type,
size_t count,
int root_id) override
{
auto data_type = MPI_FLOAT;
if (element_type == element::Type_t::f64)
{
data_type = MPI_DOUBLE;
}
else if (element_type != element::Type_t::f32)
{
throw std::runtime_error(
"BroadcastDistributed op supports only f32 and f64 types");
}
MPI_Bcast(in, count, data_type, root_id, MPI_COMM_WORLD);
}
void recv(void* in, element::Type_t element_type, size_t count, int src_id) override
{
auto data_type = MPI_FLOAT;
// for send/recv, bf16 and f16 can be treated as MPI_SHORT since both are 16 bits
if (element_type == element::Type_t::bf16 || element_type == element::Type_t::f16)
{
data_type = MPI_SHORT;
}
else
{
data_type = ngraph_type_to_mpi_type(element_type);
}
MPI_Recv(in, count, data_type, src_id, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
void send(const void* in,
element::Type_t element_type,
size_t count,
int dest_id) override
{
auto data_type = MPI_FLOAT;
// for send/recv, bf16 and f16 can be treated as MPI_SHORT since both are 16 bits
if (element_type == element::Type_t::bf16 || element_type == element::Type_t::f16)
{
data_type = MPI_SHORT;
}
else
{
data_type = ngraph_type_to_mpi_type(element_type);
}
MPI_Send(in, count, data_type, dest_id, 0, MPI_COMM_WORLD);
}
protected:
MPI_Datatype ngraph_type_to_mpi_type(element::Type_t& n_type)
{
MPI_Datatype m_type = MPI_FLOAT;
#if defined(__GNUC__) && !(__GNUC__ == 4 && __GNUC_MINOR__ == 8)
#pragma GCC diagnostic push
#pragma GCC diagnostic error "-Wswitch"
#pragma GCC diagnostic error "-Wswitch-enum"
#endif
switch (n_type)
{
case element::Type_t::boolean: m_type = MPI_BYTE; break;
case element::Type_t::f32: m_type = MPI_FLOAT; break;
case element::Type_t::f64: m_type = MPI_DOUBLE; break;
case element::Type_t::i8: m_type = MPI_BYTE; break;
case element::Type_t::i16: m_type = MPI_SHORT; break;
case element::Type_t::i32: m_type = MPI_INT; break;
case element::Type_t::i64: m_type = MPI_LONG; break;
case element::Type_t::u8: m_type = MPI_UNSIGNED_CHAR; break;
case element::Type_t::u16: m_type = MPI_UNSIGNED_SHORT; break;
case element::Type_t::u32: m_type = MPI_UNSIGNED; break;
case element::Type_t::u64: m_type = MPI_UNSIGNED_LONG; break;
case element::Type_t::bf16:
case element::Type_t::f16:
case element::Type_t::u1:
case element::Type_t::undefined:
case element::Type_t::dynamic: throw std::runtime_error("unsupported type");
}
#if defined(__GNUC__) && !(__GNUC__ == 4 && __GNUC_MINOR__ == 8)
#pragma GCC diagnostic pop
#endif
return m_type;
}
std::string m_name;
bool m_initialized_mpi = false;
};
}
}
#endif
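The OpenMPI wrapper removed above ultimately funnels each all_reduce into a single MPI_Allreduce over MPI_COMM_WORLD; a minimal standalone equivalent in plain MPI (a sketch against the standard MPI API, not the nGraph one):
#include <cstdio>
#include <vector>
#include <mpi.h>
int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    std::vector<float> in(4, 1.0f);
    std::vector<float> out(4, 0.0f);
    // Same reduction the wrapper issued for reduction::Type::SUM on f32 data.
    MPI_Allreduce(in.data(), out.data(), static_cast<int>(in.size()),
                  MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);
    std::printf("rank %d: out[0] = %f\n", rank, out[0]);
    MPI_Finalize();
    return 0;
}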
@@ -490,10 +490,6 @@ if (NGRAPH_MLIR_ENABLE)
list(APPEND SRC mlir/ops_test.cpp)
endif()
if(NGRAPH_DISTRIBUTED_ENABLE)
list(APPEND MULTI_TEST_SRC backend/distributed.in.cpp)
endif()
if (NGRAPH_CPU_ENABLE)
list(APPEND MULTI_TEST_SRC backend/graph_comparison.in.cpp)
endif()
//*****************************************************************************
// Copyright 2017-2020 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//*****************************************************************************
#include <fstream>
#include <sstream>
#include "gtest/gtest.h"
#include "ngraph/distributed.hpp"
#include "ngraph/file_util.hpp"
#include "ngraph/ngraph.hpp"
#include "ngraph/serializer.hpp"
#include "util/all_close_f.hpp"
#include "util/random.hpp"
#include "util/test_control.hpp"
using namespace std;
using namespace ngraph;
static string s_manifest = "${MANIFEST}";
static void test_allreduce_common(reduction::Type reduce_type)
{
auto comm_size = get_distributed_interface()->get_size();
if (comm_size > 1)
{
auto shape = Shape{2, 2};
auto A = make_shared<op::Parameter>(element::f32, shape);
auto f =
make_shared<Function>(make_shared<op::AllReduce>(A, reduce_type), ParameterVector{A});
auto backend = runtime::Backend::create("${BACKEND_NAME}");
auto v = vector<float>{1, 2, 3, 4};
auto a = backend->create_tensor(element::f32, shape);
auto result = backend->create_tensor(element::f32, shape);
#if defined(__GNUC__) && !(__GNUC__ == 4 && __GNUC_MINOR__ == 8)
#pragma GCC diagnostic push
#pragma GCC diagnostic error "-Wswitch"
#pragma GCC diagnostic error "-Wswitch-enum"
#endif
switch (reduce_type)
{
case reduction::Type::SUM:
copy_data(a, v);
std::transform(v.begin(), v.end(), v.begin(), [=](float x) { return x * comm_size; });
break;
case reduction::Type::PROD:
copy_data(a, v);
std::transform(v.begin(), v.end(), v.begin(), [&](float elm) -> float {
return pow(elm, comm_size);
});
break;
case reduction::Type::MIN:
case reduction::Type::MAX:
auto shift = get_distributed_interface()->get_rank();
std::rotate(v.begin(), v.begin() + shift % v.size(), v.end());
copy_data(a, v);
if (reduce_type == reduction::Type::MIN)
{
std::fill(v.begin(), v.end(), 1);
for (int i = 1; i < static_cast<int>(v.size()) - comm_size + 1; i++)
v[i] = i + 1;
}
else
{
std::fill(v.begin(), v.end(), v.size());
for (int i = 0; i < static_cast<int>(v.size()) - comm_size; i++)
v[i] = i + 2;
}
}
#if defined(__GNUC__) && !(__GNUC__ == 4 && __GNUC_MINOR__ == 8)
#pragma GCC diagnostic pop
#endif
auto handle = backend->compile(f);
handle->call_with_validate({result}, {a});
EXPECT_TRUE(test::all_close_f(v, read_vector<float>(result)));
}
}
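// Worked example for the SUM and PROD branches above (illustration only, not
// part of the original test): every rank contributes the same tensor
// v = {1, 2, 3, 4}, so with comm_size == 2 the expected results are
//   SUM:  {1*2, 2*2, 3*2, 4*2} = {2, 4, 6, 8}
//   PROD: {1^2, 2^2, 3^2, 4^2} = {1, 4, 9, 16}
// matching the x * comm_size and pow(elm, comm_size) transforms applied to v.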
NGRAPH_TEST(${BACKEND_NAME}, allreduce_sum)
{
test_allreduce_common(reduction::Type::SUM);
}
NGRAPH_TEST(${BACKEND_NAME}, allreduce_min)
{
test_allreduce_common(reduction::Type::MIN);
}
NGRAPH_TEST(${BACKEND_NAME}, allreduce_max)
{
test_allreduce_common(reduction::Type::MAX);
}
#if !defined(NGRAPH_DISTRIBUTED_MLSL_ENABLE)
NGRAPH_TEST(${BACKEND_NAME}, allreduce_prod)
{
test_allreduce_common(reduction::Type::PROD);
}
#endif
NGRAPH_TEST(${BACKEND_NAME}, broadcastdistributed)
{
auto shape = Shape{2, 2};
auto A = make_shared<op::Parameter>(element::f32, shape);
auto comm_size = get_distributed_interface()->get_size();
for (int root_id = 0; root_id < comm_size; ++root_id)
{
auto f = make_shared<Function>(make_shared<op::BroadcastDistributed>(A, root_id),
ParameterVector{A});
auto backend = runtime::Backend::create("${BACKEND_NAME}");
auto v = vector<float>{1, 2, 3, 4};
auto result = backend->create_tensor(element::f32, shape);
copy_data(result, vector<float>(4, 0));
auto processIdx = get_distributed_interface()->get_rank();
if (processIdx == root_id)
{
copy_data(result, v);
}
auto handle = backend->compile(f);
handle->call_with_validate({result}, {result});
EXPECT_EQ(v, read_vector<float>(result));
}
}
// MLSL does not support send recv
#if !defined(NGRAPH_DISTRIBUTED_MLSL_ENABLE)
NGRAPH_TEST(${BACKEND_NAME}, send_recv)
{
auto shape = Shape{2, 2};
auto A = make_shared<op::Parameter>(element::f32, shape);
auto comm_size = get_distributed_interface()->get_size();
// this test only works for 2 nodes
if (comm_size != 2)
{
return;
}
auto rank = get_distributed_interface()->get_rank();
std::shared_ptr<Function> f;
if (rank == 0)
{
f = make_shared<Function>(make_shared<op::Send>(A, 1), ParameterVector{A});
}
else
{
f = make_shared<Function>(make_shared<op::Recv>(A, 0), ParameterVector{A});
}
auto backend = runtime::Backend::create("${BACKEND_NAME}");
auto v = vector<float>{1, 2, 3, 4};
auto result = backend->create_tensor(element::f32, shape);
copy_data(result, vector<float>(4, 0));
if (rank == 0)
{
copy_data(result, v);
}
auto handle = backend->compile(f);
handle->call_with_validate({result}, {result});
EXPECT_EQ(v, read_vector<float>(result));
}
#endif
// MLSL does not support send recv
#if !defined(NGRAPH_DISTRIBUTED_MLSL_ENABLE)
NGRAPH_TEST(${BACKEND_NAME}, send_recv_ring)
{
auto shape = Shape{2, 2};
auto A = make_shared<op::Parameter>(element::f32, shape);
auto comm_size = get_distributed_interface()->get_size();
// test only works for at least 2 nodes
if (comm_size < 2)
{
return;
}
auto rank = get_distributed_interface()->get_rank();
std::shared_ptr<Function> f_send;
std::shared_ptr<Function> f_recv;
auto backend = runtime::Backend::create("${BACKEND_NAME}");
auto v = vector<float>{1, 2, 3, 4};
auto result = backend->create_tensor(element::f32, shape);
copy_data(result, vector<float>(4, 0));
if (rank != 0)
{
f_recv = make_shared<Function>(make_shared<op::Recv>(A, rank - 1), ParameterVector{A});
auto handle = backend->compile(f_recv);
handle->call_with_validate({result}, {result});
EXPECT_EQ(v, read_vector<float>(result));
}
else
{
copy_data(result, v);
}
f_send =
make_shared<Function>(make_shared<op::Send>(A, (rank + 1) % comm_size), ParameterVector{A});
backend->compile(f_send)->call_with_validate({result}, {result});
if (rank == 0)
{
f_recv = make_shared<Function>(make_shared<op::Recv>(A, comm_size - 1), ParameterVector{A});
auto handle = backend->compile(f_recv);
copy_data(result, vector<float>(4, 0));
backend->compile(f_recv)->call_with_validate({result}, {result});
EXPECT_EQ(v, read_vector<float>(result));
}
}
#endif