From 901cef6629d4ff82ddd8d210f8f2ea585b742464 Mon Sep 17 00:00:00 2001 From: Gabriel Ebner Date: Wed, 1 Mar 2017 09:08:23 +0100 Subject: [PATCH] refactor(util/thread): simplify thread life-cycle --- src/library/mt_task_queue.cpp | 22 ++++++++++----------- src/util/interrupt.h | 2 -- src/util/thread.cpp | 37 +++++++++++++++++++++++++---------- 3 files changed, 38 insertions(+), 23 deletions(-) diff --git a/src/library/mt_task_queue.cpp b/src/library/mt_task_queue.cpp index 4083b382c4..1420e0bf22 100644 --- a/src/library/mt_task_queue.cpp +++ b/src/library/mt_task_queue.cpp @@ -40,10 +40,7 @@ mt_task_queue::~mt_task_queue() { m_queue_added.notify_all(); m_queue_changed.notify_all(); m_wake_up_worker.notify_all(); - m_shut_down_cv.notify_all(); - auto workers = m_workers; - lock.unlock(); - for (auto & w : workers) w->m_thread->join(); + m_shut_down_cv.wait(lock, [=] { return m_workers.empty(); }); } bool mt_task_queue::empty_core() { @@ -62,12 +59,10 @@ constexpr chrono::milliseconds g_worker_max_idle_time = chrono::milliseconds(100 void mt_task_queue::spawn_worker() { lean_always_assert(!m_shutting_down); - auto this_worker_strong = std::make_shared(); - m_workers.push_back(this_worker_strong); + auto this_worker = std::make_shared(); + m_workers.push_back(this_worker); m_required_workers--; - std::weak_ptr this_worker_weak = this_worker_strong; - this_worker_strong->m_thread.reset(new lthread([=]() { - auto this_worker = this_worker_weak.lock(); + this_worker->m_thread.reset(new lthread([this, this_worker]() { save_stack_info(false); unique_lock lock(m_mutex); @@ -79,7 +74,7 @@ void mt_task_queue::spawn_worker() { scoped_add inc_required(m_required_workers, +1); scoped_add inc_sleeping(m_sleeping_workers, +1); if (m_wake_up_worker.wait_for(lock, g_worker_max_idle_time, - [&] { return m_required_workers >= 1; })) { + [&] { return m_required_workers >= 1 || m_shutting_down; })) { continue; } else { break; @@ -87,7 +82,7 @@ void mt_task_queue::spawn_worker() { } if (m_queue.empty()) { if (m_queue_added.wait_for(lock, g_worker_max_idle_time, - [&] { return !m_queue.empty(); })) { + [&] { return !m_queue.empty() || m_shutting_down; })) { continue; } else { break; @@ -114,10 +109,15 @@ void mt_task_queue::spawn_worker() { notify_queue_changed(); } + // We need to run the finalizers while the lock is held, + // otherwise we risk a race condition at the end of the program. + // We would finalize in the thread, while we call the finalize() function. run_thread_finalizers(); run_post_thread_finalizers(); + m_workers.erase(std::find(m_workers.begin(), m_workers.end(), this_worker)); m_required_workers++; + m_shut_down_cv.notify_all(); })); } diff --git a/src/util/interrupt.h b/src/util/interrupt.h index 7cd6fe2a2b..4b8329c903 100644 --- a/src/util/interrupt.h +++ b/src/util/interrupt.h @@ -76,8 +76,6 @@ public: save_stack_info(false); scoped_interrupt_flag scope_int_flag(&m_flag); fun(); - run_thread_finalizers(); - run_post_thread_finalizers(); }) {} diff --git a/src/util/thread.cpp b/src/util/thread.cpp index ba4c60f098..870a62972d 100644 --- a/src/util/thread.cpp +++ b/src/util/thread.cpp @@ -20,6 +20,20 @@ Author: Leonardo de Moura #endif namespace lean { + +using runnable = std::function; + +static void thread_main(void * p) { + std::unique_ptr f; + f.reset(reinterpret_cast(p)); + + (*f)(); + f.reset(); + + run_thread_finalizers(); + run_post_thread_finalizers(); +} + #if defined(LEAN_MULTI_THREAD) size_t lthread::m_thread_stack_size = LEAN_DEFAULT_THREAD_STACK_SIZE; @@ -31,7 +45,7 @@ size_t lthread::get_thread_stack_size() { return m_thread_stack_size; } -static std::function mk_thread_proc(std::function const & p, unsigned max) { +static runnable mk_thread_proc(runnable const & p, size_t max) { return [=]() { set_max_heartbeat(max); p(); }; // NOLINT } @@ -42,14 +56,14 @@ struct lthread::imp { HANDLE m_thread; static DWORD WINAPI _main(void * p) { - (*reinterpret_cast*>(p))(); + thread_main(p); return 0; } - imp(std::function const & p): - m_proc(mk_thread_proc(p, get_max_heartbeat())) { + imp(runnable const & p) { + runnable * f = new std::function(mk_thread_proc(p, get_max_heartbeat())); m_thread = CreateThread(nullptr, m_thread_stack_size, - _main, &m_proc, 0, nullptr); + _main, f, 0, nullptr); if (m_thread == NULL) { throw exception("failed to create thread"); } @@ -64,23 +78,22 @@ struct lthread::imp { #else /* OSX/Linux version based on pthreads */ struct lthread::imp { - std::function m_proc; pthread_attr_t m_attr; pthread_t m_thread; bool m_joined = false; static void * _main(void * p) { - (*reinterpret_cast*>(p))(); + thread_main(p); return nullptr; } - imp(std::function const & p): - m_proc(mk_thread_proc(p, get_max_heartbeat())) { + imp(runnable const & p) { pthread_attr_init(&m_attr); if (pthread_attr_setstacksize(&m_attr, m_thread_stack_size)) { throw exception("failed to set thread stack size"); } - if (pthread_create(&m_thread, &m_attr, _main, &m_proc)) { + runnable * f = new std::function(mk_thread_proc(p, get_max_heartbeat())); + if (pthread_create(&m_thread, &m_attr, _main, f)) { throw exception("failed to create thread"); } } @@ -206,9 +219,13 @@ void register_post_thread_finalizer(thread_finalizer fn, void * p) { } void run_thread_finalizers() { + if (auto p = g_thread_finalizers_mgr->get_pair()) + run_thread_finalizers_core(p->first); } void run_post_thread_finalizers() { + if (auto p = g_thread_finalizers_mgr->get_pair()) + run_thread_finalizers_core(p->second); } void initialize_thread() {