fix: remove auto-cancellation of IO tasks
Chained tasks were never auto-canceled, so let's be explicit everywhere
This commit is contained in:
parent
91dac6ccff
commit
4a262fbc5b
3 changed files with 14 additions and 41 deletions
|
|
@ -93,7 +93,7 @@ def sleep (ms : UInt32) : IO Unit :=
|
|||
except that the `Task` is started eagerly as usual. Thus pure accesses to the `Task` do not influence the impure `act`
|
||||
computation.
|
||||
Unlike with pure tasks created by `Task.mk`, tasks created by this function will be run even if the last reference
|
||||
to the task is dropped. `act` should manually check for cancellation via `IO.checkInterrupt` if it wants to react
|
||||
to the task is dropped. `act` should manually check for cancellation via `IO.checkCanceled` if it wants to react
|
||||
to that. -/
|
||||
@[extern "lean_io_as_task"]
|
||||
constant asTask (act : IO α) (prio := Task.Priority.default) : IO (Task (Except IO.Error α))
|
||||
|
|
|
|||
|
|
@ -214,12 +214,6 @@ typedef struct {
|
|||
uint8_t m_canceled;
|
||||
// If true, task will not be freed until finished
|
||||
uint8_t m_keep_alive;
|
||||
// If true, task should be destroyed when finished.
|
||||
// Invariant: m_kept_alive ==> m_keep_alive && RC == 0
|
||||
// The reverse direction is violated only in the window between `lean_dec`
|
||||
// and `deactivate_task`, which is not covered by the task_manager mutex.
|
||||
// Thus we need this additional flag, which is covered.
|
||||
uint8_t m_kept_alive;
|
||||
uint8_t m_deleted;
|
||||
} lean_task_imp;
|
||||
|
||||
|
|
@ -237,24 +231,24 @@ typedef struct {
|
|||
* Queued
|
||||
* condition: in task_manager::m_queues && m_imp != nullptr && !m_imp->m_deleted
|
||||
* invariant: m_value == nullptr
|
||||
* transition: RC becomes 0 && !m_imp->m_keep_alive ==> Deactivated (`deactivate_task` lock)
|
||||
* transition: RC becomes 0 ==> Deactivated (`deactivate_task` lock)
|
||||
* transition: dequeued by worker thread ==> Running (`spawn_worker` lock)
|
||||
* Waiting
|
||||
* condition: reachable from task via `m_head_dep->m_next_dep->...` && !m_imp->m_deleted
|
||||
* invariant: m_imp != nullptr && m_value == nullptr
|
||||
* invariant: task dependency is Queued/Waiting/Running
|
||||
* It cannot become Deactivated because this task should be holding an owned reference to it
|
||||
* transition: RC becomes 0 && !m_imp->m_keep_alive ==> Deactivated (`deactivate_task` lock)
|
||||
* transition: RC becomes 0 ==> Deactivated (`deactivate_task` lock)
|
||||
* transition: task dependency Finished ==> Queued (`handle_finished` under `spawn_worker` lock)
|
||||
* Running
|
||||
* condition: m_imp != nullptr && m_imp->m_closure == nullptr
|
||||
* The worker takes ownership of the closure when running it
|
||||
* invariant: m_value == nullptr
|
||||
* transition: RC becomes 0 && !m_imp->m_keep_alive ==> Deactivated (`deactivate_task` lock)
|
||||
* transition: RC becomes 0 ==> Deactivated (`deactivate_task` lock)
|
||||
* transition: finished execution ==> Finished (`spawn_worker` lock)
|
||||
* Deactivated
|
||||
* condition: m_imp != nullptr && m_imp->m_deleted
|
||||
* invariant: RC == 0 && !m_imp->m_keep_alive
|
||||
* invariant: RC == 0
|
||||
* invariant: m_imp->m_closure == nullptr && m_imp->m_head_dep == nullptr (both freed by `deactivate_task_core`)
|
||||
* Note that all dependent tasks must have already been Deactivated by the converse of the second Waiting invariant
|
||||
* invariant: m_value == nullptr
|
||||
|
|
@ -267,22 +261,7 @@ typedef struct {
|
|||
* Finished
|
||||
* condition: m_value != nullptr
|
||||
* invariant: m_imp == nullptr
|
||||
* transition: RC becomes 0 ==> freed (`deactivate_task` lock)
|
||||
|
||||
Queued/Waiting/Running tasks can also enter the following sub-state, which is used in place of Deactivated to make
|
||||
sure that IO tasks are always executed even when no live references to them exist. All transitions of the super-state
|
||||
still hold (i.e. a waiting kept-alive task is still enqueued, then run, and finally freed).
|
||||
* Queued/Waiting/Running
|
||||
* transition: RC becomes 0 && m_imp->m_keep_alive ==> KeptAlive (`deactivate_task` lock)
|
||||
* KeptAlive
|
||||
* condition: m_imp != nullptr && m_imp->m_kept_alive
|
||||
* invariant: RC == 0 && m_imp->m_keep_alive
|
||||
* invariant: m_imp->canceled
|
||||
* This is the reason we cannot simply keep the task alive by holding an extra RC token: we want it canceled when
|
||||
the RC becomes 0!
|
||||
* invariant: m_value == nullptr
|
||||
* transition: finished execution ==> freed
|
||||
* actually implemented as two transitions "... ==> Deactivated ==> freed" */
|
||||
* transition: RC becomes 0 ==> freed (`deactivate_task` lock) */
|
||||
typedef struct lean_task {
|
||||
lean_object m_header;
|
||||
_Atomic(lean_object *) m_value;
|
||||
|
|
|
|||
|
|
@ -626,7 +626,6 @@ static lean_task_imp * alloc_task_imp(obj_arg c, unsigned prio, bool keep_alive)
|
|||
imp->m_prio = prio;
|
||||
imp->m_canceled = false;
|
||||
imp->m_keep_alive = keep_alive;
|
||||
imp->m_kept_alive = false;
|
||||
imp->m_deleted = false;
|
||||
return imp;
|
||||
}
|
||||
|
|
@ -759,16 +758,13 @@ class task_manager {
|
|||
t->m_imp->m_closure = nullptr;
|
||||
lock.unlock();
|
||||
v = lean_apply_1(c, box(0));
|
||||
// If deactivation was delayed by `m_keep_alive`, deactivate after the final execution (`v != nulltpr`)
|
||||
if (v != nullptr && t->m_imp->m_keep_alive) {
|
||||
lean_dec_ref((lean_object*)t);
|
||||
}
|
||||
lock.lock();
|
||||
}
|
||||
lean_assert(t->m_imp);
|
||||
// If deactivation was delayed by `m_keep_alive`, deactivate after the final execution (`v != nulltpr`)
|
||||
if (v != nullptr && t->m_imp->m_kept_alive) {
|
||||
lean_assert(!lean_nonzero_rc((lean_object *)t));
|
||||
deactivate_task_core(lock, t);
|
||||
}
|
||||
// Note: if deactivation was not delayed yet, `m_keep_alive` will be discarded below when
|
||||
// `m_imp` is freed
|
||||
if (t->m_imp->m_deleted) {
|
||||
lock.unlock();
|
||||
if (v) lean_dec(v);
|
||||
|
|
@ -884,12 +880,7 @@ public:
|
|||
return;
|
||||
} else {
|
||||
lean_assert(t->m_imp);
|
||||
if (t->m_imp->m_keep_alive) {
|
||||
t->m_imp->m_canceled = true;
|
||||
t->m_imp->m_kept_alive = true;
|
||||
} else {
|
||||
deactivate_task_core(lock, t);
|
||||
}
|
||||
deactivate_task_core(lock, t);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -962,6 +953,8 @@ static lean_task_object * alloc_task(obj_arg c, unsigned prio, bool keep_alive)
|
|||
lean_set_task_header((lean_object*)o);
|
||||
o->m_value = nullptr;
|
||||
o->m_imp = alloc_task_imp(c, prio, keep_alive);
|
||||
if (keep_alive)
|
||||
lean_inc_ref((lean_object*)o);
|
||||
return o;
|
||||
}
|
||||
|
||||
|
|
@ -1034,6 +1027,7 @@ static obj_res task_bind_fn1(obj_arg x, obj_arg f, obj_arg) {
|
|||
lean_assert(g_current_task_object->m_imp->m_closure == nullptr);
|
||||
obj_res c = mk_closure_2_1(task_bind_fn2, new_task);
|
||||
mark_mt(c);
|
||||
std::cerr << "reviving " << g_current_task_object << std::endl;
|
||||
g_current_task_object->m_imp->m_closure = c;
|
||||
return nullptr; /* notify queue that task did not finish yet. */
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue