diff --git a/src/Init/Core.lean b/src/Init/Core.lean index 542f3afd5d..ab2f389078 100644 --- a/src/Init/Core.lean +++ b/src/Init/Core.lean @@ -593,7 +593,9 @@ set_option linter.unusedVariables.funArgs false in be available and then calls `f` on the result. `prio`, if provided, is the priority of the task. -If `sync` is set to true, `f` is executed on the current thread if `x` has already finished. +If `sync` is set to true, `f` is executed on the current thread if `x` has already finished and +otherwise on the thread that `x` finished on. `prio` is ignored in this case. This should only be +done when executing `f` is cheap and non-blocking. -/ @[noinline, extern "lean_task_map"] protected def map (f : α → β) (x : Task α) (prio := Priority.default) (sync := false) : Task β := @@ -607,7 +609,9 @@ for the value of `x` to be available and then calls `f` on the result, resulting in a new task which is then run for a result. `prio`, if provided, is the priority of the task. -If `sync` is set to true, `f` is executed on the current thread if `x` has already finished. +If `sync` is set to true, `f` is executed on the current thread if `x` has already finished and +otherwise on the thread that `x` finished on. `prio` is ignored in this case. This should only be +done when executing `f` is cheap and non-blocking. -/ @[noinline, extern "lean_task_bind"] protected def bind (x : Task α) (f : α → Task β) (prio := Priority.default) (sync := false) : diff --git a/src/runtime/object.cpp b/src/runtime/object.cpp index 26102e16ce..636d7a1440 100644 --- a/src/runtime/object.cpp +++ b/src/runtime/object.cpp @@ -49,6 +49,7 @@ extern "C" LEAN_EXPORT __attribute__((weak)) void free_sized(void *ptr, size_t) // see `Task.Priority.max` #define LEAN_MAX_PRIO 8 +#define LEAN_SYNC_PRIO std::numeric_limits::max() namespace lean { @@ -760,7 +761,7 @@ class task_manager { lock.lock(); } else if (v != nullptr) { lean_assert(t->m_imp->m_closure == nullptr); - resolve_core(t, v); + resolve_core(lock, t, v); } else { // `bind` task has not finished yet, re-add as dependency of nested task // NOTE: closure MUST be extracted before unlocking the mutex as otherwise @@ -773,27 +774,30 @@ class task_manager { } } - void resolve_core(lean_task_object * t, object * v) { - handle_finished(t); + void resolve_core(unique_lock & lock, lean_task_object * t, object * v) { mark_mt(v); t->m_value = v; - /* After the task has been finished and we propagated - dependencies, we can release `m_imp` and keep just the value */ - free_task_imp(t->m_imp); + lean_task_imp * imp = t->m_imp; t->m_imp = nullptr; + handle_finished(lock, t, imp); + /* After the task has been finished and we propagated + dependencies, we can release `imp` and keep just the value */ + free_task_imp(imp); m_task_finished_cv.notify_all(); } - void handle_finished(lean_task_object * t) { - lean_task_object * it = t->m_imp->m_head_dep; - t->m_imp->m_head_dep = nullptr; + void handle_finished(unique_lock & lock, lean_task_object * t, lean_task_imp * imp) { + lean_task_object * it = imp->m_head_dep; + imp->m_head_dep = nullptr; while (it) { - if (t->m_imp->m_canceled) + if (imp->m_canceled) it->m_imp->m_canceled = true; lean_task_object * next_it = it->m_imp->m_next_dep; it->m_imp->m_next_dep = nullptr; if (it->m_imp->m_deleted) { free_task(it); + } else if (it->m_imp->m_prio == LEAN_SYNC_PRIO) { + run_task(lock, it); } else { enqueue_core(it); } @@ -844,7 +848,7 @@ public: dec(v); return; } - resolve_core(t, v); + resolve_core(lock, t, v); } void add_dep(lean_task_object * t1, lean_task_object * t2) { @@ -1031,7 +1035,7 @@ extern "C" LEAN_EXPORT obj_res lean_task_map_core(obj_arg f, obj_arg t, unsigned if (!g_task_manager || (sync && lean_to_task(t)->m_value)) { return lean_task_pure(apply_1(f, lean_task_get_own(t))); } else { - lean_task_object * new_task = alloc_task(mk_closure_3_2(task_map_fn, f, t), prio, keep_alive); + lean_task_object * new_task = alloc_task(mk_closure_3_2(task_map_fn, f, t), sync ? LEAN_SYNC_PRIO : prio, keep_alive); g_task_manager->add_dep(lean_to_task(t), new_task); return (lean_object*)new_task; } @@ -1074,7 +1078,7 @@ extern "C" LEAN_EXPORT obj_res lean_task_bind_core(obj_arg x, obj_arg f, unsigne if (!g_task_manager || (sync && lean_to_task(x)->m_value)) { return apply_1(f, lean_task_get_own(x)); } else { - lean_task_object * new_task = alloc_task(mk_closure_3_2(task_bind_fn1, x, f), prio, keep_alive); + lean_task_object * new_task = alloc_task(mk_closure_3_2(task_bind_fn1, x, f), sync ? LEAN_SYNC_PRIO : prio, keep_alive); g_task_manager->add_dep(lean_to_task(x), new_task); return (lean_object*)new_task; }