From b9cc6a709fd52e54d3dfbbf32390ae1da526d64c Mon Sep 17 00:00:00 2001 From: Wojciech Nawrocki Date: Wed, 20 Jan 2021 19:13:13 -0500 Subject: [PATCH] feat: amortise fileworker restarts and change processing --- src/Lean/Server/Watchdog.lean | 114 +++++++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 31 deletions(-) diff --git a/src/Lean/Server/Watchdog.lean b/src/Lean/Server/Watchdog.lean index ff5c13850f..8725c505f6 100644 --- a/src/Lean/Server/Watchdog.lean +++ b/src/Lean/Server/Watchdog.lean @@ -77,10 +77,13 @@ section Utils stderr := Process.Stdio.inherit } - /-- Events that a forwarding task of a worker signals to the main task -/ + /-- Events that worker-specific tasks signal to the main thread. -/ inductive WorkerEvent where + /- A synthetic event signalling that the grouped edits should be processed. -/ + | processGroupedEdits | terminated | crashed (e : IO.Error) + | ioError (e : IO.Error) inductive WorkerState where /- The watchdog can detect a crashed file worker in two places: When trying to send a message to the file worker @@ -102,6 +105,14 @@ section Utils end Utils section FileWorker + /-- A group of edits which will be processed at a future instant. -/ + structure GroupedEdits where + /-- When to process the edits. -/ + applyTime : Nat + params : DidChangeTextDocumentParams + /-- Signals when `applyTime` has been reached. -/ + signalTask : Task WorkerEvent + structure FileWorker where doc : OpenDocument proc : Process.Child workerCfg @@ -109,6 +120,7 @@ section FileWorker state : WorkerState -- This should not be mutated outside of namespace FileWorker, as it is used as shared mutable state pendingRequestsRef : IO.Ref PendingRequestMap + groupedEditsRef : IO.Ref (Option GroupedEdits) namespace FileWorker @@ -143,6 +155,22 @@ section FileWorker for ⟨id, _⟩ in pendingRequests do hError.writeLspResponseError { id := id, code := code, message := msg } + partial def runEditsSignalTask (fw : FileWorker) : IO (Task WorkerEvent) := do + let rec loopAction : IO WorkerEvent := do + let now ← monoMsNow + let some ge ← fw.groupedEditsRef.get + | throwServerError "Internal error: empty grouped edits reference in signal task" + if ge.applyTime ≤ now then + return WorkerEvent.processGroupedEdits + else + IO.sleep <| UInt32.ofNat <| ge.applyTime - now + loopAction + + let t ← IO.asTask loopAction + return t.map fun + | Except.ok ev => ev + | Except.error e => WorkerEvent.ioError e + end FileWorker end FileWorker @@ -205,7 +233,7 @@ section ServerM let task ← IO.asTask (loop $ ←read) Task.Priority.dedicated task.map $ fun | Except.ok ev => ev - | Except.error e => WorkerEvent.crashed e + | Except.error e => WorkerEvent.ioError e def startFileWorker (m : DocumentMeta) : ServerM Unit := do let st ← read @@ -217,15 +245,16 @@ section ServerM } let pendingRequestsRef ← IO.mkRef (RBMap.empty : PendingRequestMap) -- The task will never access itself, so this is fine - let commTaskFw : FileWorker := { + let fw : FileWorker := { doc := ⟨m, headerAst⟩ proc := workerProc commTask := Task.pure WorkerEvent.terminated state := WorkerState.running pendingRequestsRef := pendingRequestsRef + groupedEditsRef := ← IO.mkRef none } - let commTask ← forwardMessages commTaskFw - let fw : FileWorker := { commTaskFw with commTask := commTask } + let commTask ← forwardMessages fw + let fw : FileWorker := { fw with commTask := commTask } fw.stdin.writeLspRequest ⟨0, "initialize", st.initParams⟩ fw.writeNotification { method := "textDocument/didOpen" @@ -303,10 +332,11 @@ section NotificationHandling is a CR there. -/ startFileWorker ⟨doc.uri, doc.version, doc.text.toFileMap⟩ - def handleDidChange (p : DidChangeTextDocumentParams) : ServerM Unit := do - let doc := p.textDocument - let changes := p.contentChanges - let fw ← findFileWorker doc.uri + def handleEdits (fw : FileWorker) : ServerM Unit := do + let some ge ← fw.groupedEditsRef.modifyGet (·, none) + | throwServerError "Internal error: empty grouped edits reference" + let doc := ge.params.textDocument + let changes := ge.params.contentChanges let oldDoc := fw.doc let some newVersion ← pure doc.version? | throwServerError "Expected version number" @@ -318,15 +348,12 @@ section NotificationHandling let newMeta : DocumentMeta := ⟨doc.uri, newVersion, newDocText⟩ let newHeaderAst ← parseHeaderAst newDocText.source if newHeaderAst != oldDoc.headerAst then - /- TODO(WN): we should amortize this somehow - when the user is typing in an import, this - may rapidly destroy/create new processes -/ terminateFileWorker doc.uri startFileWorker newMeta else let newDoc : OpenDocument := ⟨newMeta, oldDoc.headerAst⟩ updateFileWorkers { fw with doc := newDoc } - tryWriteMessage doc.uri ⟨"textDocument/didChange", p⟩ FileWorker.writeNotification (restartCrashedWorker := true) + tryWriteMessage doc.uri ⟨"textDocument/didChange", ge.params⟩ FileWorker.writeNotification (restartCrashedWorker := true) def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit := terminateFileWorker p.textDocument.uri @@ -368,7 +395,7 @@ section MessageHandling let handle := (fun α [FromJson α] (handler : α → ServerM Unit) => parseParams α params >>= handler) match method with | "textDocument/didOpen" => handle DidOpenTextDocumentParams handleDidOpen - | "textDocument/didChange" => handle DidChangeTextDocumentParams handleDidChange + /- NOTE: textDocument/didChange is handled in the main loop. -/ | "textDocument/didClose" => handle DidCloseTextDocumentParams handleDidClose | "$/cancelRequest" => handle CancelParams handleCancelRequest | _ => (←read).hLog.putStrLn s!"Got unsupported notification: {method}" @@ -383,33 +410,34 @@ section MainLoop discard <| IO.wait fw.commTask inductive ServerEvent where - | WorkerEvent (fw : FileWorker) (ev : WorkerEvent) - | ClientMsg (msg : JsonRpc.Message) - | ClientError (e : IO.Error) + | workerEvent (fw : FileWorker) (ev : WorkerEvent) + | clientMsg (msg : JsonRpc.Message) + | clientError (e : IO.Error) def runClientTask : ServerM (Task ServerEvent) := do let st ← read let readMsgAction : IO ServerEvent := do /- Runs asynchronously. -/ let msg ← st.hIn.readLspMessage - ServerEvent.ClientMsg msg + ServerEvent.clientMsg msg let clientTask := (←IO.asTask readMsgAction).map $ fun | Except.ok ev => ev - | Except.error e => ServerEvent.ClientError e + | Except.error e => ServerEvent.clientError e return clientTask partial def mainLoop (clientTask : Task ServerEvent) : ServerM Unit := do let st ← read let workers ← st.fileWorkersRef.get - let workerTasks := workers.fold - (fun acc _ fw => - match fw.state with - | WorkerState.running => fw.commTask.map (ServerEvent.WorkerEvent fw) :: acc - | _ => acc) - ([] : List (Task ServerEvent)) - let ev ← IO.waitAny $ clientTask :: workerTasks + let mut workerTasks := #[] + for (_, fw) in workers do + if let WorkerState.running := fw.state then + workerTasks := workerTasks.push <| fw.commTask.map (ServerEvent.workerEvent fw) + if let some ge ← fw.groupedEditsRef.get then + workerTasks := workerTasks.push <| ge.signalTask.map (ServerEvent.workerEvent fw) + + let ev ← IO.waitAny (workerTasks.push clientTask |>.toList) match ev with - | ServerEvent.ClientMsg msg => + | ServerEvent.clientMsg msg => match msg with | Message.request id "shutdown" _ => shutdown @@ -417,17 +445,41 @@ section MainLoop | Message.request id method (some params) => handleRequest id method (toJson params) mainLoop (←runClientTask) + | Message.notification "textDocument/didChange" (some params) => + let p ← parseParams DidChangeTextDocumentParams (toJson params) + let fw ← findFileWorker p.textDocument.uri + let now ← monoMsNow + /- We wait 500ms since last edit before applying the changes. -/ + let applyTime := now + 500 + let startingGroup? ← fw.groupedEditsRef.modifyGet fun + | some ge => (false, some { ge with applyTime := applyTime + params.textDocument := p.textDocument + params.contentChanges := ge.params.contentChanges ++ p.contentChanges } ) + | none => (true, some { applyTime := applyTime + params := p + /- This is overwritten just below. -/ + signalTask := Task.pure WorkerEvent.processGroupedEdits } ) + if startingGroup? then + let t ← fw.runEditsSignalTask + fw.groupedEditsRef.modify (Option.map fun ge => { ge with signalTask := t } ) + mainLoop (←runClientTask) | Message.notification method (some params) => handleNotification method (toJson params) mainLoop (←runClientTask) | _ => throwServerError "Got invalid JSON-RPC message" - | ServerEvent.ClientError e => throw e - | ServerEvent.WorkerEvent fw err => - match err with + | ServerEvent.clientError e => throw e + | ServerEvent.workerEvent fw ev => + match ev with + | WorkerEvent.processGroupedEdits => + handleEdits fw + mainLoop clientTask + | WorkerEvent.ioError e => + throwServerError s!"IO error while processing events for {fw.doc.meta.uri}: {e}" | WorkerEvent.crashed e => handleCrash fw.doc.meta.uri #[] mainLoop clientTask - | WorkerEvent.terminated => throwServerError "Internal server error: got termination event for worker that should have been removed" + | WorkerEvent.terminated => + throwServerError "Internal server error: got termination event for worker that should have been removed" end MainLoop def mkLeanServerCapabilities : ServerCapabilities := {