From ac9708051aedbf25377bc28efa9d565d6bedd757 Mon Sep 17 00:00:00 2001 From: Sebastian Ullrich Date: Fri, 7 Feb 2025 10:06:57 +0100 Subject: [PATCH] feat: respect `Task.map/bind (sync := true)` after waiting (#6976) This PR extends the behavior of the `sync` flag for `Task.map/bind` etc. to encompass synchronous execution even when they first have to wait on completion of the first task, drastically lowering the overhead of such tasks. Thus the flag is now equivalent to e.g. .NET's `TaskContinuationOptions.ExecuteSynchronously`. --- src/Init/Core.lean | 8 ++++++-- src/runtime/object.cpp | 30 +++++++++++++++++------------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/Init/Core.lean b/src/Init/Core.lean index 542f3afd5d..ab2f389078 100644 --- a/src/Init/Core.lean +++ b/src/Init/Core.lean @@ -593,7 +593,9 @@ set_option linter.unusedVariables.funArgs false in be available and then calls `f` on the result. `prio`, if provided, is the priority of the task. -If `sync` is set to true, `f` is executed on the current thread if `x` has already finished. +If `sync` is set to true, `f` is executed on the current thread if `x` has already finished and +otherwise on the thread that `x` finished on. `prio` is ignored in this case. This should only be +done when executing `f` is cheap and non-blocking. -/ @[noinline, extern "lean_task_map"] protected def map (f : α → β) (x : Task α) (prio := Priority.default) (sync := false) : Task β := @@ -607,7 +609,9 @@ for the value of `x` to be available and then calls `f` on the result, resulting in a new task which is then run for a result. `prio`, if provided, is the priority of the task. -If `sync` is set to true, `f` is executed on the current thread if `x` has already finished. +If `sync` is set to true, `f` is executed on the current thread if `x` has already finished and +otherwise on the thread that `x` finished on. `prio` is ignored in this case. 
This should only be +done when executing `f` is cheap and non-blocking. -/ @[noinline, extern "lean_task_bind"] protected def bind (x : Task α) (f : α → Task β) (prio := Priority.default) (sync := false) : diff --git a/src/runtime/object.cpp b/src/runtime/object.cpp index 26102e16ce..636d7a1440 100644 --- a/src/runtime/object.cpp +++ b/src/runtime/object.cpp @@ -49,6 +49,7 @@ extern "C" LEAN_EXPORT __attribute__((weak)) void free_sized(void *ptr, size_t) // see `Task.Priority.max` #define LEAN_MAX_PRIO 8 +#define LEAN_SYNC_PRIO std::numeric_limits<unsigned>::max() namespace lean { @@ -760,7 +761,7 @@ class task_manager { lock.lock(); } else if (v != nullptr) { lean_assert(t->m_imp->m_closure == nullptr); - resolve_core(t, v); + resolve_core(lock, t, v); } else { // `bind` task has not finished yet, re-add as dependency of nested task // NOTE: closure MUST be extracted before unlocking the mutex as otherwise @@ -773,27 +774,30 @@ class task_manager { } } - void resolve_core(lean_task_object * t, object * v) { - handle_finished(t); + void resolve_core(unique_lock<mutex> & lock, lean_task_object * t, object * v) { mark_mt(v); t->m_value = v; - /* After the task has been finished and we propagated - dependencies, we can release `m_imp` and keep just the value */ - free_task_imp(t->m_imp); + lean_task_imp * imp = t->m_imp; t->m_imp = nullptr; + handle_finished(lock, t, imp); + /* After the task has been finished and we propagated + dependencies, we can release `imp` and keep just the value */ + free_task_imp(imp); m_task_finished_cv.notify_all(); } - void handle_finished(lean_task_object * t) { - lean_task_object * it = t->m_imp->m_head_dep; - t->m_imp->m_head_dep = nullptr; + void handle_finished(unique_lock<mutex> & lock, lean_task_object * t, lean_task_imp * imp) { + lean_task_object * it = imp->m_head_dep; + imp->m_head_dep = nullptr; while (it) { - if (t->m_imp->m_canceled) + if (imp->m_canceled) it->m_imp->m_canceled = true; lean_task_object * next_it = it->m_imp->m_next_dep; 
it->m_imp->m_next_dep = nullptr; if (it->m_imp->m_deleted) { free_task(it); + } else if (it->m_imp->m_prio == LEAN_SYNC_PRIO) { + run_task(lock, it); } else { enqueue_core(it); } @@ -844,7 +848,7 @@ public: dec(v); return; } - resolve_core(t, v); + resolve_core(lock, t, v); } void add_dep(lean_task_object * t1, lean_task_object * t2) { @@ -1031,7 +1035,7 @@ extern "C" LEAN_EXPORT obj_res lean_task_map_core(obj_arg f, obj_arg t, unsigned if (!g_task_manager || (sync && lean_to_task(t)->m_value)) { return lean_task_pure(apply_1(f, lean_task_get_own(t))); } else { - lean_task_object * new_task = alloc_task(mk_closure_3_2(task_map_fn, f, t), prio, keep_alive); + lean_task_object * new_task = alloc_task(mk_closure_3_2(task_map_fn, f, t), sync ? LEAN_SYNC_PRIO : prio, keep_alive); g_task_manager->add_dep(lean_to_task(t), new_task); return (lean_object*)new_task; } @@ -1074,7 +1078,7 @@ extern "C" LEAN_EXPORT obj_res lean_task_bind_core(obj_arg x, obj_arg f, unsigne if (!g_task_manager || (sync && lean_to_task(x)->m_value)) { return apply_1(f, lean_task_get_own(x)); } else { - lean_task_object * new_task = alloc_task(mk_closure_3_2(task_bind_fn1, x, f), prio, keep_alive); + lean_task_object * new_task = alloc_task(mk_closure_3_2(task_bind_fn1, x, f), sync ? LEAN_SYNC_PRIO : prio, keep_alive); g_task_manager->add_dep(lean_to_task(x), new_task); return (lean_object*)new_task; }