From d6e69649b6febc4834ae9f32da734a9fb43c34f2 Mon Sep 17 00:00:00 2001 From: Mac Malone Date: Sat, 28 Mar 2026 00:46:43 -0400 Subject: [PATCH] refactor: lake: fetch artifact URLs in a single Reservoir request (#13164) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR changes `lake cache get` to fetch artifact cloud storage URLs from Reservoir in a single bulk POST request rather than relying on per-artifact HTTP redirects. When downloading many artifacts, the redirect-based approach sends one request per artifact to the Reservoir web host (Netlify), which can be slow and risks hitting rate limits. The bulk endpoint returns all URLs at once, so curl only talks to the CDN after that. Non-Reservoir cache services are unaffected and continue using direct URLs as before. 🤖 Prepared with Claude Code --- src/lake/Lake/Config/Cache.lean | 85 ++++++++++++++++++++++++++++--- src/lake/Lake/Reservoir.lean | 18 ------- src/lake/Lake/Util/Reservoir.lean | 25 ++++++++- 3 files changed, 100 insertions(+), 28 deletions(-) diff --git a/src/lake/Lake/Config/Cache.lean b/src/lake/Lake/Config/Cache.lean index 7d34182ee1..371fadaa7d 100644 --- a/src/lake/Lake/Config/Cache.lean +++ b/src/lake/Lake/Config/Cache.lean @@ -765,12 +765,13 @@ where \n remote URL: {info.url}" match cfg.kind with | .get => - if let .ok size := out.getAs Nat "size_download" then - if size > 0 then - if let .ok contentType := out.getAs String "content_type" then - if contentType != artifactContentType then - if let .ok resp ← IO.FS.readFile info.path |>.toBaseIO then - msg := s!"{msg}\nunexpected response:\n{resp}" + unless code? matches .ok 404 do -- ignore response bodies on 404s + if let .ok size := out.getAs Nat "size_download" then + if size > 0 then + if let .ok contentType := out.getAs String "content_type" then + if contentType != artifactContentType then + if let .ok resp ← IO.FS.readFile info.path |>.toBaseIO then + msg := s!"{msg}\nunexpected response:\n{resp}" removeFileIfExists info.path | .put => if let .ok size := out.getAs Nat "size_download" then @@ -787,7 +788,7 @@ private def transferArtifacts match cfg.kind with | .get => cfg.infos.forM fun info => do - h.putStrLn s!"url = {info.url}" + h.putStrLn s!"url = {info.url.quote}" h.putStrLn s!"-o {info.path.toString.quote}" h.flush return #[ @@ -798,7 +799,7 @@ private def transferArtifacts | .put => cfg.infos.forM fun info => do h.putStrLn s!"-T {info.path.toString.quote}" - h.putStrLn s!"url = {info.url}" + h.putStrLn s!"url = {info.url.quote}" h.flush return #[ "-Z", "-X", "PUT", "-L", @@ -827,6 +828,13 @@ private def transferArtifacts if s.didError then failure +private def reservoirArtifactsUrl (service : CacheService) (scope : CacheServiceScope) : String := + let endpoint := + match scope.impl with + | .repo scope => appendScope s!"{service.impl.apiEndpoint}/repositories" scope + | .str scope => appendScope s!"{service.impl.apiEndpoint}/packages" scope + s!"{endpoint}/artifacts" + public def downloadArtifacts (descrs : Array ArtifactDescr) (cache : Cache) (service : CacheService) (scope : CacheServiceScope) (force := false) @@ -844,8 +852,69 @@ public def downloadArtifacts return s.push {url, path, descr} if infos.isEmpty then return + let infos ← id do + if service.isReservoir then + -- Artifact cloud storage URLs are fetched in a single request + -- to avoid hammering the Reservoir web host + fetchUrls (service.reservoirArtifactsUrl scope) infos + else return infos IO.FS.createDirAll cache.artifactDir transferArtifacts {scope, infos, kind := .get} +where + fetchUrls url infos := IO.FS.withTempFile fun h path => do + let body := Json.arr <| infos.map (toJson ·.descr.hash) + h.putStr body.compress + h.flush + let args := #[ + "-X", "POST", "-L", "-d", s!"@{path}", + "--retry", "3", -- intermittent network errors can occur + "-s", "-w", "%{stderr}%{json}\n", + "-H", "Content-Type: application/json", + ] + let args := Reservoir.lakeHeaders.foldl (· ++ #["-H", ·]) args + let child ← IO.Process.spawn { + cmd := "curl", args := args.push url + stdout := .piped, stderr := .piped + } + let rc ← child.wait + let stdout ← child.stdout.readToEnd + let stderr ← child.stderr.readToEnd + match Json.parse stdout >>= fromJson? with + | .ok (resp : ReservoirResp (Array String)) => + match resp with + | .data urls => + if h : infos.size = urls.size then + let s := infos.size.fold (init := infos.toVector) fun i hi s => + s.set i {s[i] with url := urls[i]'(h ▸ hi)} + return s.toArray + else + error s!"failed to fetch artifact URLs\ + \n POST {url}\ + \nIncorrect number of results: expected {infos.size}, got {urls.size}" + | .error status message => + error s!"failed to fetch artifact URLs (status code: {status})\ + \n POST {url}\ + \nReservoir error: {message}" + | .error _ => + match Json.parse stderr >>= fromJson? with + | .ok (out : JsonObject) => + let mut msg := "failed to fetch artifact URLs" + if let .ok code := out.getAs Nat "http_code" then + msg := s!"{msg} (status code: {code})" + msg := s!"{msg}\n POST {url}" + if let .ok errMsg := out.getAs String "errormsg" then + msg := s!"{msg}\n Transfer error: {errMsg}" + unless stdout.isEmpty do + msg := s!"{msg}\nstdout:\n{stdout.trimAsciiEnd}" + logError msg + logVerbose s!"curl JSON:\n{stderr.trimAsciiEnd}" + | .error e => + logError s!"failed to fetch artifact URLs\ + \n POST {url} + \nInvalid curl JSON: {e}; received: {stderr.trimAscii}" + unless stdout.isEmpty do + logWarning s!"curl produced unexpected output:\n{stdout.trimAsciiEnd}" + error s!"curl exited with code {rc}" @[deprecated "Deprecated without replacement." (since := "2026-02-27")] public def downloadOutputArtifacts diff --git a/src/lake/Lake/Reservoir.lean b/src/lake/Lake/Reservoir.lean index 2973a6505e..20789b581b 100644 --- a/src/lake/Lake/Reservoir.lean +++ b/src/lake/Lake/Reservoir.lean @@ -103,24 +103,6 @@ public instance : FromJson RegistryPkg := ⟨RegistryPkg.fromJson?⟩ end RegistryPkg -/-- A Reservoir API response object. -/ -public inductive ReservoirResp (α : Type u) -| data (a : α) -| error (status : Nat) (message : String) - -public protected def ReservoirResp.fromJson? [FromJson α] (val : Json) : Except String (ReservoirResp α) := do - let obj ← JsonObject.fromJson? val - if let some (err : JsonObject) ← obj.get? "error" then - let status ← err.get "status" - let message ← err.get "message" - return .error status message - else if let some (val : Json) ← obj.get? "data" then - .data <$> fromJson? val - else - .data <$> fromJson? val - -public instance [FromJson α] : FromJson (ReservoirResp α) := ⟨ReservoirResp.fromJson?⟩ - public def Reservoir.pkgApiUrl (lakeEnv : Lake.Env) (owner pkg : String) := s!"{lakeEnv.reservoirApiUrl}/packages/{uriEncode owner}/{uriEncode pkg}" diff --git a/src/lake/Lake/Util/Reservoir.lean b/src/lake/Lake/Util/Reservoir.lean index 97c728ac7c..ab19b5e5c7 100644 --- a/src/lake/Lake/Util/Reservoir.lean +++ b/src/lake/Lake/Util/Reservoir.lean @@ -6,8 +6,9 @@ Authors: Mac Malone module prelude -public import Init.Prelude -import Init.Data.Array.Basic +public import Lake.Util.JsonObject + +open Lean namespace Lake @@ -15,3 +16,23 @@ public def Reservoir.lakeHeaders : Array String := #[ "X-Reservoir-Api-Version:1.0.0", "X-Lake-Registry-Api-Version:0.1.0" ] + +/-- A Reservoir API response object. -/ +public inductive ReservoirResp (α : Type u) +| data (a : α) +| error (status : Nat) (message : String) + +public protected def ReservoirResp.fromJson? [FromJson α] (val : Json) : Except String (ReservoirResp α) := do + if let .ok obj := JsonObject.fromJson? val then + if let some (err : JsonObject) ← obj.get? "error" then + let status ← err.get "status" + let message ← err.get "message" + return .error status message + else if let some (val : Json) ← obj.get? "data" then + .data <$> fromJson? val + else + .data <$> fromJson? val + else + .data <$> fromJson? val + +public instance [FromJson α] : FromJson (ReservoirResp α) := ⟨ReservoirResp.fromJson?⟩