Commit 132253c9 authored by Alexander Alekhin

dnn: use AsyncArray

parent 9340af1a
modules/dnn/include/opencv2/dnn/dnn.hpp
@@ -44,9 +44,7 @@
 #include <vector>
 #include <opencv2/core.hpp>
-#ifdef CV_CXX11
-#include <future>
-#endif
+#include "opencv2/core/async.hpp"

 #if !defined CV_DOXYGEN && !defined CV_STATIC_ANALYSIS && !defined CV_DNN_DONT_ADD_EXPERIMENTAL_NS
 #define CV__DNN_EXPERIMENTAL_NS_BEGIN namespace experimental_dnn_34_v12 {
@@ -67,18 +65,6 @@ CV__DNN_EXPERIMENTAL_NS_BEGIN
     typedef std::vector<int> MatShape;

-#if defined(CV_CXX11) || defined(CV_DOXYGEN)
-    typedef std::future<Mat> AsyncMat;
-#else
-    // Just a workaround for bindings.
-    struct AsyncMat
-    {
-        Mat get() { return Mat(); }
-        void wait() const {}
-        size_t wait_for(size_t milliseconds) const { CV_UNUSED(milliseconds); return -1; }
-    };
-#endif
-
     /**
      * @brief Enum of computation backends supported by layers.
      * @see Net::setPreferableBackend
@@ -483,7 +469,7 @@ CV__DNN_EXPERIMENTAL_NS_BEGIN
          * This is an asynchronous version of forward(const String&).
          * dnn::DNN_BACKEND_INFERENCE_ENGINE backend is required.
          */
-        CV_WRAP AsyncMat forwardAsync(const String& outputName = String());
+        CV_WRAP AsyncArray forwardAsync(const String& outputName = String());

        /** @brief Runs forward pass to compute output of layer with name @p outputName.
         *  @param outputBlobs contains all output blobs for specified layer.
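Taken together, the header change above means callers now receive a cv::AsyncArray instead of a std::future<Mat>. A minimal sketch of the resulting call pattern (the model and image paths below are placeholders, not part of this commit):

    #include <chrono>
    #include <opencv2/core/async.hpp>
    #include <opencv2/dnn.hpp>
    #include <opencv2/imgcodecs.hpp>

    int main()
    {
        // Placeholder model files; forwardAsync() requires the Inference Engine backend.
        cv::dnn::Net net = cv::dnn::readNet("model.xml", "model.bin");
        net.setPreferableBackend(cv::dnn::DNN_BACKEND_INFERENCE_ENGINE);

        net.setInput(cv::dnn::blobFromImage(cv::imread("image.jpg")));

        cv::AsyncArray out = net.forwardAsync();  // returns immediately, inference runs asynchronously
        cv::Mat result;
        if (out.get(result, std::chrono::milliseconds(500)))  // false if not ready before the timeout
        {
            // 'result' holds the output blob
        }
        return 0;
    }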
modules/dnn/misc/python/pyopencv_dnn.hpp
@@ -2,13 +2,6 @@
 typedef dnn::DictValue LayerId;
 typedef std::vector<dnn::MatShape> vector_MatShape;
 typedef std::vector<std::vector<dnn::MatShape> > vector_vector_MatShape;
-#ifdef CV_CXX11
-typedef std::chrono::milliseconds chrono_milliseconds;
-typedef std::future_status AsyncMatStatus;
-#else
-typedef size_t chrono_milliseconds;
-typedef size_t AsyncMatStatus;
-#endif

 template<>
 bool pyopencv_to(PyObject *o, dnn::DictValue &dv, const char *name)
@@ -46,46 +39,6 @@ bool pyopencv_to(PyObject *o, std::vector<Mat> &blobs, const char *name) //requi
     return pyopencvVecConverter<Mat>::to(o, blobs, ArgInfo(name, false));
 }

-#ifdef CV_CXX11
-
-template<>
-PyObject* pyopencv_from(const std::future<Mat>& f_)
-{
-    std::future<Mat>& f = const_cast<std::future<Mat>&>(f_);
-    Ptr<cv::dnn::AsyncMat> p(new std::future<Mat>(std::move(f)));
-    return pyopencv_from(p);
-}
-
-template<>
-PyObject* pyopencv_from(const std::future_status& status)
-{
-    return pyopencv_from((int)status);
-}
-
-template<>
-bool pyopencv_to(PyObject* src, std::chrono::milliseconds& dst, const char* name)
-{
-    size_t millis = 0;
-    if (pyopencv_to(src, millis, name))
-    {
-        dst = std::chrono::milliseconds(millis);
-        return true;
-    }
-    else
-        return false;
-}
-
-#else
-
-template<>
-PyObject* pyopencv_from(const cv::dnn::AsyncMat&)
-{
-    CV_Error(Error::StsNotImplemented, "C++11 is required.");
-    return 0;
-}
-
-#endif  // CV_CXX11
-
 template<typename T>
 PyObject* pyopencv_from(const dnn::DictValue &dv)
 {
modules/dnn/misc/python/shadow_async_mat.hpp (deleted)
-#error This is a shadow header file, which is not intended for processing by any compiler. \
-       Only bindings parser should handle this file.
-
-namespace cv { namespace dnn {
-
-class CV_EXPORTS_W AsyncMat
-{
-public:
-    //! Wait for Mat object readiness and return it.
-    CV_WRAP Mat get();
-
-    //! Wait for Mat object readiness.
-    CV_WRAP void wait() const;
-
-    /** @brief Wait for Mat object readiness specific amount of time.
-     *  @param timeout Timeout in milliseconds
-     *  @returns [std::future_status](https://en.cppreference.com/w/cpp/thread/future_status)
-     */
-    CV_WRAP AsyncMatStatus wait_for(std::chrono::milliseconds timeout) const;
-};
-
-}}
modules/dnn/misc/python/test/test_dnn.py
@@ -69,8 +69,9 @@ def printParams(backend, target):
 class dnn_test(NewOpenCVTests):
-    def __init__(self, *args, **kwargs):
-        super(dnn_test, self).__init__(*args, **kwargs)
+    def setUp(self):
+        super(dnn_test, self).setUp()
+
         self.dnnBackendsAndTargets = [
             [cv.dnn.DNN_BACKEND_OPENCV, cv.dnn.DNN_TARGET_CPU],
         ]
@@ -168,7 +169,7 @@ class dnn_test(NewOpenCVTests):
         normAssertDetections(self, ref, out, 0.5, scoresDiff, iouDiff)

     def test_async(self):
-        timeout = 5000  # in milliseconds
+        timeout = 500*10**6  # in nanoseconds (500ms)
         testdata_required = bool(os.environ.get('OPENCV_DNN_TEST_REQUIRE_TESTDATA', False))
         proto = self.find_dnn_file('dnn/layers/layer_convolution.prototxt', required=testdata_required)
         model = self.find_dnn_file('dnn/layers/layer_convolution.caffemodel', required=testdata_required)
@@ -209,11 +210,9 @@ class dnn_test(NewOpenCVTests):
             outs.insert(0, netAsync.forwardAsync())

         for i in reversed(range(numInputs)):
-            ret = outs[i].wait_for(timeout)
-            if ret == 1:
-                self.fail("Timeout")
-            self.assertEqual(ret, 0)  # is ready
-            normAssert(self, refs[i], outs[i].get(), 'Index: %d' % i, 1e-10)
+            ret, result = outs[i].get(timeoutNs=float(timeout))
+            self.assertTrue(ret)
+            normAssert(self, refs[i], result, 'Index: %d' % i, 1e-10)

 if __name__ == '__main__':
modules/dnn/src/dnn.cpp
@@ -2557,7 +2557,7 @@ struct Net::Impl
     }

 #ifdef CV_CXX11
-    std::future<Mat> getBlobAsync(const LayerPin& pin)
+    AsyncArray getBlobAsync(const LayerPin& pin)
     {
         CV_TRACE_FUNCTION();
 #ifdef HAVE_INF_ENGINE
@@ -2586,7 +2586,7 @@ struct Net::Impl
 #endif
     }

-    std::future<Mat> getBlobAsync(String outputName)
+    AsyncArray getBlobAsync(String outputName)
     {
         return getBlobAsync(getPinByAlias(outputName));
     }
@@ -2714,7 +2714,7 @@ Mat Net::forward(const String& outputName)
     return impl->getBlob(layerName);
 }

-AsyncMat Net::forwardAsync(const String& outputName)
+AsyncArray Net::forwardAsync(const String& outputName)
 {
     CV_TRACE_FUNCTION();
 #ifdef CV_CXX11
modules/dnn/src/op_inf_engine.cpp
@@ -849,7 +849,7 @@ void InfEngineBackendNet::InfEngineReqWrapper::makePromises(const std::vector<Pt
     outsNames.resize(outs.size());
     for (int i = 0; i < outs.size(); ++i)
     {
-        outs[i]->futureMat = outProms[i].get_future();
+        outs[i]->futureMat = outProms[i].getArrayResult();
         outsNames[i] = outs[i]->dataPtr->name;
     }
 }
@@ -906,20 +906,38 @@ void InfEngineBackendNet::forward(const std::vector<Ptr<BackendWrapper> >& outBl
             {
                 InfEngineReqWrapper* wrapper;
                 request->GetUserData((void**)&wrapper, 0);
-                CV_Assert(wrapper);
+                CV_Assert(wrapper && "Internal error");

-                for (int i = 0; i < wrapper->outProms.size(); ++i)
+                size_t processedOutputs = 0;
+                try
                 {
-                    const std::string& name = wrapper->outsNames[i];
-                    Mat m = infEngineBlobToMat(wrapper->req.GetBlob(name));
+                    for (; processedOutputs < wrapper->outProms.size(); ++processedOutputs)
+                    {
+                        const std::string& name = wrapper->outsNames[processedOutputs];
+                        Mat m = infEngineBlobToMat(wrapper->req.GetBlob(name));

-                    if (status == InferenceEngine::StatusCode::OK)
-                        wrapper->outProms[i].set_value(m.clone());
-                    else
+                        try
+                        {
+                            CV_Assert(status == InferenceEngine::StatusCode::OK);
+                            wrapper->outProms[processedOutputs].setValue(m.clone());
+                        }
+                        catch (...)
+                        {
+                            try {
+                                wrapper->outProms[processedOutputs].setException(std::current_exception());
+                            } catch(...) {
+                                CV_LOG_ERROR(NULL, "DNN: Exception occured during async inference exception propagation");
+                            }
+                        }
+                    }
+                }
+                catch (...)
+                {
+                    std::exception_ptr e = std::current_exception();
+                    for (; processedOutputs < wrapper->outProms.size(); ++processedOutputs)
                     {
                         try {
-                            std::runtime_error e("Async request failed");
-                            wrapper->outProms[i].set_exception(std::make_exception_ptr(e));
+                            wrapper->outProms[processedOutputs].setException(e);
                         } catch(...) {
                             CV_LOG_ERROR(NULL, "DNN: Exception occured during async inference exception propagation");
                         }
                     }
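The rewritten callback relies on the cv::AsyncPromise / cv::AsyncArray pair, which mirrors std::promise / std::future: getArrayResult() hands out the consumer-side handle, and the producer later calls setValue() or setException(). A standalone sketch of that pairing (my own illustration, not the backend code; note the detail/ header, which this commit also uses, marks it as internal API):

    #include <opencv2/core.hpp>
    #include <opencv2/core/async.hpp>
    #include <opencv2/core/detail/async_promise.hpp>
    #include <thread>

    int main()
    {
        cv::AsyncPromise promise;
        cv::AsyncArray result = promise.getArrayResult();  // consumer-side handle

        std::thread producer([&promise]() {
            try
            {
                cv::Mat m(3, 3, CV_32F, cv::Scalar(1));  // stand-in for an inference output
                promise.setValue(m.clone());
            }
            catch (...)
            {
                // Failures surface to the consumer as an exception rethrown from get().
                promise.setException(std::current_exception());
            }
        });

        cv::Mat out;
        result.get(out);  // blocks until setValue()/setException()
        producer.join();
        return 0;
    }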
modules/dnn/src/op_inf_engine.hpp
@@ -12,6 +12,9 @@
 #include "opencv2/core/cvstd.hpp"
 #include "opencv2/dnn.hpp"

+#include "opencv2/core/async.hpp"
+#include "opencv2/core/detail/async_promise.hpp"
+
 #include "opencv2/dnn/utils/inference_engine.hpp"

 #ifdef HAVE_INF_ENGINE
@@ -208,7 +211,7 @@ private:
         void makePromises(const std::vector<Ptr<BackendWrapper> >& outs);

         InferenceEngine::InferRequest req;
-        std::vector<std::promise<Mat> > outProms;
+        std::vector<cv::AsyncPromise> outProms;
         std::vector<std::string> outsNames;
         bool isReady;
     };
@@ -264,7 +267,7 @@ public:
     InferenceEngine::DataPtr dataPtr;
     InferenceEngine::Blob::Ptr blob;
-    std::future<Mat> futureMat;
+    AsyncArray futureMat;
 };

 InferenceEngine::Blob::Ptr wrapToInfEngineBlob(const Mat& m, InferenceEngine::Layout layout = InferenceEngine::Layout::ANY);
modules/dnn/test/test_misc.cpp
@@ -341,12 +341,13 @@ TEST(Net, forwardAndRetrieve)
 }

 #ifdef HAVE_INF_ENGINE
+static const std::chrono::milliseconds async_timeout(500);
+
 // This test runs network in synchronous mode for different inputs and then
 // runs the same model asynchronously for the same inputs.
 typedef testing::TestWithParam<tuple<int, Target> > Async;
 TEST_P(Async, set_and_forward_single)
 {
-    static const int kTimeout = 5000;  // in milliseconds.
     const int dtype = get<0>(GetParam());
     const int target = get<1>(GetParam());
@@ -383,16 +384,16 @@ TEST_P(Async, set_and_forward_single)
     {
         netAsync.setInput(inputs[i]);

-        std::future<Mat> out = netAsync.forwardAsync();
-        if (out.wait_for(std::chrono::milliseconds(kTimeout)) == std::future_status::timeout)
-            CV_Error(Error::StsAssert, "Timeout");
-        normAssert(refs[i], out.get(), format("Index: %d", i).c_str(), 0, 0);
+        AsyncArray out = netAsync.forwardAsync();
+        ASSERT_TRUE(out.valid());
+        Mat result;
+        EXPECT_TRUE(out.get(result, async_timeout));
+        normAssert(refs[i], result, format("Index: %d", i).c_str(), 0, 0);
     }
 }

 TEST_P(Async, set_and_forward_all)
 {
-    static const int kTimeout = 5000;  // in milliseconds.
     const int dtype = get<0>(GetParam());
     const int target = get<1>(GetParam());
@@ -426,7 +427,7 @@ TEST_P(Async, set_and_forward_all)
     }

     // Run asynchronously. To make test more robust, process inputs in the reversed order.
-    std::vector<std::future<Mat> > outs(numInputs);
+    std::vector<AsyncArray> outs(numInputs);
     for (int i = numInputs - 1; i >= 0; --i)
     {
         netAsync.setInput(inputs[i]);
@@ -435,9 +436,10 @@ TEST_P(Async, set_and_forward_all)
     for (int i = numInputs - 1; i >= 0; --i)
     {
-        if (outs[i].wait_for(std::chrono::milliseconds(kTimeout)) == std::future_status::timeout)
-            CV_Error(Error::StsAssert, "Timeout");
-        normAssert(refs[i], outs[i].get(), format("Index: %d", i).c_str(), 0, 0);
+        ASSERT_TRUE(outs[i].valid());
+        Mat result;
+        EXPECT_TRUE(outs[i].get(result, async_timeout));
+        normAssert(refs[i], result, format("Index: %d", i).c_str(), 0, 0);
     }
 }
samples/dnn/object_detection.cpp
@@ -6,6 +6,7 @@
 #include <opencv2/highgui.hpp>

 #ifdef CV_CXX11
+#include <mutex>
 #include <thread>
 #include <queue>
 #endif
@@ -185,7 +186,7 @@ int main(int argc, char** argv)
     QueueFPS<Mat> processedFramesQueue;
     QueueFPS<std::vector<Mat> > predictionsQueue;
     std::thread processingThread([&](){
-        std::queue<std::future<Mat> > futureOutputs;
+        std::queue<AsyncArray> futureOutputs;
         Mat blob;
         while (process)
         {
@@ -224,11 +225,13 @@ int main(int argc, char** argv)
             }

             while (!futureOutputs.empty() &&
-                   futureOutputs.front().wait_for(std::chrono::seconds(0)) == std::future_status::ready)
+                   futureOutputs.front().wait_for(std::chrono::seconds(0)))
             {
-                Mat out = futureOutputs.front().get();
-                predictionsQueue.push({out});
+                AsyncArray async_out = futureOutputs.front();
                 futureOutputs.pop();
+
+                Mat out;
+                async_out.get(out);
+                predictionsQueue.push({out});
             }
         }
     });
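One behavioral note on the sample: AsyncArray::wait_for() returns a bool (ready or not) rather than a std::future_status, which is why the == std::future_status::ready comparison disappears above. A trimmed sketch of the poll-then-fetch idiom the sample now uses (drainReady and its parameter names are illustrative, not from the sample):

    #include <opencv2/core.hpp>
    #include <opencv2/core/async.hpp>
    #include <chrono>
    #include <queue>

    // Move every finished request off the front of the queue without blocking.
    static void drainReady(std::queue<cv::AsyncArray>& pending, std::queue<cv::Mat>& results)
    {
        while (!pending.empty() &&
               pending.front().wait_for(std::chrono::seconds(0)))  // true once the result is computed
        {
            cv::Mat out;
            pending.front().get(out);  // already ready, so this does not block
            pending.pop();
            results.push(out);
        }
    }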
samples/dnn/object_detection.py
@@ -4,7 +4,7 @@ import numpy as np
 import sys
 import time
 from threading import Thread
-if sys.version_info[0] == '2':
+if sys.version_info[0] == 2:
     import Queue as queue
 else:
     import queue
@@ -262,7 +262,7 @@ def processingThreadBody():
             outs = net.forward(outNames)
             predictionsQueue.put(np.copy(outs))

-        while futureOutputs and futureOutputs[0].wait_for(0) == 0:
+        while futureOutputs and futureOutputs[0].wait_for(0):
             out = futureOutputs[0].get()
             predictionsQueue.put(np.copy([out]))