fix: detect header changes in watchdog

This commit is contained in:
Wojciech Nawrocki 2020-10-07 13:52:00 -04:00 committed by Sebastian Ullrich
parent 91a8700858
commit ca7fee7f0a
4 changed files with 57 additions and 66 deletions

View file

@ -217,29 +217,14 @@ oldDoc ← getDocument;
some newVersion ← pure docId.version? | throw (userError "expected version number");
if newVersion <= oldDoc.version then do
throw (userError "got outdated version number")
else match changes.get? 0 with
| none => pure ()
| some firstChange => do
let firstStartOff := match firstChange with
| TextDocumentContentChangeEvent.rangeChange (range : Range) _ =>
oldDoc.text.lspPosToUtf8Pos range.start
| TextDocumentContentChangeEvent.fullChange _ => 0;
let accumulateChanges : TextDocumentContentChangeEvent → FileMap × String.Pos → FileMap × String.Pos :=
fun change ⟨newDocText, minStartOff⟩ =>
match change with
| TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) =>
let startOff := oldDoc.text.lspPosToUtf8Pos range.start;
let newDocText := replaceLspRange newDocText range newText;
let minStartOff := if startOff < minStartOff then startOff else minStartOff;
⟨newDocText, minStartOff⟩
| TextDocumentContentChangeEvent.fullChange (newText : String) =>
⟨newText.toFileMap, 0⟩;
let (newDocText, minStartOff) := changes.foldr accumulateChanges (oldDoc.text, firstStartOff);
else if not changes.isEmpty then do
let (newDocText, minStartOff) := foldDocumentChanges changes oldDoc.text;
IO.eprintln newDocText.source;
st ← read;
newDoc ← monadLift $
updateDocument st.hOut docId.uri oldDoc minStartOff newVersion newDocText;
setDocument newDoc
else pure ()
def handleCancelRequest (p : CancelParams) : ServerM Unit := do
updatePendingRequests (fun pendingRequests => pendingRequests.erase p.id)
@ -287,7 +272,7 @@ partial def mainLoop : Unit → ServerM Unit
st ← read;
msg ← readLspMessage st.hIn;
pendingRequests ← st.pendingRequestsRef.get;
let filterFinishedTasks : PendingRequestMap → RequestID → Task (Except IO.Error Unit) → IO PendingRequestMap :=
let filterFinishedTasks : PendingRequestMap → RequestID → Task (Except IO.Error Unit) → IO PendingRequestMap :=
(fun acc id task => do
f ← hasFinished task;
pure $ if f then

View file

@ -1,6 +1,6 @@
## Building
(Assuming `lean4` is the `elan` toolchain for stage 0.5)
(Assuming `lean4` is the `elan` toolchain for stage 1)
```
cd $LEAN4_HOME/src/Lean/Server/
leanmake +lean4 bin PKG=Watchdog LINK_OPTS=-rdynamic
@ -25,7 +25,7 @@ An easy way to get an LSP client is to build the [sample extension](https://gith
args: [],
options: {
env: {
LEAN_PATH: "$LEAN4_HOME/build/$RELEASE_OR_DEBUG/stage0.5/lib/lean/",
LEAN_PATH: "$LEAN4_HOME/build/$RELEASE_OR_DEBUG/stage1/lib/lean/",
LEAN_WORKER_PATH: "$LEAN4_HOME/src/Lean/Server/build/bin/FileWorker"
}
}

View file

@ -11,6 +11,26 @@ let pre := text.source.extract 0 start;
let post := text.source.extract «end» text.source.bsize;
(pre ++ newText ++ post).toFileMap
open Lsp
/-- Returns the document contents with all changes applied, together with the position of the change
which lands earliest in the file. Panics if there are no changes. -/
def foldDocumentChanges (changes : @& Array Lsp.TextDocumentContentChangeEvent) (oldText : FileMap)
: FileMap × String.Pos :=
if changes.isEmpty then panic! "Lean.Server.foldDocumentChanges: empty change array" else
let accumulateChanges : TextDocumentContentChangeEvent → FileMap × String.Pos → FileMap × String.Pos :=
fun change ⟨newDocText, minStartOff⟩ =>
match change with
| TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) =>
let startOff := oldText.lspPosToUtf8Pos range.start;
let newDocText := replaceLspRange newDocText range newText;
let minStartOff := minStartOff.min startOff;
⟨newDocText, minStartOff⟩
| TextDocumentContentChangeEvent.fullChange (newText : String) =>
⟨newText.toFileMap, 0⟩;
-- NOTE: We assume Lean files are below 16 EiB.
changes.foldr accumulateChanges (oldText, 0xffffffff)
-- TODO(WN): should these instances be in Core?
instance Thunk.monad : Monad Thunk :=
{ pure := @Thunk.pure,

View file

@ -49,7 +49,7 @@ Moreover, we don't implement the full protocol at this level:
- After `initialize`, the watchdog sends the corresponding `didOpen` notification with the full current state of the file.
No additional `didOpen` notifications will be forwarded to the worker process.
- `$/cancelRequest` notifications are forwarded to all file workers.
- File workers are always terminated with an `exit` notification, without previously receiving a `shutdown` request.
- File workers are always terminated with an `exit` notification, without previously receiving a `shutdown` request.
Similarly, they never receive a `didClose` notification.
## Watchdog <-> client communication
@ -69,7 +69,7 @@ open JsonRpc
structure OpenDocument :=
(version : Nat)
(text : FileMap)
(headerEndPos : String.Pos)
(headerAst : Syntax)
def workerCfg : Process.StdioConfig := ⟨Process.Stdio.piped, Process.Stdio.piped, Process.Stdio.piped⟩
@ -81,7 +81,7 @@ inductive WorkerEvent
inductive WorkerState
-- The watchdog can detect a crashed file worker in two places: When trying to send a message to the file worker and when reading a request reply.
-- In the latter case, the forwarding task terminates and delegates a `crashed` event to the main task. Then, in both cases, the
-- file worker has its state set to `crashed` and requests that are in-flight are errored.
-- file worker has its state set to `crashed` and requests that are in-flight are errored.
-- Upon receiving the next packet for that file worker, the file worker is restarted and the packet is forwarded to it.
-- If the crash was detected while writing a packet, we queue that packet until the next packet for the file worker arrives.
| crashed (queuedMsgs : Array JsonRpc.Message)
@ -113,7 +113,7 @@ variables {m : Type → Type} [Monad m] [MonadIO m]
def readMessage (fw : FileWorker) : m JsonRpc.Message := do
msg ← readLspMessage fw.stdout;
match msg with
| Message.request id method params? =>
| Message.request id method params? =>
liftIO $ fw.pendingRequestsRef.modify (fun pendingRequests => pendingRequests.erase id)
| _ => pure ();
pure msg
@ -126,7 +126,7 @@ writeLspNotification fw.stdin method param
def writeRequest {α : Type*} [HasToJson α] (fw : FileWorker) (id : RequestID) (method : String) (param : α) : m Unit := do
writeLspRequest fw.stdin id method param;
liftIO $ fw.pendingRequestsRef.modify $ fun pendingRequests =>
liftIO $ fw.pendingRequestsRef.modify $ fun pendingRequests =>
pendingRequests.insert id (Message.request id method (fromJson? (toJson param)))
def errorPendingRequests (fw : FileWorker) (hOut : FS.Stream) (code : ErrorCode) (msg : String) : m Unit := do
@ -160,12 +160,12 @@ match fileWorkers.find? uri with
def eraseFileWorker (uri : DocumentUri) : ServerM Unit :=
fun st => st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.erase uri)
def log (msg : String) : ServerM Unit :=
def log (msg : String) : ServerM Unit :=
fun st => st.log.putStrLn msg
-- TODO: this creates a long-running Task, which should be okay with upcoming API changes.
partial def fwdMsgAux (fw : FileWorker) (hOut : FS.Stream) : Unit → IO WorkerEvent
| ⟨⟩ => catch
| ⟨⟩ => catch
(do
msg ← fw.readMessage;
-- NOTE: Writes to Lean I/O channels are atomic, so these won't trample on each other.
@ -178,12 +178,12 @@ partial def fwdMsgAux (fw : FileWorker) (hOut : FS.Stream) : Unit → IO WorkerE
if exitCode = 0 then do
-- worker was terminated
fw.errorPendingRequests hOut ErrorCode.contentModified "File header changed or file was closed";
pure WorkerEvent.terminated
pure WorkerEvent.terminated
else do
-- worker crashed
fw.errorPendingRequests hOut ErrorCode.internalError "Server process of file crashed, likely due to a stack overflow in user code";
pure (WorkerEvent.crashed err))
/-- A Task which forwards a worker's messages into the output stream until an event
which must be handled in the main watchdog thread (e.g. an I/O error) happens. -/
def fwdMsgTask (fw : FileWorker) : ServerM (Task WorkerEvent) :=
@ -192,16 +192,16 @@ fun st =>
| Except.ok ev => ev
| Except.error e => WorkerEvent.crashed e)) <$> (IO.asTask (fwdMsgAux fw st.hOut ()) Task.Priority.dedicated)
private def parsedImportsEndPos (input : String) : IO String.Pos := do
private def parseHeaderAst (input : String) : IO Syntax := do
emptyEnv ← mkEmptyEnvironment;
let inputCtx := Parser.mkInputContext input "<input>";
let (_, parserState, _) := Parser.parseHeader emptyEnv inputCtx;
pure parserState.pos
let (stx, _, _) := Parser.parseHeader emptyEnv inputCtx;
pure stx
def startFileWorker (uri : DocumentUri) (version : Nat) (text : FileMap) : ServerM Unit := do
st ← read;
pos ← monadLift $ parsedImportsEndPos text.source;
let doc : OpenDocument := ⟨version, text, pos⟩;
headerAst ← monadLift $ parseHeaderAst text.source;
let doc : OpenDocument := ⟨version, text, headerAst⟩;
workerProc ← monadLift $ Process.spawn {workerCfg with cmd := st.workerPath};
pendingRequestsRef ← IO.mkRef (RBMap.empty : PendingRequestMap);
-- the task will never access itself, so this is fine
@ -222,7 +222,7 @@ fw ← findFileWorker uri;
-- it's over either way.)
catch (fw.writeMessage (Message.notification "exit" none)) (fun err => pure ());
eraseFileWorker uri
def parseParams (paramType : Type*) [HasFromJson paramType] (params : Json) : ServerM paramType :=
match fromJson? params with
| some parsed => pure parsed
@ -236,11 +236,11 @@ def restartCrashedFileWorker (uri : DocumentUri) (fw : FileWorker) (queuedMsgs :
eraseFileWorker uri;
startFileWorker uri fw.doc.version fw.doc.text;
newFw ← findFileWorker uri;
let tryDischargeQueuedMsgs : Array JsonRpc.Message → JsonRpc.Message → ServerM (Array JsonRpc.Message) :=
fun crashedMsgs m => catch
let tryDischargeQueuedMsgs : Array JsonRpc.Message → JsonRpc.Message → ServerM (Array JsonRpc.Message) :=
fun crashedMsgs m => catch
(do
newFw.writeMessage m;
pure crashedMsgs)
pure crashedMsgs)
(fun err => pure (crashedMsgs.push m));
crashedMsgs ← queuedMsgs.foldlM tryDischargeQueuedMsgs #[];
if ¬ crashedMsgs.isEmpty then do
@ -265,35 +265,17 @@ let oldDoc := fw.doc;
some newVersion ← pure doc.version? | throw (userError "Expected version number");
if newVersion <= oldDoc.version then do
throw (userError "Got outdated version number")
else match changes.get? 0 with
| none => pure ()
| some firstChange => do
let firstStartOff := match firstChange with
| TextDocumentContentChangeEvent.rangeChange (range : Range) _ =>
oldDoc.text.lspPosToUtf8Pos range.start
| TextDocumentContentChangeEvent.fullChange _ => 0;
let accumulateChanges : TextDocumentContentChangeEvent → FileMap × String.Pos → FileMap × String.Pos :=
fun change ⟨newDocText, minStartOff⟩ =>
match change with
| TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) =>
let startOff := oldDoc.text.lspPosToUtf8Pos range.start;
let newDocText := replaceLspRange newDocText range newText;
let minStartOff := if startOff < minStartOff then startOff else minStartOff;
⟨newDocText, minStartOff⟩
| TextDocumentContentChangeEvent.fullChange (newText : String) =>
⟨newText.toFileMap, 0⟩;
let (newDocText, minStartOff) := changes.foldr accumulateChanges (oldDoc.text, firstStartOff);
log newDocText.source;
let oldHeaderEndPos := oldDoc.headerEndPos;
log $ toString minStartOff ++ "; " ++ toString oldHeaderEndPos;
if minStartOff ≤ oldHeaderEndPos then do
else if not changes.isEmpty then do
let (newDocText, _) := foldDocumentChanges changes oldDoc.text;
newHeaderAst ← liftIO $ parseHeaderAst newDocText.source;
if newHeaderAst != oldDoc.headerAst then do
-- TODO(WN): we should amortize this somehow;
-- when the user is typing in an import, this
-- may rapidly destroy/create new processes
terminateFileWorker doc.uri;
startFileWorker doc.uri newVersion newDocText
else do
let newDoc : OpenDocument := ⟨newVersion, newDocText, oldHeaderEndPos⟩;
let newDoc : OpenDocument := ⟨newVersion, newDocText, oldDoc.headerAst⟩;
let newFw : FileWorker := {fw with doc := newDoc};
updateFileWorkers doc.uri newFw;
let msg := Message.notification "textDocument/didChange" (fromJson? (toJson p));
@ -303,6 +285,7 @@ else match changes.get? 0 with
-- looks like it crashed now!
catch (fw.writeNotification "textDocument/didChange" p)
(fun err => handleCrash doc.uri newFw #[msg])
else pure ()
def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit :=
terminateFileWorker p.textDocument.uri
@ -322,7 +305,7 @@ let h := (fun α [HasFromJson α] [HasToJson α] [HasFileSource α] => do
| WorkerState.crashed queuedMsgs => restartCrashedFileWorker uri fw (queuedMsgs.push msg)
| WorkerState.running => do
-- if this errors, the file worker crashed
catch (fw.writeRequest id method parsedParams)
catch (fw.writeRequest id method parsedParams)
(fun err => handleCrash uri fw #[msg]));
match method with
| "textDocument/hover" => h HoverParams
@ -342,7 +325,7 @@ def shutdown : ServerM Unit := do
st ← read;
fileWorkers ← st.fileWorkersRef.get;
fileWorkers.forM (fun id _ => terminateFileWorker id);
monadLift $ fileWorkers.forM (fun _ fw => do _ ← IO.wait fw.commTask; pure ())
monadLift $ fileWorkers.forM (fun _ fw => do _ ← IO.wait fw.commTask; pure ())
inductive ServerEvent
| WorkerEvent (uri : DocumentUri) (fw : FileWorker) (ev : WorkerEvent)
@ -391,6 +374,7 @@ partial def mainLoop : Unit → ServerM Unit
| ServerEvent.WorkerEvent uri fw err =>
match err with
| WorkerEvent.crashed e => handleCrash uri fw #[]
-- TODO: this internal error does occur
| WorkerEvent.terminated => throw (userError "Internal server error: got termination event for worker that should have been removed")
def mkLeanServerCapabilities : ServerCapabilities :=
@ -404,13 +388,13 @@ def mkLeanServerCapabilities : ServerCapabilities :=
def initAndRunWatchdogAux : ServerM Unit := do
st ← read;
catch
catch
(do
_ ← readLspNotificationAs st.hIn "initialized" InitializedParams;
mainLoop ();
Message.notification "exit" none ← readLspMessage st.hIn
| throw (userError "Expected an exit notification");
pure ())
pure ())
-- something crashed, try to terminate all the file workers and all the forwarding tasks
-- so that we can die in peace
(fun err => do shutdown; throw err)
@ -434,6 +418,8 @@ runReader
end Server
end Lean
-- TODO: compile separately OR add as a flag to the `lean` binary in order to stop
-- polluting the global symbol list with a `main` (and ditto in FileWorker.lean)
def main (_ : List String) : IO UInt32 := do
i ← IO.getStdin;
o ← IO.getStdout;