feat: add explicit process termination procedure, ensure crash handling on writes and accumulate changes of single didChange packet

This commit is contained in:
Marc Huisinga 2020-10-01 19:56:13 +02:00 committed by Sebastian Ullrich
parent a8b105e1e9
commit 07d5722b67

View file

@ -48,8 +48,8 @@ Moreover, we don't implement the full protocol at this level:
Consequently, the watchdog will not send an `initialized` notification to the worker.
- After `initialize`, the watchdog sends the corresponding `didOpen` notification with the full current state of the file.
No additional `didOpen` notifications will be forwarded to the worker process.
- File workers will never receive a `shutdown` request or an `exit` notification. File workers are always terminated
by closing their I/O channels. Similarly, they never receive a `didClose` notification.
- File workers are always terminated with an `exit` notification, without previously receiving a `shutdown` request.
Similarly, they never receive a `didClose` notification.
## Watchdog <-> client communication
@ -72,17 +72,25 @@ structure OpenDocument :=
def workerCfg : Process.StdioConfig := ⟨Process.Stdio.piped, Process.Stdio.piped, Process.Stdio.piped⟩
/-- Things that can happen in a worker. -/
-- Events that a forwarding task of a worker signals to the main task
inductive WorkerEvent
| terminated
| crashed
| ioError (e : IO.Error)
-- TODO(WN): more things will be able to happen
| crashed (e : IO.Error)
inductive WorkerState
-- The watchdog can detect a crashed file worker in two places: When trying to send a message to the file worker and when reading a request reply.
-- In the latter case, the forwarding task terminates and delegates a `crashed` event to the main task. Then, in both cases, the
-- file worker has its state set to `crashed` and requests that are in-flight are errored.
-- Upon receiving the next packet for that file worker, the file worker is restarted and the packet is forwarded to it.
-- If the crash was detected while writing a packet, we queue that packet until the next packet for the file worker arrives.
| crashed (queuedMsgs : Array Message)
| running
structure FileWorker :=
(doc : OpenDocument)
(proc : Process.Child workerCfg)
(commTask : Task WorkerEvent)
(state : WorkerState)
namespace FileWorker
@ -118,22 +126,27 @@ match fileWorkers.find? uri with
| some fw => pure fw
| none => throw (userError $ "got unknown document URI (" ++ uri ++ ")")
def eraseFileWorker (uri : DocumentUri) : ServerM Unit :=
fun st => st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.erase uri)
-- NOTE: not atomic
def modifyFileWorker (uri : DocumentUri) (f : FileWorker → FileWorker) : ServerM Unit := do
fw ← findFileWorker uri;
updateFileWorkers uri (f fw)
-- TODO: this creates a long-running Task, which should be okay with upcoming API changes.
partial def fwdMsgAux (workerProc : Process.Child workerCfg) (hWrk : FS.Stream) (hOut : FS.Stream) : Unit → IO WorkerEvent
| ⟨⟩ => do
catch
(do
| ⟨⟩ => catch
(do
msg ← readLspMessage hWrk;
-- NOTE: Writes to Lean I/O channels are atomic, so these won't trample on each other.
writeLspMessage hOut msg;
fwdMsgAux ())
(fun err => do
-- NOTE: if writeLspMessage errors we will block here, but the main task will
-- quit eventually anyways if that happens
exitCode ← workerProc.wait;
-- Terminated events are always initiated by the main task, and should only
-- occur when the main task already discarded the corresponding file worker.
-- Specifically, after discarding, the main task will not listen to the
-- events of this forwarding task anymore.
pure $ if exitCode = 0 then WorkerEvent.terminated else WorkerEvent.crashed)
pure $ if exitCode = 0 then WorkerEvent.terminated else WorkerEvent.crashed err)
/-- A Task which forwards a worker's messages into the output stream until an event
which must be handled in the main watchdog thread (e.g. an I/O error) happens. -/
@ -141,7 +154,7 @@ def fwdMsgTask (workerProc : Process.Child workerCfg) (hWrk : FS.Stream) : Serve
fun st =>
(Task.map (fun either => match either with
| Except.ok ev => ev
| Except.error e => WorkerEvent.ioError e)) <$> (IO.asTask $ fwdMsgAux workerProc hWrk st.hOut ())
| Except.error e => WorkerEvent.crashed e)) <$> (IO.asTask $ fwdMsgAux workerProc hWrk st.hOut ())
private def parsedImportsEndPos (input : String) : IO String.Pos := do
emptyEnv ← mkEmptyEnvironment;
@ -158,24 +171,32 @@ writeLspRequest (FS.Stream.ofHandle workerProc.stdin) (0 : Nat) "initialize" st.
writeLspNotification (FS.Stream.ofHandle workerProc.stdin) "textDocument/didOpen"
(DidOpenTextDocumentParams.mk ⟨uri, "lean", version, text.source⟩);
commTask ← fwdMsgTask workerProc $ FS.Stream.ofHandle workerProc.stdout;
let fw : FileWorker := ⟨doc, workerProc, commTask⟩;
let fw : FileWorker := ⟨doc, workerProc, commTask, WorkerState.running⟩;
updateFileWorkers uri fw
def terminateFileWorker (uri : DocumentUri) : ServerM Unit :=
-- We're abusing the GC here. Erasing the file worker will free the stdin
-- handle of the subprocess, which will terminate as a result.
-- Upon terminating, the stdout handle is closed, which the
-- forwarding task will detect and then terminate itself.
-- TODO(MH): emit ContentChanged errors for pending requests
-- to that file worker.
-- TODO(MH): clear diagnostics?
fun st => st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.erase uri)
def terminateFileWorker (uri : DocumentUri) : ServerM Unit := do
fw ← findFileWorker uri;
-- the file worker must have crashed just when we were about to terminate it!
-- that's fine - just forget about it then.
-- (on didClose we won't need the crashed file worker anymore
-- and when the header changed we'll start a new one right after
-- anyways)
catch (writeLspMessage fw.stdin (Message.notification "exit" none)) (fun err => pure ());
-- TODO(MH): error pending requests
eraseFileWorker uri
def parseParams (paramType : Type*) [HasFromJson paramType] (params : Json) : ServerM paramType :=
match fromJson? params with
| some parsed => pure parsed
| none => throw (userError "got param with wrong structure")
def handleCrash (uri : DocumentUri) (fw : FileWorker) (queuedMsgs : Array JsonRpc.Message) : ServerM Unit := pure () -- TODO(MH)
def restartFileWorker (uri : DocumentUri) (fw : FileWorker) : ServerM Unit := do
eraseFileWorker uri;
startFileWorker uri fw.doc.version fw.doc.text
-- TODO(MH): discharge queued requests
def handleDidOpen (p : DidOpenTextDocumentParams) : ServerM Unit :=
let doc := p.textDocument;
-- NOTE(WN): `toFileMap` marks line beginnings as immediately following
@ -193,38 +214,56 @@ let oldDoc := fw.doc;
some newVersion ← pure doc.version? | throw (userError "expected version number");
if newVersion <= oldDoc.version then do
throw (userError "got outdated version number")
else changes.forM $ fun change =>
match change with
| TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) => do
let startOff := oldDoc.text.lspPosToUtf8Pos range.start;
let newDocText := replaceLspRange oldDoc.text range newText;
let oldHeaderEndPos := oldDoc.headerEndPos;
if startOff < oldHeaderEndPos then do
/- The header changed, restart worker. -/
-- TODO(WN): we should amortize this somehow;
-- when the user is typing in an import, this
-- may rapidly destroy/create new processes
terminateFileWorker doc.uri;
startFileWorker doc.uri newVersion newDocText
else
let newDoc : OpenDocument := ⟨newVersion, newDocText, oldHeaderEndPos⟩;
updateFileWorkers doc.uri { fw with doc := newDoc };
writeLspNotification fw.stdin "textDocument/didChange" p
| TextDocumentContentChangeEvent.fullChange (text : String) =>
throw (userError "TODO impl computing the diff of two sources.")
else match changes.get? 0 with
| none => pure ()
| some firstChange => do
let firstStartOff := match firstChange with
| TextDocumentContentChangeEvent.rangeChange (range : Range) _ =>
oldDoc.text.lspPosToUtf8Pos range.start
| TextDocumentContentChangeEvent.fullChange _ => 0;
let accumulateChanges : TextDocumentContentChangeEvent → FileMap × String.Pos → FileMap × String.Pos :=
fun change ⟨newDocText, minStartOff⟩ =>
match change with
| TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) =>
let startOff := oldDoc.text.lspPosToUtf8Pos range.start;
let newDocText := replaceLspRange newDocText range newText;
let minStartOff := if startOff < minStartOff then startOff else minStartOff;
⟨newDocText, minStartOff⟩
| TextDocumentContentChangeEvent.fullChange (newText : String) =>
⟨newText.toFileMap, 0⟩;
let (newDocText, minStartOff) := changes.foldr accumulateChanges (oldDoc.text, firstStartOff);
let oldHeaderEndPos := oldDoc.headerEndPos;
if minStartOff < oldHeaderEndPos then do
-- TODO(WN): we should amortize this somehow;
-- when the user is typing in an import, this
-- may rapidly destroy/create new processes
terminateFileWorker doc.uri;
startFileWorker doc.uri newVersion newDocText
else do
let newDoc : OpenDocument := ⟨newVersion, newDocText, oldHeaderEndPos⟩;
let newFw : FileWorker := { fw with doc := newDoc };
updateFileWorkers doc.uri newFw;
match fw.state with
| WorkerState.crashed queuedRequests => restartFileWorker doc.uri newFw
| WorkerState.running =>
-- looks like it crashed now!
catch (writeLspNotification fw.stdin "textDocument/didChange" p)
(fun err => handleCrash doc.uri newFw #[Message.notification "textDocument/didChange" (fromJson? (toJson p))])
def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit := do
st ← read;
let doc := p.textDocument;
fw ← findFileWorker doc.uri;
terminateFileWorker doc.uri;
st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.erase doc.uri)
def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit :=
terminateFileWorker p.textDocument.uri
def handleRequest (id : RequestID) (method : String) (params : Json) : ServerM Unit := do
let h := (fun α [HasFromJson α] [HasToJson α] [HasFileSource α] => do
parsedParams ← parseParams α params;
fw ← findFileWorker $ fileSource parsedParams;
writeLspRequest fw.stdin id method parsedParams);
let uri := fileSource parsedParams;
fw ← findFileWorker uri;
match fw.state with
| WorkerState.crashed queuedRequests => restartFileWorker uri fw
| WorkerState.running => do
-- if this errors, the file worker crashed
catch (writeLspRequest fw.stdin id method parsedParams)
(fun err => handleCrash uri fw #[Message.request id method (fromJson? params)]));
match method with
| "textDocument/hover" => h HoverParams
| _ => throw (userError $ "got unsupported request: " ++ method ++
@ -233,8 +272,11 @@ match method with
def handleNotification (method : String) (params : Json) : ServerM Unit := do
let forward := (fun α [HasFromJson α] [HasToJson α] [HasFileSource α] => do
parsedParams ← parseParams α params;
fw ← findFileWorker $ fileSource parsedParams;
writeLspNotification fw.stdin method parsedParams);
let uri := fileSource parsedParams;
fw ← findFileWorker uri;
-- if this errors, the file worker crashed
catch (writeLspNotification fw.stdin method parsedParams)
(fun err => handleCrash uri fw #[Message.notification method (fromJson? params)]));
let handle := (fun α [HasFromJson α] (handler : α → ServerM Unit) => parseParams α params >>= handler);
match method with
| "textDocument/didOpen" => handle DidOpenTextDocumentParams handleDidOpen
@ -289,14 +331,8 @@ partial def mainLoop : Unit → ServerM Unit
/- Restart an exited worker. -/
| ServerEvent.WorkerEvent uri fw err =>
match err with
| WorkerEvent.ioError e => throw e -- shouldn't occur
-- Cannot occur: Terminated events are only generated if the subprocess exits with exit code 0,
-- which happens only if the file worker was terminated. But terminated file workers are not queried
-- in the next waitAny cycle, and hence the HeaderChanged event will never reach the main task.
| WorkerEvent.terminated => throw (userError "internal server error: got Terminated worker event")
| WorkerEvent.crashed =>
-- TODO restart fw in such a way that no restart loops occur
mainLoop ()
| WorkerEvent.crashed e => handleCrash uri fw #[]
| WorkerEvent.terminated => throw (userError "internal server error: got termination event for worker that should have been removed")
def mkLeanServerCapabilities : ServerCapabilities :=
{ textDocumentSync? := some