This PR speeds up some benchmarks when run as tests by lowering their workload. It also stops testing some of the more expensive benchmarks that can't be easily made smaller.
116 lines
3.7 KiB
Text
116 lines
3.7 KiB
Text
import Std.Sync.Channel
|
|
|
|
|
|
/-
|
|
Inspired by:
|
|
https://github.com/crossbeam-rs/crossbeam/tree/bd87a61ce3858ca772c42525d5f0c9aa12cc80ac/crossbeam-channel/benchmarks.
|
|
|
|
We conduct for:
|
|
- capacity 0 channels
|
|
- capacity 1 channels
|
|
- capacity `N` channels
|
|
- unbounded channels
|
|
|
|
the following tests:
|
|
- `seq`: A single thread sends `N` messages. Then it receives `N` messages.
|
|
- `spsc`: One thread sends `N` messages. Another thread receives `N` messages.
|
|
- `mpsc`: `T` threads send `N / T` messages each. One thread receives `N` messages.
|
|
- `mpmc`: `T` threads send `N / T` messages each. `T` other threads receive `N / T` messages each.
|
|
|
|
Note that we will stick exclusively to the sync interface for this as there is no benefit to be
|
|
reaped from async in this benchmark so we might as well just block.
|
|
-/
|
|
|
|
def seq (ch : Std.CloseableChannel.Sync Nat) (amount : Nat) : IO Unit := do
|
|
for i in *...amount do
|
|
ch.send i
|
|
|
|
for _ in *...amount do
|
|
discard <| ch.recv
|
|
|
|
def spsc (ch : Std.CloseableChannel.Sync Nat) (amount : Nat) : IO Unit := do
|
|
let t1 ← IO.asTask (prio := .dedicated) do
|
|
for i in *...amount do
|
|
ch.send i
|
|
|
|
let t2 ← BaseIO.asTask (prio := .dedicated) do
|
|
for _ in *...amount do
|
|
discard <| ch.recv
|
|
|
|
IO.ofExcept (← IO.wait t1)
|
|
IO.wait t2
|
|
|
|
def mpsc (threads : Nat) (ch : Std.CloseableChannel.Sync Nat) (amount : Nat) : IO Unit := do
|
|
let mut producers := Array.emptyWithCapacity threads
|
|
for _ in *...threads do
|
|
let t ← IO.asTask (prio := .dedicated) do
|
|
for i in *...(amount/threads) do
|
|
ch.send i
|
|
producers := producers.push t
|
|
|
|
let consumer ← BaseIO.asTask (prio := .dedicated) do
|
|
for _ in *...amount do
|
|
discard <| ch.recv
|
|
|
|
IO.wait consumer
|
|
for producer in producers do
|
|
(IO.ofExcept (← IO.wait producer))
|
|
|
|
def mpmc (threads : Nat) (ch : Std.CloseableChannel.Sync Nat) (amount : Nat) : IO Unit := do
|
|
let mut producers := Array.emptyWithCapacity threads
|
|
for _ in *...threads do
|
|
let t ← IO.asTask (prio := .dedicated) do
|
|
for i in *...(amount/threads) do
|
|
ch.send i
|
|
producers := producers.push t
|
|
|
|
let mut consumers := Array.emptyWithCapacity threads
|
|
for _ in *...threads do
|
|
let t ← IO.asTask (prio := .dedicated) do
|
|
while true do
|
|
if let some _ ← ch.recv then
|
|
continue
|
|
else
|
|
break
|
|
consumers := consumers.push t
|
|
|
|
for producer in producers do
|
|
(IO.ofExcept (← IO.wait producer))
|
|
|
|
ch.close
|
|
|
|
for consumer in consumers do
|
|
(IO.ofExcept (← IO.wait consumer))
|
|
|
|
return ()
|
|
|
|
def run (name : String) (cap : Option Nat) (messages : Nat)
|
|
(bench : Std.CloseableChannel.Sync Nat → Nat → IO Unit) : IO Unit := do
|
|
let ch ← Std.CloseableChannel.new cap
|
|
let t1 ← IO.monoMsNow
|
|
bench ch.sync messages
|
|
let t2 ← IO.monoMsNow
|
|
let time : Float := (t2 - t1).toFloat / 1000.0
|
|
IO.println s!"measurement: {name} {time} s"
|
|
|
|
|
|
def main (args : List String) : IO Unit := do
|
|
let threads := args[0]!.toNat!
|
|
let messages := args[1]!.toNat!
|
|
run "bounded0_spsc" (some 0) messages spsc
|
|
run "bounded0_mpsc" (some 0) messages (mpsc threads)
|
|
run "bounded0_mpmc" (some 0) messages (mpmc threads)
|
|
|
|
run "bounded1_spsc" (some 1) messages spsc
|
|
run "bounded1_mpsc" (some 1) messages (mpsc threads)
|
|
run "bounded1_mpmc" (some 1) messages (mpmc threads)
|
|
|
|
run "boundedn_spsc" (some messages) messages spsc
|
|
run "boundedn_mpsc" (some messages) messages (mpsc threads)
|
|
run "boundedn_mpmc" (some messages) messages (mpmc threads)
|
|
run "boundedn_seq" (some messages) messages seq
|
|
|
|
run "unbounded_spsc" none messages spsc
|
|
run "unbounded_mpsc" none messages (mpsc threads)
|
|
run "unbounded_mpmc" none messages (mpmc threads)
|
|
run "unbounded_seq" none messages seq
|