refactor(util/thread): simplify thread life-cycle

This commit is contained in:
Gabriel Ebner 2017-03-01 09:08:23 +01:00
parent 7fc190c730
commit 901cef6629
3 changed files with 38 additions and 23 deletions

View file

@ -40,10 +40,7 @@ mt_task_queue::~mt_task_queue() {
m_queue_added.notify_all();
m_queue_changed.notify_all();
m_wake_up_worker.notify_all();
m_shut_down_cv.notify_all();
auto workers = m_workers;
lock.unlock();
for (auto & w : workers) w->m_thread->join();
m_shut_down_cv.wait(lock, [=] { return m_workers.empty(); });
}
bool mt_task_queue::empty_core() {
@ -62,12 +59,10 @@ constexpr chrono::milliseconds g_worker_max_idle_time = chrono::milliseconds(100
void mt_task_queue::spawn_worker() {
lean_always_assert(!m_shutting_down);
auto this_worker_strong = std::make_shared<worker_info>();
m_workers.push_back(this_worker_strong);
auto this_worker = std::make_shared<worker_info>();
m_workers.push_back(this_worker);
m_required_workers--;
std::weak_ptr<worker_info> this_worker_weak = this_worker_strong;
this_worker_strong->m_thread.reset(new lthread([=]() {
auto this_worker = this_worker_weak.lock();
this_worker->m_thread.reset(new lthread([this, this_worker]() {
save_stack_info(false);
unique_lock<mutex> lock(m_mutex);
@ -79,7 +74,7 @@ void mt_task_queue::spawn_worker() {
scoped_add<int> inc_required(m_required_workers, +1);
scoped_add<unsigned> inc_sleeping(m_sleeping_workers, +1);
if (m_wake_up_worker.wait_for(lock, g_worker_max_idle_time,
[&] { return m_required_workers >= 1; })) {
[&] { return m_required_workers >= 1 || m_shutting_down; })) {
continue;
} else {
break;
@ -87,7 +82,7 @@ void mt_task_queue::spawn_worker() {
}
if (m_queue.empty()) {
if (m_queue_added.wait_for(lock, g_worker_max_idle_time,
[&] { return !m_queue.empty(); })) {
[&] { return !m_queue.empty() || m_shutting_down; })) {
continue;
} else {
break;
@ -114,10 +109,15 @@ void mt_task_queue::spawn_worker() {
notify_queue_changed();
}
// We need to run the finalizers while the lock is held,
// otherwise we risk a race condition at the end of the program.
// We would finalize in the thread, while we call the finalize() function.
run_thread_finalizers();
run_post_thread_finalizers();
m_workers.erase(std::find(m_workers.begin(), m_workers.end(), this_worker));
m_required_workers++;
m_shut_down_cv.notify_all();
}));
}

View file

@ -76,8 +76,6 @@ public:
save_stack_info(false);
scoped_interrupt_flag scope_int_flag(&m_flag);
fun();
run_thread_finalizers();
run_post_thread_finalizers();
})
{}

View file

@ -20,6 +20,20 @@ Author: Leonardo de Moura
#endif
namespace lean {
using runnable = std::function<void()>;
static void thread_main(void * p) {
std::unique_ptr<runnable> f;
f.reset(reinterpret_cast<runnable *>(p));
(*f)();
f.reset();
run_thread_finalizers();
run_post_thread_finalizers();
}
#if defined(LEAN_MULTI_THREAD)
size_t lthread::m_thread_stack_size = LEAN_DEFAULT_THREAD_STACK_SIZE;
@ -31,7 +45,7 @@ size_t lthread::get_thread_stack_size() {
return m_thread_stack_size;
}
static std::function<void(void)> mk_thread_proc(std::function<void(void)> const & p, unsigned max) {
static runnable mk_thread_proc(runnable const & p, size_t max) {
return [=]() { set_max_heartbeat(max); p(); }; // NOLINT
}
@ -42,14 +56,14 @@ struct lthread::imp {
HANDLE m_thread;
static DWORD WINAPI _main(void * p) {
(*reinterpret_cast<std::function<void(void)>*>(p))();
thread_main(p);
return 0;
}
imp(std::function<void(void)> const & p):
m_proc(mk_thread_proc(p, get_max_heartbeat())) {
imp(runnable const & p) {
runnable * f = new std::function<void()>(mk_thread_proc(p, get_max_heartbeat()));
m_thread = CreateThread(nullptr, m_thread_stack_size,
_main, &m_proc, 0, nullptr);
_main, f, 0, nullptr);
if (m_thread == NULL) {
throw exception("failed to create thread");
}
@ -64,23 +78,22 @@ struct lthread::imp {
#else
/* OSX/Linux version based on pthreads */
struct lthread::imp {
std::function<void(void)> m_proc;
pthread_attr_t m_attr;
pthread_t m_thread;
bool m_joined = false;
static void * _main(void * p) {
(*reinterpret_cast<std::function<void(void)>*>(p))();
thread_main(p);
return nullptr;
}
imp(std::function<void(void)> const & p):
m_proc(mk_thread_proc(p, get_max_heartbeat())) {
imp(runnable const & p) {
pthread_attr_init(&m_attr);
if (pthread_attr_setstacksize(&m_attr, m_thread_stack_size)) {
throw exception("failed to set thread stack size");
}
if (pthread_create(&m_thread, &m_attr, _main, &m_proc)) {
runnable * f = new std::function<void()>(mk_thread_proc(p, get_max_heartbeat()));
if (pthread_create(&m_thread, &m_attr, _main, f)) {
throw exception("failed to create thread");
}
}
@ -206,9 +219,13 @@ void register_post_thread_finalizer(thread_finalizer fn, void * p) {
}
void run_thread_finalizers() {
if (auto p = g_thread_finalizers_mgr->get_pair())
run_thread_finalizers_core(p->first);
}
void run_post_thread_finalizers() {
if (auto p = g_thread_finalizers_mgr->get_pair())
run_thread_finalizers_core(p->second);
}
void initialize_thread() {