Commit 491502a3 authored by Alexander Alekhin's avatar Alexander Alekhin

core: fix parallel_for data race

parent e268fdc0
...@@ -190,7 +190,7 @@ class WorkerThread ...@@ -190,7 +190,7 @@ class WorkerThread
{ {
public: public:
ThreadPool& thread_pool; ThreadPool& thread_pool;
unsigned id; const unsigned id;
pthread_t posix_thread; pthread_t posix_thread;
bool is_created; bool is_created;
...@@ -418,14 +418,15 @@ void WorkerThread::thread_body() ...@@ -418,14 +418,15 @@ void WorkerThread::thread_body()
stat.threadWake = getTickCount(); stat.threadWake = getTickCount();
#endif #endif
if (!stop_thread)
{
CV_LOG_VERBOSE(NULL, 5, "Thread: checking for new job"); CV_LOG_VERBOSE(NULL, 5, "Thread: checking for new job");
if (CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT == 0) if (CV_WORKER_ACTIVE_WAIT_THREADS_LIMIT == 0)
allow_active_wait = true; allow_active_wait = true;
Ptr<ParallelJob> j_ptr; swap(j_ptr, job); Ptr<ParallelJob> j_ptr; swap(j_ptr, job);
has_wake_signal = false; has_wake_signal = false; // TODO .store(false, std::memory_order_release)
pthread_mutex_unlock(&mutex); pthread_mutex_unlock(&mutex);
if (!stop_thread)
{
ParallelJob* j = j_ptr; ParallelJob* j = j_ptr;
if (j) if (j)
{ {
...@@ -480,10 +481,6 @@ void WorkerThread::thread_body() ...@@ -480,10 +481,6 @@ void WorkerThread::thread_body()
} }
} }
} }
else
{
pthread_mutex_unlock(&mutex);
}
#ifdef CV_PROFILE_THREADS #ifdef CV_PROFILE_THREADS
stat.threadFree = getTickCount(); stat.threadFree = getTickCount();
stat.keepActive = allow_active_wait; stat.keepActive = allow_active_wait;
...@@ -595,40 +592,42 @@ void ThreadPool::run(const Range& range, const ParallelLoopBody& body, double ns ...@@ -595,40 +592,42 @@ void ThreadPool::run(const Range& range, const ParallelLoopBody& body, double ns
CV_LOG_VERBOSE(NULL, 5, "MainThread: wake worker threads..."); CV_LOG_VERBOSE(NULL, 5, "MainThread: wake worker threads...");
for (size_t i = 0; i < threads.size(); ++i) for (size_t i = 0; i < threads.size(); ++i)
{ {
WorkerThread& thread = *(threads[i].get());
if (
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
bool isActive = threads[i]->isActive; thread.isActive ||
if (isActive || threads[i]->has_wake_signal)
#else
if (threads[i]->has_wake_signal)
#endif #endif
thread.has_wake_signal
|| !thread.job.empty() // #10881
)
{ {
pthread_mutex_lock(&threads[i]->mutex); pthread_mutex_lock(&thread.mutex);
threads[i]->job = job; thread.job = job;
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
isActive = threads[i]->isActive; bool isActive = thread.isActive;
#endif #endif
threads[i]->has_wake_signal = true; thread.has_wake_signal = true;
#ifdef CV_PROFILE_THREADS #ifdef CV_PROFILE_THREADS
threads_stat[i + 1].reset(); threads_stat[i + 1].reset();
#endif #endif
pthread_mutex_unlock(&threads[i]->mutex); pthread_mutex_unlock(&thread.mutex);
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
if (!isActive) if (!isActive)
{ {
pthread_cond_broadcast/*pthread_cond_signal*/(&threads[i]->cond_thread_wake); // wake thread pthread_cond_broadcast/*pthread_cond_signal*/(&thread.cond_thread_wake); // wake thread
} }
#endif #endif
} }
else else
{ {
CV_Assert(threads[i]->job.empty()); CV_Assert(thread.job.empty());
threads[i]->job = job; thread.job = job;
threads[i]->has_wake_signal = true; thread.has_wake_signal = true;
#ifdef CV_PROFILE_THREADS #ifdef CV_PROFILE_THREADS
threads_stat[i + 1].reset(); threads_stat[i + 1].reset();
#endif #endif
#if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR) #if !defined(CV_USE_GLOBAL_WORKERS_COND_VAR)
pthread_cond_broadcast/*pthread_cond_signal*/(&threads[i]->cond_thread_wake); // wake thread pthread_cond_broadcast/*pthread_cond_signal*/(&thread.cond_thread_wake); // wake thread
#endif #endif
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment