feat: amortise fileworker restarts and change processing

This commit is contained in:
Wojciech Nawrocki 2021-01-20 19:13:13 -05:00 committed by Sebastian Ullrich
parent d5ba10a316
commit b9cc6a709f

View file

@ -77,10 +77,13 @@ section Utils
stderr := Process.Stdio.inherit
}
/-- Events that a forwarding task of a worker signals to the main task -/
/-- Events that worker-specific tasks signal to the main thread. -/
inductive WorkerEvent where
/- A synthetic event signalling that the grouped edits should be processed. -/
| processGroupedEdits
| terminated
| crashed (e : IO.Error)
| ioError (e : IO.Error)
inductive WorkerState where
/- The watchdog can detect a crashed file worker in two places: When trying to send a message to the file worker
@ -102,6 +105,14 @@ section Utils
end Utils
section FileWorker
/-- A group of edits which will be processed at a future instant. -/
structure GroupedEdits where
/-- When to process the edits. -/
applyTime : Nat
params : DidChangeTextDocumentParams
/-- Signals when `applyTime` has been reached. -/
signalTask : Task WorkerEvent
structure FileWorker where
doc : OpenDocument
proc : Process.Child workerCfg
@ -109,6 +120,7 @@ section FileWorker
state : WorkerState
-- This should not be mutated outside of namespace FileWorker, as it is used as shared mutable state
pendingRequestsRef : IO.Ref PendingRequestMap
groupedEditsRef : IO.Ref (Option GroupedEdits)
namespace FileWorker
@ -143,6 +155,22 @@ section FileWorker
for ⟨id, _⟩ in pendingRequests do
hError.writeLspResponseError { id := id, code := code, message := msg }
partial def runEditsSignalTask (fw : FileWorker) : IO (Task WorkerEvent) := do
let rec loopAction : IO WorkerEvent := do
let now ← monoMsNow
let some ge ← fw.groupedEditsRef.get
| throwServerError "Internal error: empty grouped edits reference in signal task"
if ge.applyTime ≤ now then
return WorkerEvent.processGroupedEdits
else
IO.sleep <| UInt32.ofNat <| ge.applyTime - now
loopAction
let t ← IO.asTask loopAction
return t.map fun
| Except.ok ev => ev
| Except.error e => WorkerEvent.ioError e
end FileWorker
end FileWorker
@ -205,7 +233,7 @@ section ServerM
let task ← IO.asTask (loop $ ←read) Task.Priority.dedicated
task.map $ fun
| Except.ok ev => ev
| Except.error e => WorkerEvent.crashed e
| Except.error e => WorkerEvent.ioError e
def startFileWorker (m : DocumentMeta) : ServerM Unit := do
let st ← read
@ -217,15 +245,16 @@ section ServerM
}
let pendingRequestsRef ← IO.mkRef (RBMap.empty : PendingRequestMap)
-- The task will never access itself, so this is fine
let commTaskFw : FileWorker := {
let fw : FileWorker := {
doc := ⟨m, headerAst⟩
proc := workerProc
commTask := Task.pure WorkerEvent.terminated
state := WorkerState.running
pendingRequestsRef := pendingRequestsRef
groupedEditsRef := ← IO.mkRef none
}
let commTask ← forwardMessages commTaskFw
let fw : FileWorker := { commTaskFw with commTask := commTask }
let commTask ← forwardMessages fw
let fw : FileWorker := { fw with commTask := commTask }
fw.stdin.writeLspRequest ⟨0, "initialize", st.initParams⟩
fw.writeNotification {
method := "textDocument/didOpen"
@ -303,10 +332,11 @@ section NotificationHandling
is a CR there. -/
startFileWorker ⟨doc.uri, doc.version, doc.text.toFileMap⟩
def handleDidChange (p : DidChangeTextDocumentParams) : ServerM Unit := do
let doc := p.textDocument
let changes := p.contentChanges
let fw ← findFileWorker doc.uri
def handleEdits (fw : FileWorker) : ServerM Unit := do
let some ge ← fw.groupedEditsRef.modifyGet (·, none)
| throwServerError "Internal error: empty grouped edits reference"
let doc := ge.params.textDocument
let changes := ge.params.contentChanges
let oldDoc := fw.doc
let some newVersion ← pure doc.version?
| throwServerError "Expected version number"
@ -318,15 +348,12 @@ section NotificationHandling
let newMeta : DocumentMeta := ⟨doc.uri, newVersion, newDocText⟩
let newHeaderAst ← parseHeaderAst newDocText.source
if newHeaderAst != oldDoc.headerAst then
/- TODO(WN): we should amortize this somehow
when the user is typing in an import, this
may rapidly destroy/create new processes -/
terminateFileWorker doc.uri
startFileWorker newMeta
else
let newDoc : OpenDocument := ⟨newMeta, oldDoc.headerAst⟩
updateFileWorkers { fw with doc := newDoc }
tryWriteMessage doc.uri ⟨"textDocument/didChange", p⟩ FileWorker.writeNotification (restartCrashedWorker := true)
tryWriteMessage doc.uri ⟨"textDocument/didChange", ge.params⟩ FileWorker.writeNotification (restartCrashedWorker := true)
def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit :=
terminateFileWorker p.textDocument.uri
@ -368,7 +395,7 @@ section MessageHandling
let handle := (fun α [FromJson α] (handler : α → ServerM Unit) => parseParams α params >>= handler)
match method with
| "textDocument/didOpen" => handle DidOpenTextDocumentParams handleDidOpen
| "textDocument/didChange" => handle DidChangeTextDocumentParams handleDidChange
/- NOTE: textDocument/didChange is handled in the main loop. -/
| "textDocument/didClose" => handle DidCloseTextDocumentParams handleDidClose
| "$/cancelRequest" => handle CancelParams handleCancelRequest
| _ => (←read).hLog.putStrLn s!"Got unsupported notification: {method}"
@ -383,33 +410,34 @@ section MainLoop
discard <| IO.wait fw.commTask
inductive ServerEvent where
| WorkerEvent (fw : FileWorker) (ev : WorkerEvent)
| ClientMsg (msg : JsonRpc.Message)
| ClientError (e : IO.Error)
| workerEvent (fw : FileWorker) (ev : WorkerEvent)
| clientMsg (msg : JsonRpc.Message)
| clientError (e : IO.Error)
def runClientTask : ServerM (Task ServerEvent) := do
let st ← read
let readMsgAction : IO ServerEvent := do
/- Runs asynchronously. -/
let msg ← st.hIn.readLspMessage
ServerEvent.ClientMsg msg
ServerEvent.clientMsg msg
let clientTask := (←IO.asTask readMsgAction).map $ fun
| Except.ok ev => ev
| Except.error e => ServerEvent.ClientError e
| Except.error e => ServerEvent.clientError e
return clientTask
partial def mainLoop (clientTask : Task ServerEvent) : ServerM Unit := do
let st ← read
let workers ← st.fileWorkersRef.get
let workerTasks := workers.fold
(fun acc _ fw =>
match fw.state with
| WorkerState.running => fw.commTask.map (ServerEvent.WorkerEvent fw) :: acc
| _ => acc)
([] : List (Task ServerEvent))
let ev ← IO.waitAny $ clientTask :: workerTasks
let mut workerTasks := #[]
for (_, fw) in workers do
if let WorkerState.running := fw.state then
workerTasks := workerTasks.push <| fw.commTask.map (ServerEvent.workerEvent fw)
if let some ge ← fw.groupedEditsRef.get then
workerTasks := workerTasks.push <| ge.signalTask.map (ServerEvent.workerEvent fw)
let ev ← IO.waitAny (workerTasks.push clientTask |>.toList)
match ev with
| ServerEvent.ClientMsg msg =>
| ServerEvent.clientMsg msg =>
match msg with
| Message.request id "shutdown" _ =>
shutdown
@ -417,17 +445,41 @@ section MainLoop
| Message.request id method (some params) =>
handleRequest id method (toJson params)
mainLoop (←runClientTask)
| Message.notification "textDocument/didChange" (some params) =>
let p ← parseParams DidChangeTextDocumentParams (toJson params)
let fw ← findFileWorker p.textDocument.uri
let now ← monoMsNow
/- We wait 500ms since last edit before applying the changes. -/
let applyTime := now + 500
let startingGroup? ← fw.groupedEditsRef.modifyGet fun
| some ge => (false, some { ge with applyTime := applyTime
params.textDocument := p.textDocument
params.contentChanges := ge.params.contentChanges ++ p.contentChanges } )
| none => (true, some { applyTime := applyTime
params := p
/- This is overwritten just below. -/
signalTask := Task.pure WorkerEvent.processGroupedEdits } )
if startingGroup? then
let t ← fw.runEditsSignalTask
fw.groupedEditsRef.modify (Option.map fun ge => { ge with signalTask := t } )
mainLoop (←runClientTask)
| Message.notification method (some params) =>
handleNotification method (toJson params)
mainLoop (←runClientTask)
| _ => throwServerError "Got invalid JSON-RPC message"
| ServerEvent.ClientError e => throw e
| ServerEvent.WorkerEvent fw err =>
match err with
| ServerEvent.clientError e => throw e
| ServerEvent.workerEvent fw ev =>
match ev with
| WorkerEvent.processGroupedEdits =>
handleEdits fw
mainLoop clientTask
| WorkerEvent.ioError e =>
throwServerError s!"IO error while processing events for {fw.doc.meta.uri}: {e}"
| WorkerEvent.crashed e =>
handleCrash fw.doc.meta.uri #[]
mainLoop clientTask
| WorkerEvent.terminated => throwServerError "Internal server error: got termination event for worker that should have been removed"
| WorkerEvent.terminated =>
throwServerError "Internal server error: got termination event for worker that should have been removed"
end MainLoop
def mkLeanServerCapabilities : ServerCapabilities := {