diff --git a/src/Lean/Server/Watchdog.lean b/src/Lean/Server/Watchdog.lean index 9bc5f2f921..ad157aa67c 100644 --- a/src/Lean/Server/Watchdog.lean +++ b/src/Lean/Server/Watchdog.lean @@ -48,8 +48,8 @@ Moreover, we don't implement the full protocol at this level: Consequently, the watchdog will not send an `initialized` notification to the worker. - After `initialize`, the watchdog sends the corresponding `didOpen` notification with the full current state of the file. No additional `didOpen` notifications will be forwarded to the worker process. -- File workers will never receive a `shutdown` request or an `exit` notification. File workers are always terminated - by closing their I/O channels. Similarly, they never receive a `didClose` notification. +- File workers are always terminated with an `exit` notification, without previously receiving a `shutdown` request. + Similarly, they never receive a `didClose` notification. ## Watchdog <-> client communication @@ -72,17 +72,25 @@ structure OpenDocument := def workerCfg : Process.StdioConfig := ⟨Process.Stdio.piped, Process.Stdio.piped, Process.Stdio.piped⟩ -/-- Things that can happen in a worker. -/ +-- Events that a forwarding task of a worker signals to the main task inductive WorkerEvent | terminated -| crashed -| ioError (e : IO.Error) --- TODO(WN): more things will be able to happen +| crashed (e : IO.Error) + +inductive WorkerState +-- The watchdog can detect a crashed file worker in two places: When trying to send a message to the file worker and when reading a request reply. +-- In the latter case, the forwarding task terminates and delegates a `crashed` event to the main task. Then, in both cases, the +-- file worker has its state set to `crashed` and requests that are in-flight are errored. +-- Upon receiving the next packet for that file worker, the file worker is restarted and the packet is forwarded to it. +-- If the crash was detected while writing a packet, we queue that packet until the next packet for the file worker arrives. +| crashed (queuedMsgs : Array Message) +| running structure FileWorker := (doc : OpenDocument) (proc : Process.Child workerCfg) (commTask : Task WorkerEvent) +(state : WorkerState) namespace FileWorker @@ -118,22 +126,27 @@ match fileWorkers.find? uri with | some fw => pure fw | none => throw (userError $ "got unknown document URI (" ++ uri ++ ")") +def eraseFileWorker (uri : DocumentUri) : ServerM Unit := +fun st => st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.erase uri) + +-- NOTE: not atomic +def modifyFileWorker (uri : DocumentUri) (f : FileWorker → FileWorker) : ServerM Unit := do +fw ← findFileWorker uri; +updateFileWorkers uri (f fw) + -- TODO: this creates a long-running Task, which should be okay with upcoming API changes. partial def fwdMsgAux (workerProc : Process.Child workerCfg) (hWrk : FS.Stream) (hOut : FS.Stream) : Unit → IO WorkerEvent -| ⟨⟩ => do - catch - (do +| ⟨⟩ => catch + (do msg ← readLspMessage hWrk; -- NOTE: Writes to Lean I/O channels are atomic, so these won't trample on each other. writeLspMessage hOut msg; fwdMsgAux ()) (fun err => do + -- NOTE: if writeLspMessage errors we will block here, but the main task will + -- quit eventually anyways if that happens exitCode ← workerProc.wait; - -- Terminated events are always initiated by the main task, and should only - -- occur when the main task already discarded the corresponding file worker. - -- Specifically, after discarding, the main task will not listen to the - -- events of this forwarding task anymore. - pure $ if exitCode = 0 then WorkerEvent.terminated else WorkerEvent.crashed) + pure $ if exitCode = 0 then WorkerEvent.terminated else WorkerEvent.crashed err) /-- A Task which forwards a worker's messages into the output stream until an event which must be handled in the main watchdog thread (e.g. an I/O error) happens. -/ @@ -141,7 +154,7 @@ def fwdMsgTask (workerProc : Process.Child workerCfg) (hWrk : FS.Stream) : Serve fun st => (Task.map (fun either => match either with | Except.ok ev => ev - | Except.error e => WorkerEvent.ioError e)) <$> (IO.asTask $ fwdMsgAux workerProc hWrk st.hOut ()) + | Except.error e => WorkerEvent.crashed e)) <$> (IO.asTask $ fwdMsgAux workerProc hWrk st.hOut ()) private def parsedImportsEndPos (input : String) : IO String.Pos := do emptyEnv ← mkEmptyEnvironment; @@ -158,24 +171,32 @@ writeLspRequest (FS.Stream.ofHandle workerProc.stdin) (0 : Nat) "initialize" st. writeLspNotification (FS.Stream.ofHandle workerProc.stdin) "textDocument/didOpen" (DidOpenTextDocumentParams.mk ⟨uri, "lean", version, text.source⟩); commTask ← fwdMsgTask workerProc $ FS.Stream.ofHandle workerProc.stdout; -let fw : FileWorker := ⟨doc, workerProc, commTask⟩; +let fw : FileWorker := ⟨doc, workerProc, commTask, WorkerState.running⟩; updateFileWorkers uri fw -def terminateFileWorker (uri : DocumentUri) : ServerM Unit := --- We're abusing the GC here. Erasing the file worker will free the stdin --- handle of the subprocess, which will terminate as a result. --- Upon terminating, the stdout handle is closed, which the --- forwarding task will detect and then terminate itself. --- TODO(MH): emit ContentChanged errors for pending requests --- to that file worker. --- TODO(MH): clear diagnostics? -fun st => st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.erase uri) +def terminateFileWorker (uri : DocumentUri) : ServerM Unit := do +fw ← findFileWorker uri; +-- the file worker must have crashed just when we were about to terminate it! +-- that's fine - just forget about it then. +-- (on didClose we won't need the crashed file worker anymore +-- and when the header changed we'll start a new one right after +-- anyways) +catch (writeLspMessage fw.stdin (Message.notification "exit" none)) (fun err => pure ()); +-- TODO(MH): error pending requests +eraseFileWorker uri def parseParams (paramType : Type*) [HasFromJson paramType] (params : Json) : ServerM paramType := match fromJson? params with | some parsed => pure parsed | none => throw (userError "got param with wrong structure") +def handleCrash (uri : DocumentUri) (fw : FileWorker) (queuedMsgs : Array JsonRpc.Message) : ServerM Unit := pure () -- TODO(MH) + +def restartFileWorker (uri : DocumentUri) (fw : FileWorker) : ServerM Unit := do +eraseFileWorker uri; +startFileWorker uri fw.doc.version fw.doc.text +-- TODO(MH): discharge queued requests + def handleDidOpen (p : DidOpenTextDocumentParams) : ServerM Unit := let doc := p.textDocument; -- NOTE(WN): `toFileMap` marks line beginnings as immediately following @@ -193,38 +214,56 @@ let oldDoc := fw.doc; some newVersion ← pure doc.version? | throw (userError "expected version number"); if newVersion <= oldDoc.version then do throw (userError "got outdated version number") -else changes.forM $ fun change => - match change with - | TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) => do - let startOff := oldDoc.text.lspPosToUtf8Pos range.start; - let newDocText := replaceLspRange oldDoc.text range newText; - let oldHeaderEndPos := oldDoc.headerEndPos; - if startOff < oldHeaderEndPos then do - /- The header changed, restart worker. -/ - -- TODO(WN): we should amortize this somehow; - -- when the user is typing in an import, this - -- may rapidly destroy/create new processes - terminateFileWorker doc.uri; - startFileWorker doc.uri newVersion newDocText - else - let newDoc : OpenDocument := ⟨newVersion, newDocText, oldHeaderEndPos⟩; - updateFileWorkers doc.uri { fw with doc := newDoc }; - writeLspNotification fw.stdin "textDocument/didChange" p - | TextDocumentContentChangeEvent.fullChange (text : String) => - throw (userError "TODO impl computing the diff of two sources.") +else match changes.get? 0 with +| none => pure () +| some firstChange => do + let firstStartOff := match firstChange with + | TextDocumentContentChangeEvent.rangeChange (range : Range) _ => + oldDoc.text.lspPosToUtf8Pos range.start + | TextDocumentContentChangeEvent.fullChange _ => 0; + let accumulateChanges : TextDocumentContentChangeEvent → FileMap × String.Pos → FileMap × String.Pos := + fun change ⟨newDocText, minStartOff⟩ => + match change with + | TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) => + let startOff := oldDoc.text.lspPosToUtf8Pos range.start; + let newDocText := replaceLspRange newDocText range newText; + let minStartOff := if startOff < minStartOff then startOff else minStartOff; + ⟨newDocText, minStartOff⟩ + | TextDocumentContentChangeEvent.fullChange (newText : String) => + ⟨newText.toFileMap, 0⟩; + let (newDocText, minStartOff) := changes.foldr accumulateChanges (oldDoc.text, firstStartOff); + let oldHeaderEndPos := oldDoc.headerEndPos; + if minStartOff < oldHeaderEndPos then do + -- TODO(WN): we should amortize this somehow; + -- when the user is typing in an import, this + -- may rapidly destroy/create new processes + terminateFileWorker doc.uri; + startFileWorker doc.uri newVersion newDocText + else do + let newDoc : OpenDocument := ⟨newVersion, newDocText, oldHeaderEndPos⟩; + let newFw : FileWorker := { fw with doc := newDoc }; + updateFileWorkers doc.uri newFw; + match fw.state with + | WorkerState.crashed queuedRequests => restartFileWorker doc.uri newFw + | WorkerState.running => + -- looks like it crashed now! + catch (writeLspNotification fw.stdin "textDocument/didChange" p) + (fun err => handleCrash doc.uri newFw #[Message.notification "textDocument/didChange" (fromJson? (toJson p))]) -def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit := do -st ← read; -let doc := p.textDocument; -fw ← findFileWorker doc.uri; -terminateFileWorker doc.uri; -st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.erase doc.uri) +def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit := +terminateFileWorker p.textDocument.uri def handleRequest (id : RequestID) (method : String) (params : Json) : ServerM Unit := do let h := (fun α [HasFromJson α] [HasToJson α] [HasFileSource α] => do parsedParams ← parseParams α params; - fw ← findFileWorker $ fileSource parsedParams; - writeLspRequest fw.stdin id method parsedParams); + let uri := fileSource parsedParams; + fw ← findFileWorker uri; + match fw.state with + | WorkerState.crashed queuedRequests => restartFileWorker uri fw + | WorkerState.running => do + -- if this errors, the file worker crashed + catch (writeLspRequest fw.stdin id method parsedParams) + (fun err => handleCrash uri fw #[Message.request id method (fromJson? params)])); match method with | "textDocument/hover" => h HoverParams | _ => throw (userError $ "got unsupported request: " ++ method ++ @@ -233,8 +272,11 @@ match method with def handleNotification (method : String) (params : Json) : ServerM Unit := do let forward := (fun α [HasFromJson α] [HasToJson α] [HasFileSource α] => do parsedParams ← parseParams α params; - fw ← findFileWorker $ fileSource parsedParams; - writeLspNotification fw.stdin method parsedParams); + let uri := fileSource parsedParams; + fw ← findFileWorker uri; + -- if this errors, the file worker crashed + catch (writeLspNotification fw.stdin method parsedParams) + (fun err => handleCrash uri fw #[Message.notification method (fromJson? params)])); let handle := (fun α [HasFromJson α] (handler : α → ServerM Unit) => parseParams α params >>= handler); match method with | "textDocument/didOpen" => handle DidOpenTextDocumentParams handleDidOpen @@ -289,14 +331,8 @@ partial def mainLoop : Unit → ServerM Unit /- Restart an exited worker. -/ | ServerEvent.WorkerEvent uri fw err => match err with - | WorkerEvent.ioError e => throw e -- shouldn't occur - -- Cannot occur: Terminated events are only generated if the subprocess exits with exit code 0, - -- which happens only if the file worker was terminated. But terminated file workers are not queried - -- in the next waitAny cycle, and hence the HeaderChanged event will never reach the main task. - | WorkerEvent.terminated => throw (userError "internal server error: got Terminated worker event") - | WorkerEvent.crashed => - -- TODO restart fw in such a way that no restart loops occur - mainLoop () + | WorkerEvent.crashed e => handleCrash uri fw #[] + | WorkerEvent.terminated => throw (userError "internal server error: got termination event for worker that should have been removed") def mkLeanServerCapabilities : ServerCapabilities := { textDocumentSync? := some