fix: language server dropping requests (#7178)
This PR fixes a race condition in the language server that would sometimes cause it to drop requests and never respond to them when editing the header of a file. This in turn could cause semantic highlighting to stop functioning in VS Code, as VS Code would stop emitting requests when a prior request was dropped, and also cause the InfoView to become defective. It would also cause import auto-completion to feel a bit wonky, since these requests were sometimes dropped. This race condition has been present in the language server since its first version in 2020. This PR also reverts the futile fix attempt in #7130. The specific race condition was that if the file worker crashed or had to be restarted while a request was in flight in the file worker, then we wouldn't correctly replay it in our watchdog crash-restart logic. This PR adjusts this logic to fix this.
This commit is contained in:
parent
2ac0e4c061
commit
80b1ce8cad
2 changed files with 284 additions and 202 deletions
|
|
@ -159,12 +159,13 @@ structure SemanticTokensState where
|
|||
/-- Computes all semantic tokens for the document. -/
|
||||
def handleSemanticTokensFull (_ : SemanticTokensParams) (_ : SemanticTokensState)
|
||||
: RequestM (LspResponse SemanticTokens × SemanticTokensState) := do
|
||||
let ctx ← read
|
||||
let doc ← readDoc
|
||||
-- Only grabs the finished prefix so that we do not need to wait for elaboration to complete
|
||||
-- for the full file before sending a response. This means that the response will be incomplete,
|
||||
-- which we mitigate by regularly sending `workspace/semanticTokens/refresh` requests in the
|
||||
-- `FileWorker` to tell the client to re-compute the semantic tokens.
|
||||
let (snaps, _, isComplete) ← doc.cmdSnaps.getFinishedPrefix
|
||||
let (snaps, _, isComplete) ← doc.cmdSnaps.getFinishedPrefixWithTimeout 3000 (cancelTk? := ctx.cancelTk.cancellationTask)
|
||||
let response ← computeSemanticTokens doc 0 none snaps
|
||||
return ({ response, isComplete }, ⟨⟩)
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ Authors: Marc Huisinga, Wojciech Nawrocki
|
|||
prelude
|
||||
import Init.System.IO
|
||||
import Std.Sync.Mutex
|
||||
import Std.Data.TreeMap
|
||||
import Init.Data.ByteArray
|
||||
import Lean.Data.RBMap
|
||||
|
||||
|
|
@ -59,7 +60,6 @@ Moreover, we don't implement the full protocol at this level:
|
|||
- After `initialize`, the watchdog sends the corresponding `didOpen` notification with the full
|
||||
current state of the file. No additional `didOpen` notifications will be forwarded to the worker
|
||||
process.
|
||||
- `$/cancelRequest` notifications are forwarded to all file workers.
|
||||
- File workers are always terminated with an `exit` notification, without previously receiving a
|
||||
`shutdown` request. Similarly, they never receive a `didClose` notification.
|
||||
|
||||
|
|
@ -89,41 +89,141 @@ section Utils
|
|||
inductive WorkerEvent where
|
||||
| terminated
|
||||
| importsChanged
|
||||
| crashed (e : IO.Error)
|
||||
| crashed (exitCode : UInt32)
|
||||
| ioError (e : IO.Error)
|
||||
|
||||
inductive CrashOrigin
|
||||
| fileWorkerToClientForwarding
|
||||
| clientToFileWorkerForwarding
|
||||
|
||||
inductive WorkerState where
|
||||
/-- The watchdog can detect a crashed file worker in two places: When trying to send a message
|
||||
to the file worker and when reading a request reply.
|
||||
In the latter case, the forwarding task terminates and delegates a `crashed` event to the
|
||||
main task. Then, in both cases, the file worker has its state set to `crashed` and requests
|
||||
that are in-flight are errored. Upon receiving the next packet for that file worker, the file
|
||||
worker is restarted and the packet is forwarded to it. If the crash was detected while writing
|
||||
a packet, we queue that packet until the next packet for the file worker arrives. -/
|
||||
| crashed (queuedMsgs : Array JsonRpc.Message) (origin : CrashOrigin)
|
||||
| crashed
|
||||
| cannotWrite
|
||||
| terminating
|
||||
| running
|
||||
|
||||
abbrev PendingRequestMap := RBMap RequestID JsonRpc.Message compare
|
||||
structure RequestQueueMap where
|
||||
i : Nat
|
||||
reqs : Std.TreeMap RequestID (Nat × JsonRpc.Message)
|
||||
queue : Std.TreeMap Nat (RequestID × JsonRpc.Message)
|
||||
deriving Inhabited
|
||||
|
||||
namespace RequestQueueMap
|
||||
def empty : RequestQueueMap where
|
||||
i := 0
|
||||
reqs := ∅
|
||||
queue := ∅
|
||||
|
||||
instance : EmptyCollection RequestQueueMap where
|
||||
emptyCollection := .empty
|
||||
|
||||
def enqueue (m : RequestQueueMap) (req : JsonRpc.Message) : RequestQueueMap := Id.run do
|
||||
let .request id .. := req
|
||||
| panic! "Got non-request in `RequestQueueMap.enqueue`."
|
||||
{
|
||||
i := m.i + 1
|
||||
reqs := m.reqs.insert id (m.i, req)
|
||||
queue := m.queue.insert m.i (id, req)
|
||||
}
|
||||
|
||||
def erase (m : RequestQueueMap) (id : RequestID) : RequestQueueMap := Id.run do
|
||||
let some (i, _) := m.reqs.get? id
|
||||
| return m
|
||||
return { m with
|
||||
reqs := m.reqs.erase id
|
||||
queue := m.queue.erase i
|
||||
}
|
||||
|
||||
def contains (m : RequestQueueMap) (id : RequestID) : Bool :=
|
||||
m.reqs.contains id
|
||||
|
||||
instance : ForIn m RequestQueueMap (RequestID × JsonRpc.Message) where
|
||||
forIn map init f := map.queue.forIn (fun _ a b => f a b) init
|
||||
end RequestQueueMap
|
||||
|
||||
structure RequestData where
|
||||
requestQueues : Std.TreeMap DocumentUri RequestQueueMap
|
||||
uriByRequest : Std.TreeMap RequestID DocumentUri
|
||||
deriving Inhabited
|
||||
|
||||
namespace RequestData
|
||||
|
||||
def empty : RequestData := {
|
||||
requestQueues := ∅
|
||||
uriByRequest := ∅
|
||||
}
|
||||
|
||||
def clearWorkerRequestData (d : RequestData) (uri : DocumentUri) : RequestData := Id.run do
|
||||
let some requestQueue := d.requestQueues.get? uri
|
||||
| return d
|
||||
let mut uriByRequest := d.uriByRequest
|
||||
for (id, _) in requestQueue do
|
||||
uriByRequest := uriByRequest.erase id
|
||||
let requestQueues := d.requestQueues.erase uri
|
||||
return {
|
||||
requestQueues,
|
||||
uriByRequest
|
||||
: RequestData
|
||||
}
|
||||
|
||||
def enqueue (d : RequestData) (uri : DocumentUri) (req : JsonRpc.Message) : RequestData := Id.run do
|
||||
let .request id .. := req
|
||||
| panic! "Got non-request in `RequestData.enqueue`."
|
||||
return {
|
||||
requestQueues := d.requestQueues.insertIfNew uri ∅ |>.modify uri (·.enqueue req)
|
||||
uriByRequest := d.uriByRequest.insert id uri
|
||||
}
|
||||
|
||||
def erase (d : RequestData) (uri : DocumentUri) (id : RequestID) : RequestData where
|
||||
requestQueues := d.requestQueues.modify uri (·.erase id)
|
||||
uriByRequest := d.uriByRequest.erase id
|
||||
|
||||
def contains (d : RequestData) (uri : DocumentUri) (id : RequestID) : Bool := Id.run do
|
||||
let some uri' := d.uriByRequest.get? id
|
||||
| return false
|
||||
return uri == uri'
|
||||
|
||||
def getUri? (d : RequestData) (id : RequestID) : Option DocumentUri :=
|
||||
d.uriByRequest.get? id
|
||||
|
||||
def getRequestQueue (d : RequestData) (uri : DocumentUri) : RequestQueueMap :=
|
||||
d.requestQueues.get? uri |>.getD ∅
|
||||
|
||||
end RequestData
|
||||
|
||||
abbrev RequestDataMutex := Std.Mutex RequestData
|
||||
|
||||
namespace RequestDataMutex
|
||||
|
||||
def new : BaseIO RequestDataMutex :=
|
||||
Std.Mutex.new .empty
|
||||
|
||||
def clearWorkerRequestData (m : RequestDataMutex) (uri : DocumentUri) : BaseIO Unit :=
|
||||
m.atomically do modify (·.clearWorkerRequestData uri)
|
||||
|
||||
def enqueue (m : RequestDataMutex) (uri : DocumentUri) (req : JsonRpc.Message) : BaseIO Unit :=
|
||||
m.atomically do modify (·.enqueue uri req)
|
||||
|
||||
def erase (m : RequestDataMutex) (uri : DocumentUri) (id : RequestID) : BaseIO Unit :=
|
||||
m.atomically do modify (·.erase uri id)
|
||||
|
||||
def contains (m : RequestDataMutex) (uri : DocumentUri) (id : RequestID) : BaseIO Bool :=
|
||||
m.atomically do return (← get).contains uri id
|
||||
|
||||
def getUri? (m : RequestDataMutex) (id : RequestID) : BaseIO (Option DocumentUri) :=
|
||||
m.atomically do return (← get).getUri? id
|
||||
|
||||
def getRequestQueue (m : RequestDataMutex) (uri : DocumentUri) : BaseIO RequestQueueMap :=
|
||||
m.atomically do return (← get).getRequestQueue uri
|
||||
|
||||
end RequestDataMutex
|
||||
|
||||
end Utils
|
||||
|
||||
section FileWorker
|
||||
|
||||
structure FileWorker where
|
||||
doc : DocumentMeta
|
||||
proc : Process.Child workerCfg
|
||||
exitCode : Std.Mutex (Option UInt32)
|
||||
commTask : ServerTask WorkerEvent
|
||||
state : WorkerState
|
||||
-- This should not be mutated outside of namespace FileWorker,
|
||||
-- as it is used as shared mutable state
|
||||
/-- The pending requests map contains all requests that have been received from the LSP client,
|
||||
but were not answered yet.
|
||||
We need them for forwarding cancellation requests to the correct worker as well as cleanly
|
||||
aborting requests on worker crashes. -/
|
||||
pendingRequestsRef : IO.Ref PendingRequestMap
|
||||
doc : DocumentMeta
|
||||
proc : Process.Child workerCfg
|
||||
exitCode : Std.Mutex (Option UInt32)
|
||||
commTask : ServerTask WorkerEvent
|
||||
state : Std.Mutex WorkerState
|
||||
|
||||
namespace FileWorker
|
||||
|
||||
|
|
@ -133,21 +233,6 @@ section FileWorker
|
|||
def stdout (fw : FileWorker) : FS.Stream :=
|
||||
FS.Stream.ofHandle fw.proc.stdout
|
||||
|
||||
def erasePendingRequest (fw : FileWorker) (id : RequestID) : IO Unit :=
|
||||
fw.pendingRequestsRef.modify fun pendingRequests => pendingRequests.erase id
|
||||
|
||||
def errorPendingRequests (fw : FileWorker) (hError : FS.Stream) (code : ErrorCode) (msg : String)
|
||||
: IO Unit := do
|
||||
let pendingRequests ← fw.pendingRequestsRef.modifyGet
|
||||
fun pendingRequests => (pendingRequests, RBMap.empty)
|
||||
for ⟨id, _⟩ in pendingRequests do
|
||||
hError.writeLspResponseError { id := id, code := code, message := msg }
|
||||
|
||||
def queuedMsgs (fw : FileWorker) : Array JsonRpc.Message :=
|
||||
match fw.state with
|
||||
| .running => #[]
|
||||
| .crashed queuedMsgs _ => queuedMsgs
|
||||
|
||||
def waitForProc (fw : FileWorker) : IO UInt32 :=
|
||||
fw.exitCode.atomically do
|
||||
match ← get with
|
||||
|
|
@ -170,7 +255,6 @@ section FileWorker
|
|||
-- Process is already dead
|
||||
return exitCode
|
||||
|
||||
|
||||
end FileWorker
|
||||
end FileWorker
|
||||
|
||||
|
|
@ -251,19 +335,20 @@ section ServerM
|
|||
(translation?, data)
|
||||
|
||||
structure ServerContext where
|
||||
hIn : FS.Stream
|
||||
hOut : FS.Stream
|
||||
hLog : FS.Stream
|
||||
hIn : FS.Stream
|
||||
hOut : FS.Stream
|
||||
hLog : FS.Stream
|
||||
/-- Command line arguments. -/
|
||||
args : List String
|
||||
fileWorkersRef : IO.Ref FileWorkerMap
|
||||
args : List String
|
||||
fileWorkersRef : IO.Ref FileWorkerMap
|
||||
/-- We store these to pass them to workers. -/
|
||||
initParams : InitializeParams
|
||||
workerPath : System.FilePath
|
||||
srcSearchPath : System.SearchPath
|
||||
references : IO.Ref References
|
||||
serverRequestData : IO.Ref ServerRequestData
|
||||
importData : IO.Ref ImportData
|
||||
initParams : InitializeParams
|
||||
workerPath : System.FilePath
|
||||
srcSearchPath : System.SearchPath
|
||||
references : IO.Ref References
|
||||
serverRequestData : IO.Ref ServerRequestData
|
||||
importData : IO.Ref ImportData
|
||||
requestData : RequestDataMutex
|
||||
|
||||
abbrev ServerM := ReaderT ServerContext IO
|
||||
|
||||
|
|
@ -281,6 +366,31 @@ section ServerM
|
|||
| return
|
||||
s.references.modify fun refs => refs.removeWorkerRefs module
|
||||
|
||||
def setWorkerState (fw : FileWorker) (s : WorkerState) : ServerM Unit := do
|
||||
fw.state.atomically <| set s
|
||||
|
||||
def getWorkerState (fw : FileWorker) : ServerM WorkerState := do
|
||||
fw.state.atomically get
|
||||
|
||||
def errorPendingRequests (uri : DocumentUri) (code : ErrorCode) (msg : String)
|
||||
: ServerM Unit := do
|
||||
let ctx ← read
|
||||
let pendingRequests ← ctx.requestData.atomically do
|
||||
let d ← get
|
||||
let pendingRequests := d.getRequestQueue uri
|
||||
set <| d.clearWorkerRequestData uri
|
||||
return pendingRequests
|
||||
for (id, _) in pendingRequests do
|
||||
ctx.hOut.writeLspResponseError { id := id, code := code, message := msg }
|
||||
|
||||
def erasePendingRequest (uri : DocumentUri) (id : RequestID) : ServerM Bool := do
|
||||
let ctx ← read
|
||||
ctx.requestData.atomically do
|
||||
let d ← get
|
||||
let wasPending := d.contains uri id
|
||||
set <| d.erase uri id
|
||||
return wasPending
|
||||
|
||||
def log (msg : String) : ServerM Unit := do
|
||||
let st ← read
|
||||
st.hLog.putStrLn msg
|
||||
|
|
@ -318,74 +428,70 @@ section ServerM
|
|||
| Except.error e => WorkerEvent.ioError e
|
||||
where
|
||||
loop : ServerM WorkerEvent := do
|
||||
let uri := fw.doc.uri
|
||||
let o := (←read).hOut
|
||||
let msg ←
|
||||
try
|
||||
fw.stdout.readLspMessage
|
||||
catch err =>
|
||||
catch _ =>
|
||||
let exitCode ← fw.waitForProc
|
||||
-- Remove surviving descendant processes, if any, such as from nested builds.
|
||||
-- On Windows, we instead rely on elan doing this.
|
||||
try fw.proc.kill catch _ => pure ()
|
||||
-- TODO: Wait for process group to finish
|
||||
match exitCode with
|
||||
| 0 =>
|
||||
-- Worker was terminated
|
||||
fw.errorPendingRequests o ErrorCode.contentModified
|
||||
(s!"The file worker for {fw.doc.uri} has been terminated. "
|
||||
++ "Either the header has changed, or the file was closed, "
|
||||
++ " or the server is shutting down.")
|
||||
-- one last message to clear the diagnostics for this file so that stale errors
|
||||
-- do not remain in the editor forever.
|
||||
o.writeLspMessage <| mkPublishDiagnosticsNotification fw.doc #[]
|
||||
return WorkerEvent.terminated
|
||||
| 2 =>
|
||||
return .importsChanged
|
||||
| _ =>
|
||||
-- Worker crashed
|
||||
let (errorCode, errorCausePointer) :=
|
||||
if exitCode = 1 then
|
||||
(ErrorCode.workerExited, "see stderr for exception")
|
||||
else
|
||||
(ErrorCode.workerCrashed, "likely due to a stack overflow or a bug")
|
||||
fw.errorPendingRequests o errorCode
|
||||
s!"Server process for {fw.doc.uri} crashed, {errorCausePointer}."
|
||||
o.writeLspMessage <| mkFileProgressAtPosNotification fw.doc 0 (kind := LeanFileProgressKind.fatalError)
|
||||
return WorkerEvent.crashed err
|
||||
| 0 => return .terminated
|
||||
| 2 => return .importsChanged
|
||||
| _ => return .crashed exitCode
|
||||
|
||||
-- Re. `o.writeLspMessage msg`:
|
||||
-- Writes to Lean I/O channels are atomic, so these won't trample on each other.
|
||||
match msg with
|
||||
| Message.response id _ => do
|
||||
fw.erasePendingRequest id
|
||||
o.writeLspMessage msg
|
||||
| Message.responseError id _ _ _ => do
|
||||
fw.erasePendingRequest id
|
||||
o.writeLspMessage msg
|
||||
| Message.request id method params? =>
|
||||
let globalID ← (←read).serverRequestData.modifyGet
|
||||
(·.trackOutboundRequest fw.doc.uri id)
|
||||
o.writeLspMessage (Message.request globalID method params?)
|
||||
| Message.notification "$/lean/ileanInfoUpdate" params =>
|
||||
if let some params := params then
|
||||
if let Except.ok params := FromJson.fromJson? <| ToJson.toJson params then
|
||||
handleIleanInfoUpdate fw params
|
||||
| Message.notification "$/lean/ileanInfoFinal" params =>
|
||||
if let some params := params then
|
||||
if let Except.ok params := FromJson.fromJson? <| ToJson.toJson params then
|
||||
handleIleanInfoFinal fw params
|
||||
| Message.notification "$/lean/importClosure" params =>
|
||||
if let some params := params then
|
||||
if let Except.ok params := FromJson.fromJson? <| ToJson.toJson params then
|
||||
handleImportClosure fw params
|
||||
| _ =>
|
||||
o.writeLspMessage msg
|
||||
-- When the file worker is terminated by the main thread, the client can immediately launch
|
||||
-- another file worker using `didOpen`. In this case, even when this task and the old file
|
||||
-- worker process haven't terminated yet, we want to avoid emitting diagnostics and responses
|
||||
-- from the old process, so that they can't race with one another in the client.
|
||||
fw.state.atomically do
|
||||
let s ← get
|
||||
if s matches .terminating then
|
||||
return
|
||||
-- Re. `o.writeLspMessage msg`:
|
||||
-- Writes to Lean I/O channels are atomic, so these won't trample on each other.
|
||||
match msg with
|
||||
| Message.response id _ => do
|
||||
let wasPending ← erasePendingRequest uri id
|
||||
-- In the rare scenario that we detect a file worker crash on the main thread and this task
|
||||
-- still has a response to process, it could theoretically happen that we restart the file
|
||||
-- worker, discharge all queued requests to it and then get a duplicate response.
|
||||
-- This ensures that this scenario can't occur, and we only emit responses for requests
|
||||
-- that were still pending.
|
||||
if wasPending then
|
||||
o.writeLspMessage msg
|
||||
| Message.responseError id _ _ _ => do
|
||||
let wasPending ← erasePendingRequest uri id
|
||||
if wasPending then
|
||||
o.writeLspMessage msg
|
||||
| Message.request id method params? =>
|
||||
let globalID ← (←read).serverRequestData.modifyGet
|
||||
(·.trackOutboundRequest fw.doc.uri id)
|
||||
o.writeLspMessage (Message.request globalID method params?)
|
||||
| Message.notification "$/lean/ileanInfoUpdate" params =>
|
||||
if let some params := params then
|
||||
if let Except.ok params := FromJson.fromJson? <| ToJson.toJson params then
|
||||
handleIleanInfoUpdate fw params
|
||||
| Message.notification "$/lean/ileanInfoFinal" params =>
|
||||
if let some params := params then
|
||||
if let Except.ok params := FromJson.fromJson? <| ToJson.toJson params then
|
||||
handleIleanInfoFinal fw params
|
||||
| Message.notification "$/lean/importClosure" params =>
|
||||
if let some params := params then
|
||||
if let Except.ok params := FromJson.fromJson? <| ToJson.toJson params then
|
||||
handleImportClosure fw params
|
||||
| _ =>
|
||||
o.writeLspMessage msg
|
||||
|
||||
loop
|
||||
|
||||
def startFileWorker (m : DocumentMeta) : ServerM Unit := do
|
||||
(← read).hOut.writeLspMessage <| mkFileProgressAtPosNotification m 0
|
||||
let st ← read
|
||||
st.hOut.writeLspMessage <| mkFileProgressAtPosNotification m 0
|
||||
let workerProc ← Process.spawn {
|
||||
toStdioConfig := workerCfg
|
||||
cmd := st.workerPath.toString
|
||||
|
|
@ -394,7 +500,6 @@ section ServerM
|
|||
setsid := true
|
||||
}
|
||||
let exitCode ← Std.Mutex.new none
|
||||
let pendingRequestsRef ← IO.mkRef (RBMap.empty : PendingRequestMap)
|
||||
let initialDependencyBuildMode := m.dependencyBuildMode
|
||||
let updatedDependencyBuildMode :=
|
||||
if initialDependencyBuildMode matches .once then
|
||||
|
|
@ -409,8 +514,7 @@ section ServerM
|
|||
proc := workerProc
|
||||
exitCode
|
||||
commTask := Task.pure WorkerEvent.terminated
|
||||
state := WorkerState.running
|
||||
pendingRequestsRef := pendingRequestsRef
|
||||
state := ← Std.Mutex.new WorkerState.running
|
||||
}
|
||||
let commTask ← forwardMessages fw
|
||||
let fw : FileWorker := { fw with commTask := commTask }
|
||||
|
|
@ -429,74 +533,59 @@ section ServerM
|
|||
}
|
||||
}
|
||||
updateFileWorkers fw
|
||||
let reqQueue ← st.requestData.getRequestQueue m.uri
|
||||
for (_, msg) in reqQueue do
|
||||
try
|
||||
fw.stdin.writeLspMessage msg
|
||||
catch _ =>
|
||||
setWorkerState fw .cannotWrite
|
||||
break
|
||||
|
||||
def terminateFileWorker (uri : DocumentUri) : ServerM Unit := do
|
||||
let some fw ← findFileWorker? uri
|
||||
| return
|
||||
setWorkerState fw .terminating
|
||||
eraseFileWorker uri
|
||||
try
|
||||
errorPendingRequests uri ErrorCode.contentModified
|
||||
s!"The file worker for {uri} has been terminated."
|
||||
-- Clear the diagnostics for this file so that stale errors
|
||||
-- do not remain in the editor forever.
|
||||
(← read).hOut.writeLspMessage <| mkPublishDiagnosticsNotification fw.doc #[]
|
||||
catch _ =>
|
||||
-- Client closed stdout => Still ensure that file worker is terminated
|
||||
pure ()
|
||||
try
|
||||
fw.stdin.writeLspMessage (Message.notification "exit" none)
|
||||
catch _ =>
|
||||
/- The file worker must have crashed just when we were about to terminate it!
|
||||
That's fine - just forget about it then.
|
||||
(on didClose we won't need the crashed file worker anymore,
|
||||
when the header changed we'll start a new one right after
|
||||
anyways and when we're shutting down the server
|
||||
it's over either way.) -/
|
||||
return
|
||||
eraseFileWorker uri
|
||||
-- File worker crashed during termination => Treat it as terminated
|
||||
pure ()
|
||||
|
||||
def handleCrash (uri : DocumentUri) (queuedMsgs : Array JsonRpc.Message) (origin: CrashOrigin) : ServerM Unit := do
|
||||
let some fw ← findFileWorker? uri
|
||||
| return
|
||||
updateFileWorkers { fw with state := WorkerState.crashed queuedMsgs origin }
|
||||
|
||||
def tryDischargeQueuedMessages (uri : DocumentUri) (queuedMsgs : Array JsonRpc.Message) : ServerM Unit := do
|
||||
let some fw ← findFileWorker? uri
|
||||
| throwServerError "Cannot find file worker for '{uri}'."
|
||||
let mut crashedMsgs := #[]
|
||||
-- Try to discharge all queued msgs, tracking the ones that we can't discharge
|
||||
for msg in queuedMsgs do
|
||||
try
|
||||
fw.stdin.writeLspMessage msg
|
||||
catch _ =>
|
||||
crashedMsgs := crashedMsgs.push msg
|
||||
if ¬ crashedMsgs.isEmpty then
|
||||
handleCrash uri crashedMsgs .clientToFileWorkerForwarding
|
||||
|
||||
/-- Tries to write a message, sets the state of the FileWorker to `crashed` if it does not succeed
|
||||
and restarts the file worker if the `crashed` flag was already set. Just logs an error if
|
||||
there is no FileWorker at this `uri`.
|
||||
Messages that couldn't be sent can be queued up via the queueFailedMessage flag and
|
||||
will be discharged after the FileWorker is restarted. -/
|
||||
def tryWriteMessage
|
||||
(uri : DocumentUri)
|
||||
(msg : JsonRpc.Message)
|
||||
(queueFailedMessage := true)
|
||||
(restartCrashedWorker := false)
|
||||
: ServerM Unit := do
|
||||
let some fw ← findFileWorker? uri
|
||||
| return
|
||||
match fw.state with
|
||||
| WorkerState.crashed queuedMsgs _ =>
|
||||
let mut queuedMsgs := queuedMsgs
|
||||
if queueFailedMessage then
|
||||
queuedMsgs := queuedMsgs.push msg
|
||||
if !restartCrashedWorker then
|
||||
if msg matches .request .. then
|
||||
-- If we cannot write a notification to the file worker, it is nonetheless safe to discard
|
||||
-- the notification here because all non-discardable notifications are handled by the watchdog
|
||||
-- itself.
|
||||
(← read).requestData.enqueue uri msg
|
||||
match ← getWorkerState fw with
|
||||
| WorkerState.cannotWrite | WorkerState.terminating =>
|
||||
return
|
||||
| WorkerState.crashed =>
|
||||
if ! (msg matches .notification "textDocument/didChange" ..) then
|
||||
-- Only restart crashed FileWorkers on `didChange`.
|
||||
return
|
||||
-- restart the crashed FileWorker
|
||||
eraseFileWorker uri
|
||||
startFileWorker fw.doc
|
||||
tryDischargeQueuedMessages uri queuedMsgs
|
||||
| WorkerState.running =>
|
||||
let initialQueuedMsgs :=
|
||||
if queueFailedMessage then
|
||||
#[msg]
|
||||
else
|
||||
#[]
|
||||
try
|
||||
fw.stdin.writeLspMessage msg
|
||||
catch _ =>
|
||||
handleCrash uri initialQueuedMsgs .clientToFileWorkerForwarding
|
||||
setWorkerState fw .cannotWrite
|
||||
|
||||
/--
|
||||
Sends a notification to the file worker identified by `uri` that its dependency `staleDependency`
|
||||
|
|
@ -508,7 +597,7 @@ section ServerM
|
|||
staleDependency := staleDependency
|
||||
: LeanStaleDependencyParams
|
||||
}
|
||||
tryWriteMessage uri notification (queueFailedMessage := false)
|
||||
tryWriteMessage uri notification
|
||||
end ServerM
|
||||
|
||||
section RequestHandling
|
||||
|
|
@ -788,9 +877,7 @@ section NotificationHandling
|
|||
let newDoc : DocumentMeta := ⟨doc.uri, newVersion, newDocText, oldDoc.dependencyBuildMode⟩
|
||||
updateFileWorkers { fw with doc := newDoc }
|
||||
let notification := Notification.mk "textDocument/didChange" p
|
||||
-- Don't queue failed `didChange` notifications because we already accumulate them in the
|
||||
-- document and hand the updated document to the file worker when restarting it.
|
||||
tryWriteMessage doc.uri notification (restartCrashedWorker := true) (queueFailedMessage := false)
|
||||
tryWriteMessage doc.uri notification
|
||||
|
||||
/--
|
||||
When a file is saved, notifies all file workers for files that depend on this file that this
|
||||
|
|
@ -843,16 +930,14 @@ section NotificationHandling
|
|||
| e => throw e
|
||||
|
||||
def handleCancelRequest (p : CancelParams) : ServerM Unit := do
|
||||
let fileWorkers ← (←read).fileWorkersRef.get
|
||||
for ⟨uri, fw⟩ in fileWorkers do
|
||||
-- Cancelled requests still require a response, so they can't be removed
|
||||
-- from the pending requests map.
|
||||
if (← fw.pendingRequestsRef.get).contains p.id then
|
||||
tryWriteMessage uri (Notification.mk "$/cancelRequest" p) (queueFailedMessage := false)
|
||||
let ctx ← read
|
||||
let some uri ← ctx.requestData.getUri? p.id
|
||||
| return
|
||||
tryWriteMessage uri (Notification.mk "$/cancelRequest" p)
|
||||
|
||||
def forwardNotification {α : Type} [ToJson α] [FileSource α] (method : String) (params : α)
|
||||
: ServerM Unit :=
|
||||
tryWriteMessage (fileSource params) (Notification.mk method params) (queueFailedMessage := true)
|
||||
tryWriteMessage (fileSource params) (Notification.mk method params)
|
||||
end NotificationHandling
|
||||
|
||||
section MessageHandling
|
||||
|
|
@ -874,20 +959,18 @@ section MessageHandling
|
|||
(←read).hOut.writeLspResponseError <| e.toLspResponseError id
|
||||
return
|
||||
| Except.ok uri => pure uri
|
||||
let some fw ← findFileWorker? uri
|
||||
if (← findFileWorker? uri).isNone then
|
||||
/- Clients may send requests to closed files, which we respond to with an error.
|
||||
For example, VSCode sometimes sends requests just after closing a file,
|
||||
and RPC clients may also do so, e.g. due to remaining timers. -/
|
||||
| do
|
||||
(←read).hOut.writeLspResponseError
|
||||
{ id := id
|
||||
/- Some clients (VSCode) also send requests *before* opening a file. We reply
|
||||
with `contentModified` as that does not display a "request failed" popup. -/
|
||||
code := ErrorCode.contentModified
|
||||
message := s!"Cannot process request to closed file '{uri}'" }
|
||||
return
|
||||
(←read).hOut.writeLspResponseError
|
||||
{ id := id
|
||||
/- Some clients (VSCode) also send requests *before* opening a file. We reply
|
||||
with `contentModified` as that does not display a "request failed" popup. -/
|
||||
code := ErrorCode.contentModified
|
||||
message := s!"Cannot process request to closed file '{uri}'" }
|
||||
return
|
||||
let r := Request.mk id method params
|
||||
fw.pendingRequestsRef.modify (·.insert id r)
|
||||
tryWriteMessage uri r
|
||||
|
||||
def handleRequest (id : RequestID) (method : String) (params : Json) : ServerM Unit := do
|
||||
|
|
@ -951,8 +1034,6 @@ section MessageHandling
|
|||
handle DidChangeWatchedFilesParams handleDidChangeWatchedFiles
|
||||
| "$/cancelRequest" =>
|
||||
handle CancelParams handleCancelRequest
|
||||
| "$/lean/rpc/connect" =>
|
||||
handle RpcConnectParams (forwardNotification method)
|
||||
| "$/lean/rpc/release" =>
|
||||
handle RpcReleaseParams (forwardNotification method)
|
||||
| "$/lean/rpc/keepAlive" =>
|
||||
|
|
@ -982,7 +1063,7 @@ section MainLoop
|
|||
def shutdown : ServerM Unit := do
|
||||
let fileWorkers ← (←read).fileWorkersRef.get
|
||||
for ⟨uri, _⟩ in fileWorkers do
|
||||
terminateFileWorker uri
|
||||
try terminateFileWorker uri catch _ => pure ()
|
||||
for ⟨_, fw⟩ in fileWorkers do
|
||||
-- TODO: Wait for process group to finish instead
|
||||
try let _ ← fw.killProcAndWait catch _ => pure ()
|
||||
|
|
@ -1008,19 +1089,10 @@ section MainLoop
|
|||
let workers ← st.fileWorkersRef.get
|
||||
let mut workerTasks := #[]
|
||||
for (_, fw) in workers do
|
||||
-- When the forwarding task crashes, its return value will be stuck at
|
||||
-- `WorkerEvent.crashed _`.
|
||||
-- We want to handle this event only once, not over and over again,
|
||||
-- so once the state becomes `WorkerState.crashed _ .fileWorkerToClientForwarding`
|
||||
-- as a result of `WorkerEvent.crashed _`, we stop handling this event until
|
||||
-- eventually the file worker is restarted by a notification from the client.
|
||||
-- We do not want to filter the forwarding task in case of
|
||||
-- `WorkerState.crashed _ .clientToFileWorkerForwarding`, since the forwarding task
|
||||
-- exit code may still contain valuable information in this case (e.g. that the imports changed).
|
||||
if !(fw.state matches WorkerState.crashed _ .fileWorkerToClientForwarding) then
|
||||
if !((← getWorkerState fw) matches WorkerState.crashed) then
|
||||
workerTasks := workerTasks.push <| fw.commTask.mapCheap (ServerEvent.workerEvent fw)
|
||||
|
||||
let ev ← ServerTask.waitAny (clientTask :: workerTasks.toList)
|
||||
let ev ← ServerTask.waitAny (workerTasks.toList ++ [clientTask]) (by simp)
|
||||
match ev with
|
||||
| ServerEvent.clientMsg msg =>
|
||||
match msg with
|
||||
|
|
@ -1042,20 +1114,27 @@ section MainLoop
|
|||
| _ => throwServerError "Got invalid JSON-RPC message"
|
||||
| ServerEvent.clientError e => throw e
|
||||
| ServerEvent.workerEvent fw ev =>
|
||||
let doc := fw.doc
|
||||
let uri := doc.uri
|
||||
match ev with
|
||||
| WorkerEvent.ioError e =>
|
||||
throwServerError s!"IO error while processing events for {fw.doc.uri}: {e}"
|
||||
| WorkerEvent.crashed _ =>
|
||||
handleCrash fw.doc.uri fw.queuedMsgs .fileWorkerToClientForwarding
|
||||
throwServerError s!"IO error while processing events for {uri}: {e}"
|
||||
| WorkerEvent.crashed exitCode =>
|
||||
let (errorCode, errorCausePointer) :=
|
||||
if exitCode = 1 then
|
||||
(ErrorCode.workerExited, "see stderr for exception")
|
||||
else
|
||||
(ErrorCode.workerCrashed, "likely due to a stack overflow or a bug")
|
||||
errorPendingRequests uri errorCode
|
||||
s!"Server process for {uri} crashed, {errorCausePointer}."
|
||||
(← read).hOut.writeLspMessage <| mkFileProgressAtPosNotification doc 0 (kind := LeanFileProgressKind.fatalError)
|
||||
setWorkerState fw .crashed
|
||||
mainLoop clientTask
|
||||
| WorkerEvent.terminated =>
|
||||
throwServerError <| "Internal server error: got termination event for worker that "
|
||||
++ "should have been removed"
|
||||
throwServerError
|
||||
"Internal server error: Got termination event for worker that should have been removed"
|
||||
| .importsChanged =>
|
||||
let uri := fw.doc.uri
|
||||
let queuedMsgs := fw.queuedMsgs
|
||||
startFileWorker fw.doc
|
||||
tryDischargeQueuedMessages uri queuedMsgs
|
||||
startFileWorker doc
|
||||
mainLoop clientTask
|
||||
end MainLoop
|
||||
|
||||
|
|
@ -1132,7 +1211,7 @@ def initAndRunWatchdogAux : ServerM Unit := do
|
|||
catch _ =>
|
||||
/-
|
||||
NOTE(WN): It looks like instead of sending the `exit` notification,
|
||||
VSCode sometimes just closes the stream. In that case, pretend we got an `exit`.
|
||||
VS Code sometimes just closes the stream. In that case, pretend we got an `exit`.
|
||||
-/
|
||||
pure (Message.notification "exit" none)
|
||||
match msg with
|
||||
|
|
@ -1191,6 +1270,7 @@ def initAndRunWatchdog (args : List String) (i o e : FS.Stream) : IO Unit := do
|
|||
freshServerRequestID := 0
|
||||
}
|
||||
let importData ← IO.mkRef ⟨RBMap.empty, RBMap.empty⟩
|
||||
let requestData ← RequestDataMutex.new
|
||||
let i ← maybeTee "wdIn.txt" false i
|
||||
let o ← maybeTee "wdOut.txt" true o
|
||||
let e ← maybeTee "wdErr.txt" true e
|
||||
|
|
@ -1218,6 +1298,7 @@ def initAndRunWatchdog (args : List String) (i o e : FS.Stream) : IO Unit := do
|
|||
references
|
||||
serverRequestData
|
||||
importData
|
||||
requestData
|
||||
: ServerContext
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue