This PR adds `Notify`, a structure that is similar to `Condvar` but is
designed for asynchronous concurrency. The main difference between
`Std.Sync.Notify` and `Std.Condvar` is that `Std.Condvar` depends on a
`Std.Mutex` and blocks the entire thread that the `Task` is using while
waiting. If I try to use `Std.Condvar` with async and a lot of `Task`s
like this:
```lean
def condvar : Async Unit := do
let condvar ← Std.Condvar.new
let mutex ← Std.Mutex.new false
for i in [0:threads] do
background do
IO.println s!"start {i + 1}"
await =<< (show IO (ETask _ _) from IO.asTask (mutex.atomically (condvar.wait mutex)))
IO.println s!"end {i + 1}"
IO.sleep 2000
condvar.notifyAll
```
It causes some weird behavior: some tasks start running and get
notified, while others don’t. This happens because `condvar.wait` blocks
the entire `Task`, which right now, afaik, blocks an entire thread and
cannot be paused while doing blocking operations like that.
`Notify` uses `Promise`s so it’s better suited for concurrency. The
`Task` is not blocked while waiting for a notification which makes it
simpler for use cases that just involve notifying:
```lean
def notify : Async Unit := do
let notify ← Std.Notify.new
for i in [0:threads] do
background do
IO.println s!"start {i}"
notify.wait
IO.println s!"end {i}"
IO.sleep 2000
notify.notify
```
This PR depends on: #10366, #10367 and #10370.
108 lines
2.7 KiB
Text
108 lines
2.7 KiB
Text
import Std.Internal.Async
|
|
import Std.Sync
|
|
|
|
open Std.Internal.IO Async
|
|
|
|
-- Basic scenario: a single waiter stays pending until one `notifyOne` call completes it.
def testBasicWaitNotifyOne : Async Unit := do
  let notifier ← Std.Notify.new
  let waiter ← notifier.wait

  -- Nothing has been signalled yet, so the waiter must still be pending.
  let stateBefore ← waiter.getState
  assert! stateBefore = .waiting

  -- Signal once (the result is ignored) and wait for the waiter to complete.
  let _ ← notifier.notifyOne
  await waiter

  let stateAfter ← waiter.getState
  assert! stateAfter = .finished

#eval testBasicWaitNotifyOne.block
|
|
|
|
-- With three pending waiters, a single `notifyOne` must complete exactly one of them
-- and leave the other two waiting.
def testMultipleWaitersNotifyOne : Async Unit := do
  let notifier ← Std.Notify.new
  let first ← notifier.wait
  let second ← notifier.wait
  let third ← notifier.wait
  let waiters := [first, second, third]

  -- Before any notification every waiter is still pending.
  for w in waiters do
    assert! (← w.getState) = .waiting

  let _ ← notifier.notifyOne

  -- Pause for 100 ms before sampling the states
  -- (presumably to let the notification propagate — the test does not await any task here).
  IO.sleep 100

  let observed ← waiters.mapM (fun w => w.getState)
  let finishedCount := (observed.filter (· == .finished)).length
  let waitingCount := (observed.filter (· == .waiting)).length

  assert! finishedCount == 1
  assert! waitingCount == 2

  -- One more notification is sent at the end; its effect is not asserted.
  discard notifier.notifyOne

#eval testMultipleWaitersNotifyOne.block
|
|
|
|
-- With three pending waiters, a single `notify` call must complete all of them.
def testMultipleWaitersNotifyAll : Async Unit := do
  let notifier ← Std.Notify.new
  let first ← notifier.wait
  let second ← notifier.wait
  let third ← notifier.wait
  let waiters := [first, second, third]

  -- Before the notification every waiter is still pending.
  for w in waiters do
    assert! (← w.getState) = .waiting

  let _ ← notifier.notify

  -- Await the waiters in the order they were created.
  for w in waiters do
    await w

  -- After awaiting, each waiter must report completion.
  for w in waiters do
    assert! (← w.getState) = .finished

#eval testMultipleWaitersNotifyAll.block
|
|
|
|
-- Successive `notifyOne` calls must complete the waiters one at a time,
-- in the order in which they started waiting.
def testSequentialNotification : Async Unit := do
  let notifier ← Std.Notify.new
  let first ← notifier.wait
  let second ← notifier.wait
  let third ← notifier.wait

  -- Round 1: only the first waiter completes.
  let _ ← notifier.notifyOne
  await first
  assert! (← first.getState) = .finished
  assert! (← second.getState) = .waiting
  assert! (← third.getState) = .waiting

  -- Round 2: the second waiter completes, the third is still pending.
  let _ ← notifier.notifyOne
  await second
  assert! (← second.getState) = .finished
  assert! (← third.getState) = .waiting

  -- Round 3: the last waiter completes.
  let _ ← notifier.notifyOne
  await third
  assert! (← third.getState) = .finished

#eval testSequentialNotification.block
|
|
|
|
-- The same `Notify` stays usable after an earlier waiter has completed:
-- a `notifyOne` round with one waiter, then a `notify` round with two fresh waiters.
def testReuseAfterCompletion : Async Unit := do
  let notifier ← Std.Notify.new

  -- First use: a single waiter released by `notifyOne`.
  let first ← notifier.wait
  let _ ← notifier.notifyOne
  await first
  assert! (← first.getState) = .finished

  -- Second use on the same `Notify`: two new waiters released together by `notify`.
  let second ← notifier.wait
  let third ← notifier.wait
  let laterWaiters := [second, third]
  let _ ← notifier.notify
  for w in laterWaiters do
    await w

  for w in laterWaiters do
    assert! (← w.getState) = .finished

#eval testReuseAfterCompletion.block
|