From 94e5b66dfe519994561016803cb021f06ef9aa10 Mon Sep 17 00:00:00 2001 From: Sofia Rodrigues Date: Tue, 23 Sep 2025 20:30:33 -0300 Subject: [PATCH] feat: add `AsyncStream`, `AsyncWrite` and `AsyncRead` type classes (#10370) This PR adds async type classes for streams. --- src/Std/Internal/Async.lean | 1 + src/Std/Internal/Async/IO.lean | 54 ++++++++++++++++++++++++++++++++++ src/Std/Sync/Channel.lean | 28 ++++++++++++++++-- 3 files changed, 81 insertions(+), 2 deletions(-) create mode 100644 src/Std/Internal/Async/IO.lean diff --git a/src/Std/Internal/Async.lean b/src/Std/Internal/Async.lean index 05627b599f..d17797a266 100644 --- a/src/Std/Internal/Async.lean +++ b/src/Std/Internal/Async.lean @@ -15,5 +15,6 @@ public import Std.Internal.Async.Select public import Std.Internal.Async.Process public import Std.Internal.Async.System public import Std.Internal.Async.Signal +public import Std.Internal.Async.IO public section diff --git a/src/Std/Internal/Async/IO.lean b/src/Std/Internal/Async/IO.lean new file mode 100644 index 0000000000..cb3a80227a --- /dev/null +++ b/src/Std/Internal/Async/IO.lean @@ -0,0 +1,54 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +module + +prelude +public import Std.Internal.Async.Select + +public section + +namespace Std +namespace Internal +namespace Async +namespace IO + +open Std.Internal.IO.Async + +/-! +This module provides buffered asynchronous I/O operations for efficient reading and writing. +-/ + +/-- +Interface for asynchronous reading operations. +-/ +class AsyncRead (α : Type) (β : Type) where + read : α → Async β + +/-- +Interface for asynchronous writing operations. +-/ +class AsyncWrite (α : Type) (β : Type) where + write : α → β → Async Unit + + writeAll : α → Array β → Async Unit := + fun socket data => data.forM (write socket) + + flush : α → Async Unit := + fun _ => pure () + +/-- +Interface for asynchronous streaming with selector-based iteration. +-/ +class AsyncStream (α : Type) (β : outParam Type) where + next : α → Selector β + + stop : α → IO Unit := + fun _ => pure () + +end IO +end Async +end Internal +end Std diff --git a/src/Std/Sync/Channel.lean b/src/Std/Sync/Channel.lean index 68b14ce15c..35cac026ad 100644 --- a/src/Std/Sync/Channel.lean +++ b/src/Std/Sync/Channel.lean @@ -10,9 +10,13 @@ public import Init.System.Promise public import Init.Data.Queue public import Std.Sync.Mutex public import Std.Internal.Async.Select +public import Std.Internal.Async.IO public section +open Std.Internal.Async.IO +open Std.Internal.IO.Async + /-! This module contains the implementation of `Std.Channel`. `Std.Channel` is a multi-producer multi-consumer FIFO channel that offers both bounded and unbounded buffering as well as synchronous @@ -24,7 +28,6 @@ for cleaner code. -/ namespace Std - namespace CloseableChannel /-- @@ -753,6 +756,17 @@ partial def forAsync (f : α → BaseIO Unit) (ch : CloseableChannel α) | none => return .pure () | some v => do f v; ch.forAsync f prio +instance [Inhabited α] : AsyncStream (CloseableChannel α) (Option α) where + next channel := channel.recvSelector + +instance [Inhabited α] : AsyncRead (CloseableChannel α) (Option α) where + read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv + +instance [Inhabited α] : AsyncWrite (CloseableChannel α) α where + write receiver x := do + let task ← receiver.send x + Async.ofAsyncTask <| task.map (Except.mapError (IO.userError ∘ toString)) + /-- This function is a no-op and just a convenient way to expose the synchronous API of the channel. -/ @@ -804,7 +818,6 @@ instance [MonadLiftT BaseIO m] : ForIn m (Sync α) α where forIn ch b f := private ch.forIn f b end Sync - end CloseableChannel /-- @@ -893,6 +906,17 @@ partial def forAsync [Inhabited α] (f : α → BaseIO Unit) (ch : Channel α) (prio : Task.Priority := .default) : BaseIO (Task Unit) := do BaseIO.bindTask (prio := prio) (← ch.recv) fun v => do f v; ch.forAsync f prio +instance [Inhabited α] : AsyncStream (Channel α) α where + next channel := channel.recvSelector + +instance [Inhabited α] : AsyncRead (Channel α) α where + read receiver := Internal.IO.Async.Async.ofIOTask receiver.recv + +instance [Inhabited α] : AsyncWrite (Channel α) α where + write receiver x := do + let task ← receiver.send x + Async.ofTask task + @[inherit_doc CloseableChannel.sync, inline] def sync (ch : Channel α) : Channel.Sync α := ch