feat: add AsyncStream, AsyncWrite and AsyncRead type classes (#10370)
This PR adds async type classes for streams.
This commit is contained in:
parent
8443600762
commit
94e5b66dfe
3 changed files with 81 additions and 2 deletions
|
|
@ -15,5 +15,6 @@ public import Std.Internal.Async.Select
|
|||
public import Std.Internal.Async.Process
|
||||
public import Std.Internal.Async.System
|
||||
public import Std.Internal.Async.Signal
|
||||
public import Std.Internal.Async.IO
|
||||
|
||||
public section
|
||||
|
|
|
|||
54
src/Std/Internal/Async/IO.lean
Normal file
54
src/Std/Internal/Async/IO.lean
Normal file
|
|
@ -0,0 +1,54 @@
|
|||
/-
|
||||
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
|
||||
Released under Apache 2.0 license as described in the file LICENSE.
|
||||
Authors: Sofia Rodrigues
|
||||
-/
|
||||
module
|
||||
|
||||
prelude
|
||||
public import Std.Internal.Async.Select
|
||||
|
||||
public section
|
||||
|
||||
namespace Std
|
||||
namespace Internal
|
||||
namespace Async
|
||||
namespace IO
|
||||
|
||||
open Std.Internal.IO.Async
|
||||
|
||||
/-!
|
||||
This module provides buffered asynchronous I/O operations for efficient reading and writing.
|
||||
-/
|
||||
|
||||
/--
|
||||
Interface for asynchronous reading operations.
|
||||
-/
|
||||
class AsyncRead (α : Type) (β : Type) where
|
||||
read : α → Async β
|
||||
|
||||
/--
|
||||
Interface for asynchronous writing operations.
|
||||
-/
|
||||
class AsyncWrite (α : Type) (β : Type) where
|
||||
write : α → β → Async Unit
|
||||
|
||||
writeAll : α → Array β → Async Unit :=
|
||||
fun socket data => data.forM (write socket)
|
||||
|
||||
flush : α → Async Unit :=
|
||||
fun _ => pure ()
|
||||
|
||||
/--
|
||||
Interface for asynchronous streaming with selector-based iteration.
|
||||
-/
|
||||
class AsyncStream (α : Type) (β : outParam Type) where
|
||||
next : α → Selector β
|
||||
|
||||
stop : α → IO Unit :=
|
||||
fun _ => pure ()
|
||||
|
||||
end IO
|
||||
end Async
|
||||
end Internal
|
||||
end Std
|
||||
|
|
@ -10,9 +10,13 @@ public import Init.System.Promise
|
|||
public import Init.Data.Queue
|
||||
public import Std.Sync.Mutex
|
||||
public import Std.Internal.Async.Select
|
||||
public import Std.Internal.Async.IO
|
||||
|
||||
public section
|
||||
|
||||
open Std.Internal.Async.IO
|
||||
open Std.Internal.IO.Async
|
||||
|
||||
/-!
|
||||
This module contains the implementation of `Std.Channel`. `Std.Channel` is a multi-producer
|
||||
multi-consumer FIFO channel that offers both bounded and unbounded buffering as well as synchronous
|
||||
|
|
@ -24,7 +28,6 @@ for cleaner code.
|
|||
-/
|
||||
|
||||
namespace Std
|
||||
|
||||
namespace CloseableChannel
|
||||
|
||||
/--
|
||||
|
|
@ -753,6 +756,17 @@ partial def forAsync (f : α → BaseIO Unit) (ch : CloseableChannel α)
|
|||
| none => return .pure ()
|
||||
| some v => do f v; ch.forAsync f prio
|
||||
|
||||
instance [Inhabited α] : AsyncStream (CloseableChannel α) (Option α) where
|
||||
next channel := channel.recvSelector
|
||||
|
||||
instance [Inhabited α] : AsyncRead (CloseableChannel α) (Option α) where
|
||||
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
|
||||
|
||||
instance [Inhabited α] : AsyncWrite (CloseableChannel α) α where
|
||||
write receiver x := do
|
||||
let task ← receiver.send x
|
||||
Async.ofAsyncTask <| task.map (Except.mapError (IO.userError ∘ toString))
|
||||
|
||||
/--
|
||||
This function is a no-op and just a convenient way to expose the synchronous API of the channel.
|
||||
-/
|
||||
|
|
@ -804,7 +818,6 @@ instance [MonadLiftT BaseIO m] : ForIn m (Sync α) α where
|
|||
forIn ch b f := private ch.forIn f b
|
||||
|
||||
end Sync
|
||||
|
||||
end CloseableChannel
|
||||
|
||||
/--
|
||||
|
|
@ -893,6 +906,17 @@ partial def forAsync [Inhabited α] (f : α → BaseIO Unit) (ch : Channel α)
|
|||
(prio : Task.Priority := .default) : BaseIO (Task Unit) := do
|
||||
BaseIO.bindTask (prio := prio) (← ch.recv) fun v => do f v; ch.forAsync f prio
|
||||
|
||||
instance [Inhabited α] : AsyncStream (Channel α) α where
|
||||
next channel := channel.recvSelector
|
||||
|
||||
instance [Inhabited α] : AsyncRead (Channel α) α where
|
||||
read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv
|
||||
|
||||
instance [Inhabited α] : AsyncWrite (Channel α) α where
|
||||
write receiver x := do
|
||||
let task ← receiver.send x
|
||||
Async.ofTask task
|
||||
|
||||
@[inherit_doc CloseableChannel.sync, inline]
|
||||
def sync (ch : Channel α) : Channel.Sync α := ch
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue