chore: refactor FileWorker

This commit is contained in:
mhuisi 2020-12-12 17:43:02 +01:00 committed by Sebastian Ullrich
parent 0dca44da30
commit 9306b9330e
3 changed files with 272 additions and 283 deletions

View file

@ -36,252 +36,241 @@ If a task that the request task waits for is terminated, a change occured somewh
command that the request is looking for and the request sends a "content changed" error.
-/
namespace Lean
namespace Server
namespace FileWorker
namespace Lean.Server.FileWorker
open Lsp
open IO
open Snapshots
private def sendDiagnostics (h : FS.Stream) (uri : DocumentUri) (version : Nat) (text : FileMap) (log : MessageLog)
: IO Unit := do
let diagnostics ← log.msgs.mapM (msgToDiagnostic text)
h.writeLspNotification {
method := "textDocument/publishDiagnostics"
param := {
uri := uri
version? := version
diagnostics := diagnostics.toArray
: PublishDiagnosticsParams
}
}
section Utils
private def logSnapContent (s : Snapshot) (text : FileMap) : IO Unit :=
IO.eprintln s!"[{s.beginPos}, {s.endPos}]: `{text.source.extract s.beginPos (s.endPos-1)}`"
private def logSnapContent (s : Snapshot) (text : FileMap) : IO Unit :=
IO.eprintln s!"[{s.beginPos}, {s.endPos}]: `{text.source.extract s.beginPos (s.endPos-1)}`"
inductive TaskError where
| aborted
| eof
| ioError (e : IO.Error)
inductive TaskError where
| aborted
| eof
| ioError (e : IO.Error)
instance : Coe IO.Error TaskError := ⟨TaskError.ioError⟩
instance : Coe IO.Error TaskError := ⟨TaskError.ioError⟩
/-- A document editable in the sense that we track the environment
and parser state after each command so that edits can be applied
without recompiling code appearing earlier in the file. -/
structure EditableDocument where
meta : DocumentMeta
/- The first snapshot is that after the header. -/
headerSnap : Snapshot
/- Subsequent snapshots occur after each command. -/
cmdSnaps : AsyncList TaskError Snapshot
private def nextCmdSnap (h : FS.Stream) (uri : DocumentUri) (version : Nat) (contents : FileMap)
: Snapshot → ExceptT TaskError IO Snapshot := fun parentSnap => do
let maybeSnap ← compileNextCmd contents.source parentSnap
if (← IO.checkCanceled) then
throw TaskError.aborted
match maybeSnap with
| Sum.inl snap =>
-- NOTE(MH): This relies on the client discarding old diagnostics upon receiving new ones
-- while prefering newer versions over old ones. The former is necessary because we do
-- not explicitly clear older diagnostics, while the latter is necessary because we do
-- not guarantee that diagnostics are emitted in order. Specifically, it may happen that
-- we interrupted this elaboration task right at this point and a newer elaboration task
-- emits diagnostics, after which we emit old diagnostics because we did not yet detect
-- the interrupt. Explicitly clearing diagnostics is difficult for a similar reason,
-- because we cannot guarantee that no further diagnostics are emitted after clearing
-- them.
sendDiagnostics h uri version contents snap.msgLog
snap
| Sum.inr msgLog =>
sendDiagnostics h uri version contents msgLog
throw TaskError.eof
def unfoldCmdSnaps (h : FS.Stream) (uri : DocumentUri) (version : Nat) (contents : FileMap) (initSnap : Snapshot)
: IO (AsyncList TaskError Snapshot) :=
AsyncList.unfoldAsync
(nextCmdSnap h uri version contents)
initSnap
-- TODO(MH): check for interrupt with increased precision
(some fun _ => pure TaskError.aborted)
/-- A document editable in the sense that we track the environment
and parser state after each command so that edits can be applied
without recompiling code appearing earlier in the file. -/
structure EditableDocument where
version : Nat
text : FileMap
/- The first snapshot is that after the header. -/
headerSnap : Snapshot
/- Subsequent snapshots occur after each command. -/
cmdSnaps : AsyncList TaskError Snapshot
namespace EditableDocument
open Elab
/-- Compiles the contents of a Lean file. -/
def compileDocument (h : FS.Stream) (uri : DocumentUri) (version : Nat) (text : FileMap)
: IO EditableDocument := do
let headerSnap@⟨_, _, _, SnapshotData.headerData env msgLog opts⟩ ← Snapshots.compileHeader text.source
| throwServerError "Internal server error: invalid header snapshot"
-- TODO(WN): Remove the hardcoded option once the server is linked against stage0
let opts' := opts.setBool `interpreter.prefer_native false
let headerSnap' := { headerSnap with data := SnapshotData.headerData env msgLog opts' }
let cmdSnaps ← unfoldCmdSnaps h uri version text headerSnap'
pure ⟨version, text, headerSnap, cmdSnaps⟩
/-- Given `changePos`, the UTF-8 offset of a change into the pre-change source,
and the new document, updates editable doc state. -/
def updateDocument (h : FS.Stream) (uri : DocumentUri) (doc : EditableDocument) (changePos : String.Pos) (newVersion : Nat) (newText : FileMap)
: IO EditableDocument := do
-- The watchdog only restarts the file worker when the syntax tree of the header changes.
-- If e.g. a newline is deleted, it will not restart this file worker, but we still
-- need to reparse the header so the offsets are correct.
let newHeaderSnap ← reparseHeader newText.source doc.headerSnap
if newHeaderSnap.stx != doc.headerSnap.stx then
throwServerError "Internal server error: header changed but worker wasn't restarted."
let ⟨cmdSnaps, e?⟩ ← doc.cmdSnaps.updateFinishedPrefix
match e? with
-- this case should not be possible. only the main task aborts tasks and ensures that aborted tasks
-- do not show up in `snapshots` of an EditableDocument.
| some TaskError.aborted =>
throwServerError "Internal server error: elab task was aborted while still in use."
| some (TaskError.ioError ioError) => throw ioError
| _ => -- No error or EOF
-- NOTE(WN): we invalidate eagerly as `endPos` consumes input greedily. To re-elaborate only
-- when really necessary, we could do a whitespace-aware `Syntax` comparison instead.
let mut validSnaps := cmdSnaps.finishedPrefix.takeWhile (fun s => s.endPos < changePos)
if validSnaps.length = 0 then
let newCmdSnaps ← unfoldCmdSnaps h uri newVersion newText newHeaderSnap
pure ⟨newVersion, newText, newHeaderSnap, newCmdSnaps⟩
else
-- When at least one valid non-header snap exists, it may happen that a change does not fall
-- within the syntactic range of that last snap but still modifies it by appending tokens.
-- We check for this here. We do not currently handle crazy grammars in which an appended
-- token can merge two or more previous commands into one. To do so would require reparsing
-- the entire file.
let mut lastSnap := validSnaps.getLast!
let preLastSnap :=
if validSnaps.length ≥ 2
then validSnaps.get! (validSnaps.length - 2)
else newHeaderSnap
let newLastStx ← parseNextCmd newText.source preLastSnap
if newLastStx != lastSnap.stx then
validSnaps ← validSnaps.dropLast
lastSnap ← preLastSnap
let newSnaps ← unfoldCmdSnaps h uri newVersion newText lastSnap
let newCmdSnaps := AsyncList.ofList validSnaps ++ newSnaps
-- NOTE: We do not cancel old tasks explicitly, the GC does this for us when no refs remain.
pure ⟨newVersion, newText, newHeaderSnap, newCmdSnaps⟩
end EditableDocument
open EditableDocument
instance : Inhabited EditableDocument :=
⟨⟨Inhabited.default, Inhabited.default, Inhabited.default⟩⟩
end Utils
open IO
open Std (RBMap RBMap.empty)
open JsonRpc
abbrev PendingRequestMap := RBMap RequestID (Task (Except IO.Error Unit)) (fun a b => Decidable.decide (a < b))
section ServerM
abbrev PendingRequestMap := RBMap RequestID (Task (Except IO.Error Unit)) (fun a b => Decidable.decide (a < b))
structure ServerContext where
hIn hOut : FS.Stream
docRef : IO.Ref EditableDocument
pendingRequestsRef : IO.Ref PendingRequestMap
structure ServerContext where
hIn hOut : FS.Stream
docRef : IO.Ref EditableDocument
pendingRequestsRef : IO.Ref PendingRequestMap
abbrev ServerM := ReaderT ServerContext IO
abbrev ServerM := ReaderT ServerContext IO
def setDocument (val : EditableDocument) : ServerM Unit :=
fun st => st.docRef.set val
def updatePendingRequests (map : PendingRequestMap → PendingRequestMap) : ServerM Unit := do
(←read).pendingRequestsRef.modify map
def getDocument : ServerM EditableDocument :=
fun st => st.docRef.get
/-- Elaborates the next command after `parentSnap` and emits diagnostics. -/
private def nextCmdSnap (m : DocumentMeta) (parentSnap : Snapshot) : ExceptT TaskError ServerM Snapshot := do
let st ← read
let maybeSnap ← compileNextCmd m.text.source parentSnap
if ←IO.checkCanceled then
throw TaskError.aborted
let sendDiagnostics (msgLog : MessageLog) : IO Unit := do
let diagnostics ← msgLog.msgs.mapM (msgToDiagnostic m.text)
st.hOut.writeLspNotification {
method := "textDocument/publishDiagnostics"
param := {
uri := m.uri
version? := m.version
diagnostics := diagnostics.toArray
: PublishDiagnosticsParams
}
}
match maybeSnap with
| Sum.inl snap =>
/- NOTE(MH): This relies on the client discarding old diagnostics upon receiving new ones
while prefering newer versions over old ones. The former is necessary because we do
not explicitly clear older diagnostics, while the latter is necessary because we do
not guarantee that diagnostics are emitted in order. Specifically, it may happen that
we interrupted this elaboration task right at this point and a newer elaboration task
emits diagnostics, after which we emit old diagnostics because we did not yet detect
the interrupt. Explicitly clearing diagnostics is difficult for a similar reason,
because we cannot guarantee that no further diagnostics are emitted after clearing
them. -/
sendDiagnostics snap.msgLog
snap
| Sum.inr msgLog =>
sendDiagnostics msgLog
throw TaskError.eof
def updatePendingRequests (map : PendingRequestMap → PendingRequestMap) : ServerM Unit :=
fun st => st.pendingRequestsRef.modify map
/-- Elaborates all commands after `initSnap`, emitting the diagnostics. -/
def unfoldCmdSnaps (m : DocumentMeta) (initSnap : Snapshot) : ServerM (AsyncList TaskError Snapshot) := do
-- TODO(MH): check for interrupt with increased precision
AsyncList.unfoldAsync (nextCmdSnap m . (←read)) initSnap (some fun _ => pure TaskError.aborted)
def openDocument (h : FS.Stream) (p : DidOpenTextDocumentParams) : IO EditableDocument :=
let doc := p.textDocument
-- NOTE(WN): `toFileMap` marks line beginnings as immediately following
-- "\n", which should be enough to handle both LF and CRLF correctly.
-- This is because LSP always refers to characters by (line, column),
-- so if we get the line number correct it shouldn't matter that there
-- is a CR there.
compileDocument h doc.uri doc.version doc.text.toFileMap
/-- Compiles the contents of a Lean file. -/
def compileDocument (m : DocumentMeta) : ServerM Unit := do
let headerSnap@⟨_, _, _, SnapshotData.headerData env msgLog opts⟩ ← Snapshots.compileHeader m.text.source
| throwServerError "Internal server error: invalid header snapshot"
-- TODO(WN): Remove the hardcoded option once the server is linked against stage0
let opts' := opts.setBool `interpreter.prefer_native false
let headerSnap' := { headerSnap with data := SnapshotData.headerData env msgLog opts' }
let cmdSnaps ← unfoldCmdSnaps m headerSnap
(←read).docRef.set ⟨m, headerSnap, cmdSnaps⟩
def handleDidChange (p : DidChangeTextDocumentParams) : ServerM Unit := do
let docId := p.textDocument
let changes := p.contentChanges
let oldDoc ← getDocument
let some newVersion ← pure docId.version?
| throwServerError "Expected version number"
if newVersion <= oldDoc.version then
throwServerError "Got outdated version number"
if ¬ changes.isEmpty then
let (newDocText, minStartOff) := foldDocumentChanges changes oldDoc.text
let newDoc ← updateDocument (←read).hOut docId.uri oldDoc minStartOff newVersion newDocText
setDocument newDoc
/-- Given the new document and `changePos`, the UTF-8 offset of a change into the pre-change source,
updates editable doc state. -/
def updateDocument (newMeta : DocumentMeta) (changePos : String.Pos) : ServerM Unit := do
-- The watchdog only restarts the file worker when the syntax tree of the header changes.
-- If e.g. a newline is deleted, it will not restart this file worker, but we still
-- need to reparse the header so that the offsets are correct.
let st ← read
let oldDoc ← st.docRef.get
let newHeaderSnap ← reparseHeader newMeta.text.source oldDoc.headerSnap
if newHeaderSnap.stx != oldDoc.headerSnap.stx then
throwServerError "Internal server error: header changed but worker wasn't restarted."
let ⟨cmdSnaps, e?⟩ ← oldDoc.cmdSnaps.updateFinishedPrefix
match e? with
-- This case should not be possible. only the main task aborts tasks and ensures that aborted tasks
-- do not show up in `snapshots` of an EditableDocument.
| some TaskError.aborted =>
throwServerError "Internal server error: elab task was aborted while still in use."
| some (TaskError.ioError ioError) => throw ioError
| _ => -- No error or EOF
-- NOTE(WN): we invalidate eagerly as `endPos` consumes input greedily. To re-elaborate only
-- when really necessary, we could do a whitespace-aware `Syntax` comparison instead.
let mut validSnaps := cmdSnaps.finishedPrefix.takeWhile (fun s => s.endPos < changePos)
if validSnaps.length = 0 then
let newCmdSnaps ← unfoldCmdSnaps newMeta newHeaderSnap
st.docRef.set ⟨newMeta, newHeaderSnap, newCmdSnaps⟩
else
/- When at least one valid non-header snap exists, it may happen that a change does not fall
within the syntactic range of that last snap but still modifies it by appending tokens.
We check for this here. We do not currently handle crazy grammars in which an appended
token can merge two or more previous commands into one. To do so would require reparsing
the entire file. -/
let mut lastSnap := validSnaps.getLast!
let preLastSnap :=
if validSnaps.length ≥ 2
then validSnaps.get! (validSnaps.length - 2)
else newHeaderSnap
let newLastStx ← parseNextCmd newMeta.text.source preLastSnap
if newLastStx != lastSnap.stx then
validSnaps ← validSnaps.dropLast
lastSnap ← preLastSnap
let newSnaps ← unfoldCmdSnaps newMeta lastSnap
let newCmdSnaps := AsyncList.ofList validSnaps ++ newSnaps
st.docRef.set ⟨newMeta, newHeaderSnap, newCmdSnaps⟩
end ServerM
def handleCancelRequest (p : CancelParams) : ServerM Unit := do
updatePendingRequests (fun pendingRequests => pendingRequests.erase p.id)
section NotificationHandling
def handleDidOpen (p : DidOpenTextDocumentParams) : ServerM Unit :=
let doc := p.textDocument
/- NOTE(WN): `toFileMap` marks line beginnings as immediately following
"\n", which should be enough to handle both LF and CRLF correctly.
This is because LSP always refers to characters by (line, column),
so if we get the line number correct it shouldn't matter that there
is a CR there. -/
compileDocument ⟨doc.uri, doc.version, doc.text.toFileMap⟩
-- TODO(MH): requests that need data from a certain command should traverse e.snapshots
-- by successively getting the next task, meaning that we might need to wait for elaboration.
-- Sebastian said something about a future function TaskIO.bind that ensures that the
-- request task will also stop waiting when the reference to the task is released by handleDidChange.
-- when that happens, the request should send a "content changed" error to the user.
-- (this way, the server doesn't get bogged down in requests for an old state of the document)
-- requests need to manually check for whether their task has been cancelled, so that they
-- can reply with a RequestCancelled error.
def handleHover (p : HoverParams) (e : EditableDocument) : IO Unit := pure ⟨⟩
def handleDidChange (p : DidChangeTextDocumentParams) : ServerM Unit := do
let docId := p.textDocument
let changes := p.contentChanges
let oldDoc ← (←read).docRef.get
let some newVersion ← pure docId.version?
| throwServerError "Expected version number"
if newVersion <= oldDoc.meta.version then
throwServerError "Got outdated version number"
if ¬ changes.isEmpty then
let (newDocText, minStartOff) := foldDocumentChanges changes oldDoc.meta.text
updateDocument ⟨docId.uri, newVersion, newDocText⟩ minStartOff
def parseParams (paramType : Type) [FromJson paramType] (params : Json) : ServerM paramType :=
match fromJson? params with
| some parsed => pure parsed
| none => throwServerError $ "Got param with wrong structure: " ++ params.compress
def handleCancelRequest (p : CancelParams) : ServerM Unit := do
updatePendingRequests (fun pendingRequests => pendingRequests.erase p.id)
end NotificationHandling
def handleNotification (method : String) (params : Json) : ServerM Unit := do
let handle := fun paramType [FromJson paramType] (handler : paramType → ServerM Unit) =>
parseParams paramType params >>= handler
match method with
| "textDocument/didChange" => handle DidChangeTextDocumentParams handleDidChange
| "$/cancelRequest" => handle CancelParams handleCancelRequest
| _ => throwServerError $ "Got unsupported notification method: " ++ method
section RequestHandling
/- TODO(MH): Requests that need data from a certain command should traverse e.snapshots
by successively getting the next task, meaning that we might need to wait for elaboration.
Sebastian said something about a future function TaskIO.bind that ensures that the
request task will also stop waiting when the reference to the task is released by handleDidChange.
When that happens, the request should send a "content changed" error to the user
(this way, the server doesn't get bogged down in requests for an old state of the document).
Requests need to manually check for whether their task has been cancelled, so that they
can reply with a RequestCancelled error. -/
def handleHover (p : HoverParams) (e : EditableDocument) : IO Unit := pure ⟨⟩
end RequestHandling
def queueRequest (id : RequestID) (handler : α → EditableDocument → IO Unit) (params : α)
: ServerM Unit := do
let requestTask ← asTask (handler params (←getDocument))
updatePendingRequests (fun pendingRequests => pendingRequests.insert id requestTask)
section MessageHandling
def parseParams (paramType : Type) [FromJson paramType] (params : Json) : ServerM paramType :=
match fromJson? params with
| some parsed => pure parsed
| none => throwServerError $ "Got param with wrong structure: " ++ params.compress
def handleRequest (id : RequestID) (method : String) (params : Json)
: ServerM Unit := do
let handle := fun paramType [FromJson paramType]
(handler : paramType → EditableDocument → IO Unit) =>
parseParams paramType params >>= queueRequest id handler
match method with
| "textDocument/hover" => handle HoverParams handleHover
| _ => throwServerError $ "Got unsupported request: " ++ method ++
" params: " ++ toString params
def handleNotification (method : String) (params : Json) : ServerM Unit := do
let handle := fun paramType [FromJson paramType] (handler : paramType → ServerM Unit) =>
parseParams paramType params >>= handler
match method with
| "textDocument/didChange" => handle DidChangeTextDocumentParams handleDidChange
| "$/cancelRequest" => handle CancelParams handleCancelRequest
| _ => throwServerError $ "Got unsupported notification method: " ++ method
partial def mainLoop : ServerM Unit := do
let st ← read
let msg ← st.hIn.readLspMessage
let pendingRequests ← st.pendingRequestsRef.get
let filterFinishedTasks : PendingRequestMap → RequestID → Task (Except IO.Error Unit)
→ ServerM PendingRequestMap := fun acc id task => do
if (←hasFinished task) then acc.erase id
else acc
let pendingRequests ← pendingRequests.foldM filterFinishedTasks pendingRequests
st.pendingRequestsRef.set pendingRequests
match msg with
| Message.request id method (some params) =>
handleRequest id method (toJson params)
mainLoop
| Message.notification "exit" none =>
-- should be sufficient to shut down the file worker.
-- references are lost => tasks are marked as cancelled
-- => all tasks eventually quit
()
| Message.notification method (some params) =>
handleNotification method (toJson params)
mainLoop
| _ => throwServerError "Got invalid JSON-RPC message"
def queueRequest (id : RequestID) (handler : α → EditableDocument → IO Unit) (params : α)
: ServerM Unit := do
let requestTask ← asTask (handler params (←(←read).docRef.get))
updatePendingRequests (fun pendingRequests => pendingRequests.insert id requestTask)
def handleRequest (id : RequestID) (method : String) (params : Json)
: ServerM Unit := do
let handle := fun paramType [FromJson paramType]
(handler : paramType → EditableDocument → IO Unit) =>
parseParams paramType params >>= queueRequest id handler
match method with
| "textDocument/hover" => handle HoverParams handleHover
| _ => throwServerError $ "Got unsupported request: " ++ method ++
" params: " ++ toString params
end MessageHandling
section MainLoop
partial def mainLoop : ServerM Unit := do
let st ← read
let msg ← st.hIn.readLspMessage
let pendingRequests ← st.pendingRequestsRef.get
let filterFinishedTasks : PendingRequestMap → RequestID → Task (Except IO.Error Unit)
→ ServerM PendingRequestMap := fun acc id task => do
if (←hasFinished task) then acc.erase id
else acc
let pendingRequests ← pendingRequests.foldM filterFinishedTasks pendingRequests
st.pendingRequestsRef.set pendingRequests
match msg with
| Message.request id method (some params) =>
handleRequest id method (toJson params)
mainLoop
| Message.notification "exit" none =>
-- should be sufficient to shut down the file worker.
-- references are lost => tasks are marked as cancelled
-- => all tasks eventually quit
()
| Message.notification method (some params) =>
handleNotification method (toJson params)
mainLoop
| _ => throwServerError "Got invalid JSON-RPC message"
end MainLoop
def initAndRunWorker (i o e : FS.Stream) : IO Unit := do
let i ← maybeTee "fwIn.txt" false i
@ -291,15 +280,14 @@ def initAndRunWorker (i o e : FS.Stream) : IO Unit := do
let ⟨_, param⟩ ← i.readLspNotificationAs "textDocument/didOpen" DidOpenTextDocumentParams
let e ← e.withPrefix s!"[{param.textDocument.uri}] "
let _ ← IO.setStderr e
let doc ← openDocument o param
ReaderT.run mainLoop {
hIn := i
hOut := o
docRef := ←IO.mkRef doc
let ctx : ServerContext := {
hIn := i
hOut := o
-- `openDocument` will not access `docRef`, but set it
docRef := ←IO.mkRef arbitrary
pendingRequestsRef := ←IO.mkRef (RBMap.empty : PendingRequestMap)
: ServerContext
}
ReaderT.run (do handleDidOpen param; mainLoop) ctx
namespace Test
@ -325,6 +313,4 @@ def workerMain : IO UInt32 := do
e.putStrLn (toString err)
return 1
end FileWorker
end Server
end Lean
end Lean.Server.FileWorker

View file

@ -12,8 +12,7 @@ namespace IO
def throwServerError (err : String) : IO α :=
throw (userError err)
namespace FS
namespace Stream
namespace FS.Stream
/-- Chains two streams by creating a new stream s.t. writing to it
just writes to `a` but reading from it also duplicates the read output
@ -57,12 +56,18 @@ def withPrefix (a : Stream) (pre : String) : Stream :=
putStr := fun s =>
a.putStr (pre ++ s) }
end Stream
end FS
end FS.Stream
end IO
namespace Lean
namespace Server
namespace Lean.Server
structure DocumentMeta where
uri : Lsp.DocumentUri
version : Nat
text : FileMap
instance : Inhabited DocumentMeta :=
⟨⟨Inhabited.default, Inhabited.default, Inhabited.default⟩⟩
def replaceLspRange (text : FileMap) (r : Lsp.Range) (newText : String) : FileMap :=
let start := text.lspPosToUtf8Pos r.start
@ -106,8 +111,7 @@ def foldDocumentChanges (changes : @& Array Lsp.TextDocumentContentChangeEvent)
-- NOTE: We assume Lean files are below 16 EiB.
changes.foldl accumulateChanges (oldText, 0xffffffff)
end Server
end Lean
end Lean.Server
namespace List

View file

@ -67,11 +67,10 @@ open JsonRpc
section Utils
structure OpenDocument where
version : Nat
text : FileMap
meta : DocumentMeta
headerAst : Syntax
def workerCfg : Process.StdioConfig := {
def workerCfg : Process.StdioConfig := {
stdin := Process.Stdio.piped
stdout := Process.Stdio.piped
-- We pass workers' stderr through to the editor.
@ -159,8 +158,8 @@ section ServerM
abbrev ServerM := ReaderT ServerContext IO
def updateFileWorkers (uri : DocumentUri) (val : FileWorker) : ServerM Unit := do
(←read).fileWorkersRef.modify (fun fileWorkers => fileWorkers.insert uri val)
def updateFileWorkers (val : FileWorker) : ServerM Unit := do
(←read).fileWorkersRef.modify (fun fileWorkers => fileWorkers.insert val.doc.meta.uri val)
def findFileWorker (uri : DocumentUri) : ServerM FileWorker := do
match (←(←read).fileWorkersRef.get).find? uri with
@ -203,18 +202,18 @@ section ServerM
| Except.ok ev => ev
| Except.error e => WorkerEvent.crashed e
def startFileWorker (uri : DocumentUri) (version : Nat) (text : FileMap) : ServerM Unit := do
def startFileWorker (m : DocumentMeta) : ServerM Unit := do
let st ← read
let headerAst ← parseHeaderAst text.source
let workerProc ← Process.spawn {
let headerAst ← parseHeaderAst m.text.source
let workerProc ← Process.spawn {
toStdioConfig := workerCfg
cmd := st.workerPath
args := #["--worker"]
args := #["--worker"]
}
let pendingRequestsRef ← IO.mkRef (RBMap.empty : PendingRequestMap)
-- The task will never access itself, so this is fine
let commTaskFw : FileWorker := {
doc := ⟨version, text, headerAst⟩
doc := ⟨m, headerAst⟩
proc := workerProc
commTask := Task.pure WorkerEvent.terminated
state := WorkerState.running
@ -224,17 +223,17 @@ section ServerM
let fw : FileWorker := { commTaskFw with commTask := commTask }
fw.stdin.writeLspRequest ⟨0, "initialize", st.initParams⟩
fw.writeNotification {
method := "textDocument/didOpen"
method := "textDocument/didOpen"
param := {
textDocument := {
uri := uri
uri := m.uri
languageId := "lean"
version := version
text := text.source
version := m.version
text := m.text.source
} : DidOpenTextDocumentParams
}
}
updateFileWorkers uri fw
updateFileWorkers fw
def terminateFileWorker (uri : DocumentUri) : ServerM Unit := do
/- The file worker must have crashed just when we were about to terminate it!
@ -248,23 +247,23 @@ section ServerM
eraseFileWorker uri
def handleCrash (uri : DocumentUri) (queuedMsgs : Array JsonRpc.Message) : ServerM Unit := do
updateFileWorkers uri { ←findFileWorker uri with state := WorkerState.crashed queuedMsgs }
updateFileWorkers { ←findFileWorker uri with state := WorkerState.crashed queuedMsgs }
/-- Tries to write a message, sets the state of the FileWorker to `crashed` if it does not succeed
and restarts the file worker if the `crashed` flag was already set.
Messages that couldn't be sent can be queued up via the queueFailedMessage flag and
will be discharged after the FileWorker is restarted. -/
def tryWriteMessage [Coe α JsonRpc.Message] (uri : DocumentUri) (msg : α) (writeAction : FileWorker → α → IO Unit)
def tryWriteMessage [Coe α JsonRpc.Message] (uri : DocumentUri) (msg : α) (writeAction : FileWorker → α → IO Unit)
(queueFailedMessage := true) : ServerM Unit := do
let fw ← findFileWorker uri
match fw.state with
| WorkerState.crashed queuedMsgs =>
let mut queuedMsgs := queuedMsgs
if queueFailedMessage then
if queueFailedMessage then
queuedMsgs := queuedMsgs.push msg
-- restart the crashed FileWorker
eraseFileWorker uri
startFileWorker uri fw.doc.version fw.doc.text
startFileWorker fw.doc.meta
let newFw ← findFileWorker uri
let mut crashedMsgs := #[]
-- try to discharge all queued msgs, tracking the ones that we can't discharge
@ -276,14 +275,14 @@ section ServerM
if ¬ crashedMsgs.isEmpty then
handleCrash uri crashedMsgs
| WorkerState.running =>
let initialQueuedMsgs :=
let initialQueuedMsgs :=
if queueFailedMessage then
#[msg]
else
#[]
try
try
writeAction fw msg
catch _ =>
catch _ =>
handleCrash uri (initialQueuedMsgs.map Coe.coe)
end ServerM
@ -295,7 +294,7 @@ section NotificationHandling
This is because LSP always refers to characters by (line, column),
so if we get the line number correct it shouldn't matter that there
is a CR there. -/
startFileWorker doc.uri doc.version doc.text.toFileMap
startFileWorker doc.uri, doc.version, doc.text.toFileMap
def handleDidChange (p : DidChangeTextDocumentParams) : ServerM Unit := do
let doc := p.textDocument
@ -304,22 +303,22 @@ section NotificationHandling
let oldDoc := fw.doc
let some newVersion ← pure doc.version?
| throwServerError "Expected version number"
if newVersion <= oldDoc.version then
if newVersion <= oldDoc.meta.version then
throwServerError "Got outdated version number"
if changes.isEmpty then
if changes.isEmpty then
return
let (newDocText, _) := foldDocumentChanges changes oldDoc.text
let (newDocText, _) := foldDocumentChanges changes oldDoc.meta.text
let newMeta : DocumentMeta := ⟨doc.uri, newVersion, newDocText⟩
let newHeaderAst ← parseHeaderAst newDocText.source
if newHeaderAst != oldDoc.headerAst then
/- TODO(WN): we should amortize this somehow
when the user is typing in an import, this
may rapidly destroy/create new processes -/
terminateFileWorker doc.uri
startFileWorker doc.uri newVersion newDocText
startFileWorker newMeta
else
let newDoc : OpenDocument := ⟨newVersion, newDocText, oldDoc.headerAst⟩
let newFw : FileWorker := { fw with doc := newDoc }
updateFileWorkers doc.uri newFw
let newDoc : OpenDocument := ⟨newMeta, oldDoc.headerAst⟩
updateFileWorkers { fw with doc := newDoc }
tryWriteMessage doc.uri ⟨"textDocument/didChange", p⟩ FileWorker.writeNotification
def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit :=
@ -359,13 +358,13 @@ end MessageHandling
section MainLoop
def shutdown : ServerM Unit := do
let fileWorkers ← (←read).fileWorkersRef.get
for ⟨id, _⟩ in fileWorkers do
terminateFileWorker id
for ⟨uri, _⟩ in fileWorkers do
terminateFileWorker uri
for ⟨_, fw⟩ in fileWorkers do
let _ ← IO.wait fw.commTask
inductive ServerEvent where
| WorkerEvent (uri : DocumentUri) (fw : FileWorker) (ev : WorkerEvent)
| WorkerEvent (fw : FileWorker) (ev : WorkerEvent)
| ClientMsg (msg : JsonRpc.Message)
| ClientError (e : IO.Error)
@ -380,9 +379,9 @@ section MainLoop
let st ← read
let workers ← st.fileWorkersRef.get
let workerTasks := workers.fold
(fun acc uri fw =>
(fun acc _ fw =>
match fw.state with
| WorkerState.running => fw.commTask.map (ServerEvent.WorkerEvent uri fw) :: acc
| WorkerState.running => fw.commTask.map (ServerEvent.WorkerEvent fw) :: acc
| _ => acc)
([] : List (Task ServerEvent))
let ev ← IO.waitAny $ clientTask :: workerTasks
@ -400,23 +399,23 @@ section MainLoop
mainLoop (←runClientTask)
| _ => throwServerError "Got invalid JSON-RPC message"
| ServerEvent.ClientError e => throw e
| ServerEvent.WorkerEvent uri fw err =>
| ServerEvent.WorkerEvent fw err =>
match err with
| WorkerEvent.crashed e =>
handleCrash uri #[]
handleCrash fw.doc.meta.uri #[]
mainLoop clientTask
| WorkerEvent.terminated => throwServerError "Internal server error: got termination event for worker that should have been removed"
end MainLoop
def mkLeanServerCapabilities : ServerCapabilities := {
textDocumentSync? := some {
def mkLeanServerCapabilities : ServerCapabilities := {
textDocumentSync? := some {
openClose := true
change := TextDocumentSyncKind.incremental
willSave := false
willSaveWaitUntil := false
save? := none
save? := none
}
hoverProvider := true
hoverProvider := true
}
def initAndRunWatchdogAux : ServerM Unit := do
@ -442,23 +441,23 @@ def initAndRunWatchdog (i o e : FS.Stream) : IO Unit := do
let initRequest ← i.readLspRequestAs "initialize" InitializeParams
o.writeLspResponse {
id := initRequest.id
result := {
result := {
capabilities := mkLeanServerCapabilities
serverInfo? := some {
serverInfo? := some {
name := "Lean 4 server"
version? := "0.0.1"
version? := "0.0.1"
}
: InitializeResult
: InitializeResult
}
}
ReaderT.run initAndRunWatchdogAux {
ReaderT.run initAndRunWatchdogAux {
hIn := i
hOut := o
hLog := e
fileWorkersRef := fileWorkersRef
initParams := initRequest.param
workerPath := workerPath
: ServerContext
: ServerContext
}
namespace Test