refactor: lake: fetch artifact URLs in a single Reservoir request (#13164)
This PR changes `lake cache get` to fetch artifact cloud storage URLs
from Reservoir in a single bulk POST request rather than relying on
per-artifact HTTP redirects. When downloading many artifacts, the
redirect-based approach sends one request per artifact to the Reservoir
web host (Netlify), which can be slow and risks hitting rate limits. The
bulk endpoint returns all URLs at once, so curl only talks to the CDN
after that.
Non-Reservoir cache services are unaffected and continue using direct
URLs as before.
🤖 Prepared with Claude Code
This commit is contained in:
parent
337f1c455b
commit
d6e69649b6
3 changed files with 100 additions and 28 deletions
|
|
@ -765,12 +765,13 @@ where
|
|||
\n remote URL: {info.url}"
|
||||
match cfg.kind with
|
||||
| .get =>
|
||||
if let .ok size := out.getAs Nat "size_download" then
|
||||
if size > 0 then
|
||||
if let .ok contentType := out.getAs String "content_type" then
|
||||
if contentType != artifactContentType then
|
||||
if let .ok resp ← IO.FS.readFile info.path |>.toBaseIO then
|
||||
msg := s!"{msg}\nunexpected response:\n{resp}"
|
||||
unless code? matches .ok 404 do -- ignore response bodies on 404s
|
||||
if let .ok size := out.getAs Nat "size_download" then
|
||||
if size > 0 then
|
||||
if let .ok contentType := out.getAs String "content_type" then
|
||||
if contentType != artifactContentType then
|
||||
if let .ok resp ← IO.FS.readFile info.path |>.toBaseIO then
|
||||
msg := s!"{msg}\nunexpected response:\n{resp}"
|
||||
removeFileIfExists info.path
|
||||
| .put =>
|
||||
if let .ok size := out.getAs Nat "size_download" then
|
||||
|
|
@ -787,7 +788,7 @@ private def transferArtifacts
|
|||
match cfg.kind with
|
||||
| .get =>
|
||||
cfg.infos.forM fun info => do
|
||||
h.putStrLn s!"url = {info.url}"
|
||||
h.putStrLn s!"url = {info.url.quote}"
|
||||
h.putStrLn s!"-o {info.path.toString.quote}"
|
||||
h.flush
|
||||
return #[
|
||||
|
|
@ -798,7 +799,7 @@ private def transferArtifacts
|
|||
| .put =>
|
||||
cfg.infos.forM fun info => do
|
||||
h.putStrLn s!"-T {info.path.toString.quote}"
|
||||
h.putStrLn s!"url = {info.url}"
|
||||
h.putStrLn s!"url = {info.url.quote}"
|
||||
h.flush
|
||||
return #[
|
||||
"-Z", "-X", "PUT", "-L",
|
||||
|
|
@ -827,6 +828,13 @@ private def transferArtifacts
|
|||
if s.didError then
|
||||
failure
|
||||
|
||||
/--
The Reservoir API URL used to look up artifact URLs for `scope` in bulk.

Repository scopes are placed under `/repositories` and string scopes under
`/packages`; the result always ends in `/artifacts`.
-/
private def reservoirArtifactsUrl (service : CacheService) (scope : CacheServiceScope) : String :=
  let apiEndpoint := service.impl.apiEndpoint
  let scoped :=
    match scope.impl with
    | .repo repo => appendScope s!"{apiEndpoint}/repositories" repo
    | .str pkg => appendScope s!"{apiEndpoint}/packages" pkg
  s!"{scoped}/artifacts"
|
||||
|
||||
public def downloadArtifacts
|
||||
(descrs : Array ArtifactDescr) (cache : Cache)
|
||||
(service : CacheService) (scope : CacheServiceScope) (force := false)
|
||||
|
|
@ -844,8 +852,69 @@ public def downloadArtifacts
|
|||
return s.push {url, path, descr}
|
||||
if infos.isEmpty then
|
||||
return
|
||||
let infos ← id do
|
||||
if service.isReservoir then
|
||||
-- Artifact cloud storage URLs are fetched in a single request
|
||||
-- to avoid hammering the Reservoir web host
|
||||
fetchUrls (service.reservoirArtifactsUrl scope) infos
|
||||
else return infos
|
||||
IO.FS.createDirAll cache.artifactDir
|
||||
transferArtifacts {scope, infos, kind := .get}
|
||||
where
|
||||
/--
Fetches the download URL of every artifact in `infos` with a single `POST` to `url`.

The request body is the JSON array of the artifacts' hashes, in `infos` order.
On success, returns `infos` with each entry's `url` replaced by the URL found at
the same index of the response. Fails if the response is a Reservoir error,
if it has a different number of entries than `infos`, or if it cannot be parsed
(in which case curl's own JSON report on stderr is used for diagnostics).
-/
fetchUrls url infos := IO.FS.withTempFile fun h path => do
  -- The request body is handed to curl through the temporary file (`-d @file`).
  let body := Json.arr <| infos.map (toJson ·.descr.hash)
  h.putStr body.compress
  h.flush
  let args := #[
    "-X", "POST", "-L", "-d", s!"@{path}",
    "--retry", "3", -- intermittent network errors can occur
    "-s", "-w", "%{stderr}%{json}\n",
    "-H", "Content-Type: application/json",
  ]
  let args := Reservoir.lakeHeaders.foldl (· ++ #["-H", ·]) args
  -- Both pipes must be drained before waiting on curl: waiting first deadlocks
  -- as soon as the response outgrows the OS pipe buffer, which is likely when
  -- URLs for many artifacts come back at once. `IO.Process.output` reads
  -- stdout and stderr to completion and only then waits for the exit code.
  let proc ← IO.Process.output {cmd := "curl", args := args.push url}
  let rc := proc.exitCode
  let stdout := proc.stdout
  let stderr := proc.stderr
  match Json.parse stdout >>= fromJson? with
  | .ok (resp : ReservoirResp (Array String)) =>
    match resp with
    | .data urls =>
      if hsize : infos.size = urls.size then
        -- Pair each artifact with the URL at the same index of the response.
        let s := infos.size.fold (init := infos.toVector) fun i hi s =>
          s.set i {s[i] with url := urls[i]'(hsize ▸ hi)}
        return s.toArray
      else
        error s!"failed to fetch artifact URLs\
          \n POST {url}\
          \nIncorrect number of results: expected {infos.size}, got {urls.size}"
    | .error status message =>
      error s!"failed to fetch artifact URLs (status code: {status})\
        \n POST {url}\
        \nReservoir error: {message}"
  | .error _ =>
    -- stdout was not a Reservoir response; fall back to curl's `%{json}` report on stderr.
    match Json.parse stderr >>= fromJson? with
    | .ok (out : JsonObject) =>
      let mut msg := "failed to fetch artifact URLs"
      if let .ok code := out.getAs Nat "http_code" then
        msg := s!"{msg} (status code: {code})"
      msg := s!"{msg}\n POST {url}"
      if let .ok errMsg := out.getAs String "errormsg" then
        msg := s!"{msg}\n Transfer error: {errMsg}"
      unless stdout.isEmpty do
        msg := s!"{msg}\nstdout:\n{stdout.trimAsciiEnd}"
      logError msg
      logVerbose s!"curl JSON:\n{stderr.trimAsciiEnd}"
    | .error e =>
      -- Each line of the literal ends in a string gap (`\`) so that no raw
      -- newline or source indentation ends up in the message.
      logError s!"failed to fetch artifact URLs\
        \n POST {url}\
        \nInvalid curl JSON: {e}; received: {stderr.trimAscii}"
      unless stdout.isEmpty do
        logWarning s!"curl produced unexpected output:\n{stdout.trimAsciiEnd}"
    error s!"curl exited with code {rc}"
|
||||
|
||||
@[deprecated "Deprecated without replacement." (since := "2026-02-27")]
|
||||
public def downloadOutputArtifacts
|
||||
|
|
|
|||
|
|
@ -103,24 +103,6 @@ public instance : FromJson RegistryPkg := ⟨RegistryPkg.fromJson?⟩
|
|||
|
||||
end RegistryPkg
|
||||
|
||||
/-- A Reservoir API response object. -/
public inductive ReservoirResp (α : Type u)
  /-- A response carrying the payload `a`. -/
  | data (a : α)
  /-- An error response, with the `status` and `message` reported by Reservoir. -/
  | error (status : Nat) (message : String)

/--
Decodes a Reservoir response from a JSON object.

An `error` field (with `status` and `message`) produces `ReservoirResp.error`.
Otherwise the payload is decoded from the `data` field if there is one,
and from the whole object if there is not.
-/
public protected def ReservoirResp.fromJson? [FromJson α] (val : Json) : Except String (ReservoirResp α) := do
  let obj ← JsonObject.fromJson? val
  let err? : Option JsonObject ← obj.get? "error"
  match err? with
  | some err =>
    let status : Nat ← err.get "status"
    let message : String ← err.get "message"
    return .error status message
  | none =>
    let data? : Option Json ← obj.get? "data"
    .data <$> fromJson? (data?.getD val)

public instance [FromJson α] : FromJson (ReservoirResp α) where
  fromJson? := ReservoirResp.fromJson?
|
||||
|
||||
/-- The Reservoir API URL of the package `pkg` owned by `owner` (both URI-encoded). -/
public def Reservoir.pkgApiUrl (lakeEnv : Lake.Env) (owner pkg : String) :=
  let ownerSegment := uriEncode owner
  let pkgSegment := uriEncode pkg
  s!"{lakeEnv.reservoirApiUrl}/packages/{ownerSegment}/{pkgSegment}"
|
||||
|
||||
|
|
|
|||
|
|
@ -6,8 +6,9 @@ Authors: Mac Malone
|
|||
module
|
||||
|
||||
prelude
|
||||
public import Init.Prelude
|
||||
import Init.Data.Array.Basic
|
||||
public import Lake.Util.JsonObject
|
||||
|
||||
open Lean
|
||||
|
||||
namespace Lake
|
||||
|
||||
|
|
@ -15,3 +16,23 @@ public def Reservoir.lakeHeaders : Array String := #[
|
|||
"X-Reservoir-Api-Version:1.0.0",
|
||||
"X-Lake-Registry-Api-Version:0.1.0"
|
||||
]
|
||||
|
||||
/-- A Reservoir API response object. -/
public inductive ReservoirResp (α : Type u)
  /-- A response carrying the payload `a`. -/
  | data (a : α)
  /-- An error response, with the `status` and `message` reported by Reservoir. -/
  | error (status : Nat) (message : String)

/--
Decodes a Reservoir response from JSON.

* A value that is not a JSON object is decoded directly as the payload.
* An object with an `error` field (holding `status` and `message`)
  produces `ReservoirResp.error`.
* Any other object is decoded as the payload from its `data` field if present,
  and from the object itself if not.
-/
public protected def ReservoirResp.fromJson? [FromJson α] (val : Json) : Except String (ReservoirResp α) := do
  let .ok obj := JsonObject.fromJson? val
    | .data <$> fromJson? val
  if let some (err : JsonObject) ← obj.get? "error" then
    return .error (← err.get "status") (← err.get "message")
  let data? : Option Json ← obj.get? "data"
  return .data (← fromJson? (data?.getD val))

public instance [FromJson α] : FromJson (ReservoirResp α) where
  fromJson? := ReservoirResp.fromJson?
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue