From ca7fee7f0ade540921fc7507969aa0288103f0c7 Mon Sep 17 00:00:00 2001 From: Wojciech Nawrocki Date: Wed, 7 Oct 2020 13:52:00 -0400 Subject: [PATCH] fix: detect header changes in watchdog --- src/Lean/Server/FileWorker.lean | 23 ++-------- src/Lean/Server/README.md | 4 +- src/Lean/Server/Utils.lean | 20 +++++++++ src/Lean/Server/Watchdog.lean | 76 ++++++++++++++------------------- 4 files changed, 57 insertions(+), 66 deletions(-) diff --git a/src/Lean/Server/FileWorker.lean b/src/Lean/Server/FileWorker.lean index 1b3b07b65c..60de6b5f0f 100644 --- a/src/Lean/Server/FileWorker.lean +++ b/src/Lean/Server/FileWorker.lean @@ -217,29 +217,14 @@ oldDoc ← getDocument; some newVersion ← pure docId.version? | throw (userError "expected version number"); if newVersion <= oldDoc.version then do throw (userError "got outdated version number") -else match changes.get? 0 with -| none => pure () -| some firstChange => do - let firstStartOff := match firstChange with - | TextDocumentContentChangeEvent.rangeChange (range : Range) _ => - oldDoc.text.lspPosToUtf8Pos range.start - | TextDocumentContentChangeEvent.fullChange _ => 0; - let accumulateChanges : TextDocumentContentChangeEvent → FileMap × String.Pos → FileMap × String.Pos := - fun change ⟨newDocText, minStartOff⟩ => - match change with - | TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) => - let startOff := oldDoc.text.lspPosToUtf8Pos range.start; - let newDocText := replaceLspRange newDocText range newText; - let minStartOff := if startOff < minStartOff then startOff else minStartOff; - ⟨newDocText, minStartOff⟩ - | TextDocumentContentChangeEvent.fullChange (newText : String) => - ⟨newText.toFileMap, 0⟩; - let (newDocText, minStartOff) := changes.foldr accumulateChanges (oldDoc.text, firstStartOff); +else if not changes.isEmpty then do + let (newDocText, minStartOff) := foldDocumentChanges changes oldDoc.text; IO.eprintln newDocText.source; st ← read; newDoc ← monadLift $ updateDocument st.hOut docId.uri oldDoc minStartOff newVersion newDocText; setDocument newDoc +else pure () def handleCancelRequest (p : CancelParams) : ServerM Unit := do updatePendingRequests (fun pendingRequests => pendingRequests.erase p.id) @@ -287,7 +272,7 @@ partial def mainLoop : Unit → ServerM Unit st ← read; msg ← readLspMessage st.hIn; pendingRequests ← st.pendingRequestsRef.get; - let filterFinishedTasks : PendingRequestMap → RequestID → Task (Except IO.Error Unit) → IO PendingRequestMap := + let filterFinishedTasks : PendingRequestMap → RequestID → Task (Except IO.Error Unit) → IO PendingRequestMap := (fun acc id task => do f ← hasFinished task; pure $ if f then diff --git a/src/Lean/Server/README.md b/src/Lean/Server/README.md index 2261fb4f91..87cf342a99 100644 --- a/src/Lean/Server/README.md +++ b/src/Lean/Server/README.md @@ -1,6 +1,6 @@ ## Building -(Assuming `lean4` is the `elan` toolchain for stage 0.5) +(Assuming `lean4` is the `elan` toolchain for stage 1) ``` cd $LEAN4_HOME/src/Lean/Server/ leanmake +lean4 bin PKG=Watchdog LINK_OPTS=-rdynamic @@ -25,7 +25,7 @@ An easy way to get an LSP client is to build the [sample extension](https://gith args: [], options: { env: { - LEAN_PATH: "$LEAN4_HOME/build/$RELEASE_OR_DEBUG/stage0.5/lib/lean/", + LEAN_PATH: "$LEAN4_HOME/build/$RELEASE_OR_DEBUG/stage1/lib/lean/", LEAN_WORKER_PATH: "$LEAN4_HOME/src/Lean/Server/build/bin/FileWorker" } } diff --git a/src/Lean/Server/Utils.lean b/src/Lean/Server/Utils.lean index fd83283b7f..9550273bab 100644 --- a/src/Lean/Server/Utils.lean +++ b/src/Lean/Server/Utils.lean @@ -11,6 +11,26 @@ let pre := text.source.extract 0 start; let post := text.source.extract «end» text.source.bsize; (pre ++ newText ++ post).toFileMap +open Lsp + +/-- Returns the document contents with all changes applied, together with the position of the change +which lands earliest in the file. Panics if there are no changes. -/ +def foldDocumentChanges (changes : @& Array Lsp.TextDocumentContentChangeEvent) (oldText : FileMap) + : FileMap × String.Pos := +if changes.isEmpty then panic! "Lean.Server.foldDocumentChanges: empty change array" else +let accumulateChanges : TextDocumentContentChangeEvent → FileMap × String.Pos → FileMap × String.Pos := + fun change ⟨newDocText, minStartOff⟩ => + match change with + | TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) => + let startOff := oldText.lspPosToUtf8Pos range.start; + let newDocText := replaceLspRange newDocText range newText; + let minStartOff := minStartOff.min startOff; + ⟨newDocText, minStartOff⟩ + | TextDocumentContentChangeEvent.fullChange (newText : String) => + ⟨newText.toFileMap, 0⟩; +-- NOTE: We assume Lean files are below 16 EiB. +changes.foldr accumulateChanges (oldText, 0xffffffff) + -- TODO(WN): should these instances be in Core? instance Thunk.monad : Monad Thunk := { pure := @Thunk.pure, diff --git a/src/Lean/Server/Watchdog.lean b/src/Lean/Server/Watchdog.lean index 715315616e..fc7aa6d488 100644 --- a/src/Lean/Server/Watchdog.lean +++ b/src/Lean/Server/Watchdog.lean @@ -49,7 +49,7 @@ Moreover, we don't implement the full protocol at this level: - After `initialize`, the watchdog sends the corresponding `didOpen` notification with the full current state of the file. No additional `didOpen` notifications will be forwarded to the worker process. - `$/cancelRequest` notifications are forwarded to all file workers. -- File workers are always terminated with an `exit` notification, without previously receiving a `shutdown` request. +- File workers are always terminated with an `exit` notification, without previously receiving a `shutdown` request. Similarly, they never receive a `didClose` notification. ## Watchdog <-> client communication @@ -69,7 +69,7 @@ open JsonRpc structure OpenDocument := (version : Nat) (text : FileMap) -(headerEndPos : String.Pos) +(headerAst : Syntax) def workerCfg : Process.StdioConfig := ⟨Process.Stdio.piped, Process.Stdio.piped, Process.Stdio.piped⟩ @@ -81,7 +81,7 @@ inductive WorkerEvent inductive WorkerState -- The watchdog can detect a crashed file worker in two places: When trying to send a message to the file worker and when reading a request reply. -- In the latter case, the forwarding task terminates and delegates a `crashed` event to the main task. Then, in both cases, the --- file worker has its state set to `crashed` and requests that are in-flight are errored. +-- file worker has its state set to `crashed` and requests that are in-flight are errored. -- Upon receiving the next packet for that file worker, the file worker is restarted and the packet is forwarded to it. -- If the crash was detected while writing a packet, we queue that packet until the next packet for the file worker arrives. | crashed (queuedMsgs : Array JsonRpc.Message) @@ -113,7 +113,7 @@ variables {m : Type → Type} [Monad m] [MonadIO m] def readMessage (fw : FileWorker) : m JsonRpc.Message := do msg ← readLspMessage fw.stdout; match msg with -| Message.request id method params? => +| Message.request id method params? => liftIO $ fw.pendingRequestsRef.modify (fun pendingRequests => pendingRequests.erase id) | _ => pure (); pure msg @@ -126,7 +126,7 @@ writeLspNotification fw.stdin method param def writeRequest {α : Type*} [HasToJson α] (fw : FileWorker) (id : RequestID) (method : String) (param : α) : m Unit := do writeLspRequest fw.stdin id method param; -liftIO $ fw.pendingRequestsRef.modify $ fun pendingRequests => +liftIO $ fw.pendingRequestsRef.modify $ fun pendingRequests => pendingRequests.insert id (Message.request id method (fromJson? (toJson param))) def errorPendingRequests (fw : FileWorker) (hOut : FS.Stream) (code : ErrorCode) (msg : String) : m Unit := do @@ -160,12 +160,12 @@ match fileWorkers.find? uri with def eraseFileWorker (uri : DocumentUri) : ServerM Unit := fun st => st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.erase uri) -def log (msg : String) : ServerM Unit := +def log (msg : String) : ServerM Unit := fun st => st.log.putStrLn msg -- TODO: this creates a long-running Task, which should be okay with upcoming API changes. partial def fwdMsgAux (fw : FileWorker) (hOut : FS.Stream) : Unit → IO WorkerEvent -| ⟨⟩ => catch +| ⟨⟩ => catch (do msg ← fw.readMessage; -- NOTE: Writes to Lean I/O channels are atomic, so these won't trample on each other. @@ -178,12 +178,12 @@ partial def fwdMsgAux (fw : FileWorker) (hOut : FS.Stream) : Unit → IO WorkerE if exitCode = 0 then do -- worker was terminated fw.errorPendingRequests hOut ErrorCode.contentModified "File header changed or file was closed"; - pure WorkerEvent.terminated + pure WorkerEvent.terminated else do -- worker crashed fw.errorPendingRequests hOut ErrorCode.internalError "Server process of file crashed, likely due to a stack overflow in user code"; pure (WorkerEvent.crashed err)) - + /-- A Task which forwards a worker's messages into the output stream until an event which must be handled in the main watchdog thread (e.g. an I/O error) happens. -/ def fwdMsgTask (fw : FileWorker) : ServerM (Task WorkerEvent) := @@ -192,16 +192,16 @@ fun st => | Except.ok ev => ev | Except.error e => WorkerEvent.crashed e)) <$> (IO.asTask (fwdMsgAux fw st.hOut ()) Task.Priority.dedicated) -private def parsedImportsEndPos (input : String) : IO String.Pos := do +private def parseHeaderAst (input : String) : IO Syntax := do emptyEnv ← mkEmptyEnvironment; let inputCtx := Parser.mkInputContext input ""; -let (_, parserState, _) := Parser.parseHeader emptyEnv inputCtx; -pure parserState.pos +let (stx, _, _) := Parser.parseHeader emptyEnv inputCtx; +pure stx def startFileWorker (uri : DocumentUri) (version : Nat) (text : FileMap) : ServerM Unit := do st ← read; -pos ← monadLift $ parsedImportsEndPos text.source; -let doc : OpenDocument := ⟨version, text, pos⟩; +headerAst ← monadLift $ parseHeaderAst text.source; +let doc : OpenDocument := ⟨version, text, headerAst⟩; workerProc ← monadLift $ Process.spawn {workerCfg with cmd := st.workerPath}; pendingRequestsRef ← IO.mkRef (RBMap.empty : PendingRequestMap); -- the task will never access itself, so this is fine @@ -222,7 +222,7 @@ fw ← findFileWorker uri; -- it's over either way.) catch (fw.writeMessage (Message.notification "exit" none)) (fun err => pure ()); eraseFileWorker uri - + def parseParams (paramType : Type*) [HasFromJson paramType] (params : Json) : ServerM paramType := match fromJson? params with | some parsed => pure parsed @@ -236,11 +236,11 @@ def restartCrashedFileWorker (uri : DocumentUri) (fw : FileWorker) (queuedMsgs : eraseFileWorker uri; startFileWorker uri fw.doc.version fw.doc.text; newFw ← findFileWorker uri; -let tryDischargeQueuedMsgs : Array JsonRpc.Message → JsonRpc.Message → ServerM (Array JsonRpc.Message) := - fun crashedMsgs m => catch +let tryDischargeQueuedMsgs : Array JsonRpc.Message → JsonRpc.Message → ServerM (Array JsonRpc.Message) := + fun crashedMsgs m => catch (do newFw.writeMessage m; - pure crashedMsgs) + pure crashedMsgs) (fun err => pure (crashedMsgs.push m)); crashedMsgs ← queuedMsgs.foldlM tryDischargeQueuedMsgs #[]; if ¬ crashedMsgs.isEmpty then do @@ -265,35 +265,17 @@ let oldDoc := fw.doc; some newVersion ← pure doc.version? | throw (userError "Expected version number"); if newVersion <= oldDoc.version then do throw (userError "Got outdated version number") -else match changes.get? 0 with -| none => pure () -| some firstChange => do - let firstStartOff := match firstChange with - | TextDocumentContentChangeEvent.rangeChange (range : Range) _ => - oldDoc.text.lspPosToUtf8Pos range.start - | TextDocumentContentChangeEvent.fullChange _ => 0; - let accumulateChanges : TextDocumentContentChangeEvent → FileMap × String.Pos → FileMap × String.Pos := - fun change ⟨newDocText, minStartOff⟩ => - match change with - | TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) => - let startOff := oldDoc.text.lspPosToUtf8Pos range.start; - let newDocText := replaceLspRange newDocText range newText; - let minStartOff := if startOff < minStartOff then startOff else minStartOff; - ⟨newDocText, minStartOff⟩ - | TextDocumentContentChangeEvent.fullChange (newText : String) => - ⟨newText.toFileMap, 0⟩; - let (newDocText, minStartOff) := changes.foldr accumulateChanges (oldDoc.text, firstStartOff); - log newDocText.source; - let oldHeaderEndPos := oldDoc.headerEndPos; - log $ toString minStartOff ++ "; " ++ toString oldHeaderEndPos; - if minStartOff ≤ oldHeaderEndPos then do +else if not changes.isEmpty then do + let (newDocText, _) := foldDocumentChanges changes oldDoc.text; + newHeaderAst ← liftIO $ parseHeaderAst newDocText.source; + if newHeaderAst != oldDoc.headerAst then do -- TODO(WN): we should amortize this somehow; -- when the user is typing in an import, this -- may rapidly destroy/create new processes terminateFileWorker doc.uri; startFileWorker doc.uri newVersion newDocText else do - let newDoc : OpenDocument := ⟨newVersion, newDocText, oldHeaderEndPos⟩; + let newDoc : OpenDocument := ⟨newVersion, newDocText, oldDoc.headerAst⟩; let newFw : FileWorker := {fw with doc := newDoc}; updateFileWorkers doc.uri newFw; let msg := Message.notification "textDocument/didChange" (fromJson? (toJson p)); @@ -303,6 +285,7 @@ else match changes.get? 0 with -- looks like it crashed now! catch (fw.writeNotification "textDocument/didChange" p) (fun err => handleCrash doc.uri newFw #[msg]) +else pure () def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit := terminateFileWorker p.textDocument.uri @@ -322,7 +305,7 @@ let h := (fun α [HasFromJson α] [HasToJson α] [HasFileSource α] => do | WorkerState.crashed queuedMsgs => restartCrashedFileWorker uri fw (queuedMsgs.push msg) | WorkerState.running => do -- if this errors, the file worker crashed - catch (fw.writeRequest id method parsedParams) + catch (fw.writeRequest id method parsedParams) (fun err => handleCrash uri fw #[msg])); match method with | "textDocument/hover" => h HoverParams @@ -342,7 +325,7 @@ def shutdown : ServerM Unit := do st ← read; fileWorkers ← st.fileWorkersRef.get; fileWorkers.forM (fun id _ => terminateFileWorker id); -monadLift $ fileWorkers.forM (fun _ fw => do _ ← IO.wait fw.commTask; pure ()) +monadLift $ fileWorkers.forM (fun _ fw => do _ ← IO.wait fw.commTask; pure ()) inductive ServerEvent | WorkerEvent (uri : DocumentUri) (fw : FileWorker) (ev : WorkerEvent) @@ -391,6 +374,7 @@ partial def mainLoop : Unit → ServerM Unit | ServerEvent.WorkerEvent uri fw err => match err with | WorkerEvent.crashed e => handleCrash uri fw #[] + -- TODO: this internal error does occur | WorkerEvent.terminated => throw (userError "Internal server error: got termination event for worker that should have been removed") def mkLeanServerCapabilities : ServerCapabilities := @@ -404,13 +388,13 @@ def mkLeanServerCapabilities : ServerCapabilities := def initAndRunWatchdogAux : ServerM Unit := do st ← read; -catch +catch (do _ ← readLspNotificationAs st.hIn "initialized" InitializedParams; mainLoop (); Message.notification "exit" none ← readLspMessage st.hIn | throw (userError "Expected an exit notification"); - pure ()) + pure ()) -- something crashed, try to terminate all the file workers and all the forwarding tasks -- so that we can die in peace (fun err => do shutdown; throw err) @@ -434,6 +418,8 @@ runReader end Server end Lean +-- TODO: compile separately OR add as a flag to the `lean` binary in order to stop +-- polluting the global symbol list with a `main` (and ditto in FileWorker.lean) def main (_ : List String) : IO UInt32 := do i ← IO.getStdin; o ← IO.getStdout;