diff --git a/src/Lean/Server/FileWorker/SemanticHighlighting.lean b/src/Lean/Server/FileWorker/SemanticHighlighting.lean index dd9d9c574a..19b70c213d 100644 --- a/src/Lean/Server/FileWorker/SemanticHighlighting.lean +++ b/src/Lean/Server/FileWorker/SemanticHighlighting.lean @@ -159,12 +159,13 @@ structure SemanticTokensState where /-- Computes all semantic tokens for the document. -/ def handleSemanticTokensFull (_ : SemanticTokensParams) (_ : SemanticTokensState) : RequestM (LspResponse SemanticTokens × SemanticTokensState) := do + let ctx ← read let doc ← readDoc -- Only grabs the finished prefix so that we do not need to wait for elaboration to complete -- for the full file before sending a response. This means that the response will be incomplete, -- which we mitigate by regularly sending `workspace/semanticTokens/refresh` requests in the -- `FileWorker` to tell the client to re-compute the semantic tokens. - let (snaps, _, isComplete) ← doc.cmdSnaps.getFinishedPrefix + let (snaps, _, isComplete) ← doc.cmdSnaps.getFinishedPrefixWithTimeout 3000 (cancelTk? := ctx.cancelTk.cancellationTask) let response ← computeSemanticTokens doc 0 none snaps return ({ response, isComplete }, ⟨⟩) diff --git a/src/Lean/Server/Watchdog.lean b/src/Lean/Server/Watchdog.lean index 87b2f35f8b..ab1d08c479 100644 --- a/src/Lean/Server/Watchdog.lean +++ b/src/Lean/Server/Watchdog.lean @@ -7,6 +7,7 @@ Authors: Marc Huisinga, Wojciech Nawrocki prelude import Init.System.IO import Std.Sync.Mutex +import Std.Data.TreeMap import Init.Data.ByteArray import Lean.Data.RBMap @@ -59,7 +60,6 @@ Moreover, we don't implement the full protocol at this level: - After `initialize`, the watchdog sends the corresponding `didOpen` notification with the full current state of the file. No additional `didOpen` notifications will be forwarded to the worker process. -- `$/cancelRequest` notifications are forwarded to all file workers. - File workers are always terminated with an `exit` notification, without previously receiving a `shutdown` request. Similarly, they never receive a `didClose` notification. @@ -89,41 +89,141 @@ section Utils inductive WorkerEvent where | terminated | importsChanged - | crashed (e : IO.Error) + | crashed (exitCode : UInt32) | ioError (e : IO.Error) - inductive CrashOrigin - | fileWorkerToClientForwarding - | clientToFileWorkerForwarding - inductive WorkerState where - /-- The watchdog can detect a crashed file worker in two places: When trying to send a message - to the file worker and when reading a request reply. - In the latter case, the forwarding task terminates and delegates a `crashed` event to the - main task. Then, in both cases, the file worker has its state set to `crashed` and requests - that are in-flight are errored. Upon receiving the next packet for that file worker, the file - worker is restarted and the packet is forwarded to it. If the crash was detected while writing - a packet, we queue that packet until the next packet for the file worker arrives. -/ - | crashed (queuedMsgs : Array JsonRpc.Message) (origin : CrashOrigin) + | crashed + | cannotWrite + | terminating | running - abbrev PendingRequestMap := RBMap RequestID JsonRpc.Message compare + structure RequestQueueMap where + i : Nat + reqs : Std.TreeMap RequestID (Nat × JsonRpc.Message) + queue : Std.TreeMap Nat (RequestID × JsonRpc.Message) + deriving Inhabited + + namespace RequestQueueMap + def empty : RequestQueueMap where + i := 0 + reqs := ∅ + queue := ∅ + + instance : EmptyCollection RequestQueueMap where + emptyCollection := .empty + + def enqueue (m : RequestQueueMap) (req : JsonRpc.Message) : RequestQueueMap := Id.run do + let .request id .. := req + | panic! "Got non-request in `RequestQueueMap.enqueue`." + { + i := m.i + 1 + reqs := m.reqs.insert id (m.i, req) + queue := m.queue.insert m.i (id, req) + } + + def erase (m : RequestQueueMap) (id : RequestID) : RequestQueueMap := Id.run do + let some (i, _) := m.reqs.get? id + | return m + return { m with + reqs := m.reqs.erase id + queue := m.queue.erase i + } + + def contains (m : RequestQueueMap) (id : RequestID) : Bool := + m.reqs.contains id + + instance : ForIn m RequestQueueMap (RequestID × JsonRpc.Message) where + forIn map init f := map.queue.forIn (fun _ a b => f a b) init + end RequestQueueMap + + structure RequestData where + requestQueues : Std.TreeMap DocumentUri RequestQueueMap + uriByRequest : Std.TreeMap RequestID DocumentUri + deriving Inhabited + + namespace RequestData + + def empty : RequestData := { + requestQueues := ∅ + uriByRequest := ∅ + } + + def clearWorkerRequestData (d : RequestData) (uri : DocumentUri) : RequestData := Id.run do + let some requestQueue := d.requestQueues.get? uri + | return d + let mut uriByRequest := d.uriByRequest + for (id, _) in requestQueue do + uriByRequest := uriByRequest.erase id + let requestQueues := d.requestQueues.erase uri + return { + requestQueues, + uriByRequest + : RequestData + } + + def enqueue (d : RequestData) (uri : DocumentUri) (req : JsonRpc.Message) : RequestData := Id.run do + let .request id .. := req + | panic! "Got non-request in `RequestData.enqueue`." + return { + requestQueues := d.requestQueues.insertIfNew uri ∅ |>.modify uri (·.enqueue req) + uriByRequest := d.uriByRequest.insert id uri + } + + def erase (d : RequestData) (uri : DocumentUri) (id : RequestID) : RequestData where + requestQueues := d.requestQueues.modify uri (·.erase id) + uriByRequest := d.uriByRequest.erase id + + def contains (d : RequestData) (uri : DocumentUri) (id : RequestID) : Bool := Id.run do + let some uri' := d.uriByRequest.get? id + | return false + return uri == uri' + + def getUri? (d : RequestData) (id : RequestID) : Option DocumentUri := + d.uriByRequest.get? id + + def getRequestQueue (d : RequestData) (uri : DocumentUri) : RequestQueueMap := + d.requestQueues.get? uri |>.getD ∅ + + end RequestData + + abbrev RequestDataMutex := Std.Mutex RequestData + + namespace RequestDataMutex + + def new : BaseIO RequestDataMutex := + Std.Mutex.new .empty + + def clearWorkerRequestData (m : RequestDataMutex) (uri : DocumentUri) : BaseIO Unit := + m.atomically do modify (·.clearWorkerRequestData uri) + + def enqueue (m : RequestDataMutex) (uri : DocumentUri) (req : JsonRpc.Message) : BaseIO Unit := + m.atomically do modify (·.enqueue uri req) + + def erase (m : RequestDataMutex) (uri : DocumentUri) (id : RequestID) : BaseIO Unit := + m.atomically do modify (·.erase uri id) + + def contains (m : RequestDataMutex) (uri : DocumentUri) (id : RequestID) : BaseIO Bool := + m.atomically do return (← get).contains uri id + + def getUri? (m : RequestDataMutex) (id : RequestID) : BaseIO (Option DocumentUri) := + m.atomically do return (← get).getUri? id + + def getRequestQueue (m : RequestDataMutex) (uri : DocumentUri) : BaseIO RequestQueueMap := + m.atomically do return (← get).getRequestQueue uri + + end RequestDataMutex + end Utils section FileWorker + structure FileWorker where - doc : DocumentMeta - proc : Process.Child workerCfg - exitCode : Std.Mutex (Option UInt32) - commTask : ServerTask WorkerEvent - state : WorkerState - -- This should not be mutated outside of namespace FileWorker, - -- as it is used as shared mutable state - /-- The pending requests map contains all requests that have been received from the LSP client, - but were not answered yet. - We need them for forwarding cancellation requests to the correct worker as well as cleanly - aborting requests on worker crashes. -/ - pendingRequestsRef : IO.Ref PendingRequestMap + doc : DocumentMeta + proc : Process.Child workerCfg + exitCode : Std.Mutex (Option UInt32) + commTask : ServerTask WorkerEvent + state : Std.Mutex WorkerState namespace FileWorker @@ -133,21 +233,6 @@ section FileWorker def stdout (fw : FileWorker) : FS.Stream := FS.Stream.ofHandle fw.proc.stdout - def erasePendingRequest (fw : FileWorker) (id : RequestID) : IO Unit := - fw.pendingRequestsRef.modify fun pendingRequests => pendingRequests.erase id - - def errorPendingRequests (fw : FileWorker) (hError : FS.Stream) (code : ErrorCode) (msg : String) - : IO Unit := do - let pendingRequests ← fw.pendingRequestsRef.modifyGet - fun pendingRequests => (pendingRequests, RBMap.empty) - for ⟨id, _⟩ in pendingRequests do - hError.writeLspResponseError { id := id, code := code, message := msg } - - def queuedMsgs (fw : FileWorker) : Array JsonRpc.Message := - match fw.state with - | .running => #[] - | .crashed queuedMsgs _ => queuedMsgs - def waitForProc (fw : FileWorker) : IO UInt32 := fw.exitCode.atomically do match ← get with @@ -170,7 +255,6 @@ section FileWorker -- Process is already dead return exitCode - end FileWorker end FileWorker @@ -251,19 +335,20 @@ section ServerM (translation?, data) structure ServerContext where - hIn : FS.Stream - hOut : FS.Stream - hLog : FS.Stream + hIn : FS.Stream + hOut : FS.Stream + hLog : FS.Stream /-- Command line arguments. -/ - args : List String - fileWorkersRef : IO.Ref FileWorkerMap + args : List String + fileWorkersRef : IO.Ref FileWorkerMap /-- We store these to pass them to workers. -/ - initParams : InitializeParams - workerPath : System.FilePath - srcSearchPath : System.SearchPath - references : IO.Ref References - serverRequestData : IO.Ref ServerRequestData - importData : IO.Ref ImportData + initParams : InitializeParams + workerPath : System.FilePath + srcSearchPath : System.SearchPath + references : IO.Ref References + serverRequestData : IO.Ref ServerRequestData + importData : IO.Ref ImportData + requestData : RequestDataMutex abbrev ServerM := ReaderT ServerContext IO @@ -281,6 +366,31 @@ section ServerM | return s.references.modify fun refs => refs.removeWorkerRefs module + def setWorkerState (fw : FileWorker) (s : WorkerState) : ServerM Unit := do + fw.state.atomically <| set s + + def getWorkerState (fw : FileWorker) : ServerM WorkerState := do + fw.state.atomically get + + def errorPendingRequests (uri : DocumentUri) (code : ErrorCode) (msg : String) + : ServerM Unit := do + let ctx ← read + let pendingRequests ← ctx.requestData.atomically do + let d ← get + let pendingRequests := d.getRequestQueue uri + set <| d.clearWorkerRequestData uri + return pendingRequests + for (id, _) in pendingRequests do + ctx.hOut.writeLspResponseError { id := id, code := code, message := msg } + + def erasePendingRequest (uri : DocumentUri) (id : RequestID) : ServerM Bool := do + let ctx ← read + ctx.requestData.atomically do + let d ← get + let wasPending := d.contains uri id + set <| d.erase uri id + return wasPending + def log (msg : String) : ServerM Unit := do let st ← read st.hLog.putStrLn msg @@ -318,74 +428,70 @@ section ServerM | Except.error e => WorkerEvent.ioError e where loop : ServerM WorkerEvent := do + let uri := fw.doc.uri let o := (←read).hOut let msg ← try fw.stdout.readLspMessage - catch err => + catch _ => let exitCode ← fw.waitForProc -- Remove surviving descendant processes, if any, such as from nested builds. -- On Windows, we instead rely on elan doing this. try fw.proc.kill catch _ => pure () -- TODO: Wait for process group to finish match exitCode with - | 0 => - -- Worker was terminated - fw.errorPendingRequests o ErrorCode.contentModified - (s!"The file worker for {fw.doc.uri} has been terminated. " - ++ "Either the header has changed, or the file was closed, " - ++ " or the server is shutting down.") - -- one last message to clear the diagnostics for this file so that stale errors - -- do not remain in the editor forever. - o.writeLspMessage <| mkPublishDiagnosticsNotification fw.doc #[] - return WorkerEvent.terminated - | 2 => - return .importsChanged - | _ => - -- Worker crashed - let (errorCode, errorCausePointer) := - if exitCode = 1 then - (ErrorCode.workerExited, "see stderr for exception") - else - (ErrorCode.workerCrashed, "likely due to a stack overflow or a bug") - fw.errorPendingRequests o errorCode - s!"Server process for {fw.doc.uri} crashed, {errorCausePointer}." - o.writeLspMessage <| mkFileProgressAtPosNotification fw.doc 0 (kind := LeanFileProgressKind.fatalError) - return WorkerEvent.crashed err + | 0 => return .terminated + | 2 => return .importsChanged + | _ => return .crashed exitCode - -- Re. `o.writeLspMessage msg`: - -- Writes to Lean I/O channels are atomic, so these won't trample on each other. - match msg with - | Message.response id _ => do - fw.erasePendingRequest id - o.writeLspMessage msg - | Message.responseError id _ _ _ => do - fw.erasePendingRequest id - o.writeLspMessage msg - | Message.request id method params? => - let globalID ← (←read).serverRequestData.modifyGet - (·.trackOutboundRequest fw.doc.uri id) - o.writeLspMessage (Message.request globalID method params?) - | Message.notification "$/lean/ileanInfoUpdate" params => - if let some params := params then - if let Except.ok params := FromJson.fromJson? <| ToJson.toJson params then - handleIleanInfoUpdate fw params - | Message.notification "$/lean/ileanInfoFinal" params => - if let some params := params then - if let Except.ok params := FromJson.fromJson? <| ToJson.toJson params then - handleIleanInfoFinal fw params - | Message.notification "$/lean/importClosure" params => - if let some params := params then - if let Except.ok params := FromJson.fromJson? <| ToJson.toJson params then - handleImportClosure fw params - | _ => - o.writeLspMessage msg + -- When the file worker is terminated by the main thread, the client can immediately launch + -- another file worker using `didOpen`. In this case, even when this task and the old file + -- worker process haven't terminated yet, we want to avoid emitting diagnostics and responses + -- from the old process, so that they can't race with one another in the client. + fw.state.atomically do + let s ← get + if s matches .terminating then + return + -- Re. `o.writeLspMessage msg`: + -- Writes to Lean I/O channels are atomic, so these won't trample on each other. + match msg with + | Message.response id _ => do + let wasPending ← erasePendingRequest uri id + -- In the rare scenario that we detect a file worker crash on the main thread and this task + -- still has a response to process, it could theoretically happen that we restart the file + -- worker, discharge all queued requests to it and then get a duplicate response. + -- This ensures that this scenario can't occur, and we only emit responses for requests + -- that were still pending. + if wasPending then + o.writeLspMessage msg + | Message.responseError id _ _ _ => do + let wasPending ← erasePendingRequest uri id + if wasPending then + o.writeLspMessage msg + | Message.request id method params? => + let globalID ← (←read).serverRequestData.modifyGet + (·.trackOutboundRequest fw.doc.uri id) + o.writeLspMessage (Message.request globalID method params?) + | Message.notification "$/lean/ileanInfoUpdate" params => + if let some params := params then + if let Except.ok params := FromJson.fromJson? <| ToJson.toJson params then + handleIleanInfoUpdate fw params + | Message.notification "$/lean/ileanInfoFinal" params => + if let some params := params then + if let Except.ok params := FromJson.fromJson? <| ToJson.toJson params then + handleIleanInfoFinal fw params + | Message.notification "$/lean/importClosure" params => + if let some params := params then + if let Except.ok params := FromJson.fromJson? <| ToJson.toJson params then + handleImportClosure fw params + | _ => + o.writeLspMessage msg loop def startFileWorker (m : DocumentMeta) : ServerM Unit := do - (← read).hOut.writeLspMessage <| mkFileProgressAtPosNotification m 0 let st ← read + st.hOut.writeLspMessage <| mkFileProgressAtPosNotification m 0 let workerProc ← Process.spawn { toStdioConfig := workerCfg cmd := st.workerPath.toString @@ -394,7 +500,6 @@ section ServerM setsid := true } let exitCode ← Std.Mutex.new none - let pendingRequestsRef ← IO.mkRef (RBMap.empty : PendingRequestMap) let initialDependencyBuildMode := m.dependencyBuildMode let updatedDependencyBuildMode := if initialDependencyBuildMode matches .once then @@ -409,8 +514,7 @@ section ServerM proc := workerProc exitCode commTask := Task.pure WorkerEvent.terminated - state := WorkerState.running - pendingRequestsRef := pendingRequestsRef + state := ← Std.Mutex.new WorkerState.running } let commTask ← forwardMessages fw let fw : FileWorker := { fw with commTask := commTask } @@ -429,74 +533,59 @@ section ServerM } } updateFileWorkers fw + let reqQueue ← st.requestData.getRequestQueue m.uri + for (_, msg) in reqQueue do + try + fw.stdin.writeLspMessage msg + catch _ => + setWorkerState fw .cannotWrite + break def terminateFileWorker (uri : DocumentUri) : ServerM Unit := do let some fw ← findFileWorker? uri | return + setWorkerState fw .terminating + eraseFileWorker uri + try + errorPendingRequests uri ErrorCode.contentModified + s!"The file worker for {uri} has been terminated." + -- Clear the diagnostics for this file so that stale errors + -- do not remain in the editor forever. + (← read).hOut.writeLspMessage <| mkPublishDiagnosticsNotification fw.doc #[] + catch _ => + -- Client closed stdout => Still ensure that file worker is terminated + pure () try fw.stdin.writeLspMessage (Message.notification "exit" none) catch _ => - /- The file worker must have crashed just when we were about to terminate it! - That's fine - just forget about it then. - (on didClose we won't need the crashed file worker anymore, - when the header changed we'll start a new one right after - anyways and when we're shutting down the server - it's over either way.) -/ - return - eraseFileWorker uri + -- File worker crashed during termination => Treat it as terminated + pure () - def handleCrash (uri : DocumentUri) (queuedMsgs : Array JsonRpc.Message) (origin: CrashOrigin) : ServerM Unit := do - let some fw ← findFileWorker? uri - | return - updateFileWorkers { fw with state := WorkerState.crashed queuedMsgs origin } - - def tryDischargeQueuedMessages (uri : DocumentUri) (queuedMsgs : Array JsonRpc.Message) : ServerM Unit := do - let some fw ← findFileWorker? uri - | throwServerError "Cannot find file worker for '{uri}'." - let mut crashedMsgs := #[] - -- Try to discharge all queued msgs, tracking the ones that we can't discharge - for msg in queuedMsgs do - try - fw.stdin.writeLspMessage msg - catch _ => - crashedMsgs := crashedMsgs.push msg - if ¬ crashedMsgs.isEmpty then - handleCrash uri crashedMsgs .clientToFileWorkerForwarding - - /-- Tries to write a message, sets the state of the FileWorker to `crashed` if it does not succeed - and restarts the file worker if the `crashed` flag was already set. Just logs an error if - there is no FileWorker at this `uri`. - Messages that couldn't be sent can be queued up via the queueFailedMessage flag and - will be discharged after the FileWorker is restarted. -/ def tryWriteMessage (uri : DocumentUri) (msg : JsonRpc.Message) - (queueFailedMessage := true) - (restartCrashedWorker := false) : ServerM Unit := do let some fw ← findFileWorker? uri | return - match fw.state with - | WorkerState.crashed queuedMsgs _ => - let mut queuedMsgs := queuedMsgs - if queueFailedMessage then - queuedMsgs := queuedMsgs.push msg - if !restartCrashedWorker then + if msg matches .request .. then + -- If we cannot write a notification to the file worker, it is nonetheless safe to discard + -- the notification here because all non-discardable notifications are handled by the watchdog + -- itself. + (← read).requestData.enqueue uri msg + match ← getWorkerState fw with + | WorkerState.cannotWrite | WorkerState.terminating => + return + | WorkerState.crashed => + if ! (msg matches .notification "textDocument/didChange" ..) then + -- Only restart crashed FileWorkers on `didChange`. return - -- restart the crashed FileWorker eraseFileWorker uri startFileWorker fw.doc - tryDischargeQueuedMessages uri queuedMsgs | WorkerState.running => - let initialQueuedMsgs := - if queueFailedMessage then - #[msg] - else - #[] try fw.stdin.writeLspMessage msg catch _ => - handleCrash uri initialQueuedMsgs .clientToFileWorkerForwarding + setWorkerState fw .cannotWrite /-- Sends a notification to the file worker identified by `uri` that its dependency `staleDependency` @@ -508,7 +597,7 @@ section ServerM staleDependency := staleDependency : LeanStaleDependencyParams } - tryWriteMessage uri notification (queueFailedMessage := false) + tryWriteMessage uri notification end ServerM section RequestHandling @@ -788,9 +877,7 @@ section NotificationHandling let newDoc : DocumentMeta := ⟨doc.uri, newVersion, newDocText, oldDoc.dependencyBuildMode⟩ updateFileWorkers { fw with doc := newDoc } let notification := Notification.mk "textDocument/didChange" p - -- Don't queue failed `didChange` notifications because we already accumulate them in the - -- document and hand the updated document to the file worker when restarting it. - tryWriteMessage doc.uri notification (restartCrashedWorker := true) (queueFailedMessage := false) + tryWriteMessage doc.uri notification /-- When a file is saved, notifies all file workers for files that depend on this file that this @@ -843,16 +930,14 @@ section NotificationHandling | e => throw e def handleCancelRequest (p : CancelParams) : ServerM Unit := do - let fileWorkers ← (←read).fileWorkersRef.get - for ⟨uri, fw⟩ in fileWorkers do - -- Cancelled requests still require a response, so they can't be removed - -- from the pending requests map. - if (← fw.pendingRequestsRef.get).contains p.id then - tryWriteMessage uri (Notification.mk "$/cancelRequest" p) (queueFailedMessage := false) + let ctx ← read + let some uri ← ctx.requestData.getUri? p.id + | return + tryWriteMessage uri (Notification.mk "$/cancelRequest" p) def forwardNotification {α : Type} [ToJson α] [FileSource α] (method : String) (params : α) : ServerM Unit := - tryWriteMessage (fileSource params) (Notification.mk method params) (queueFailedMessage := true) + tryWriteMessage (fileSource params) (Notification.mk method params) end NotificationHandling section MessageHandling @@ -874,20 +959,18 @@ section MessageHandling (←read).hOut.writeLspResponseError <| e.toLspResponseError id return | Except.ok uri => pure uri - let some fw ← findFileWorker? uri + if (← findFileWorker? uri).isNone then /- Clients may send requests to closed files, which we respond to with an error. For example, VSCode sometimes sends requests just after closing a file, and RPC clients may also do so, e.g. due to remaining timers. -/ - | do - (←read).hOut.writeLspResponseError - { id := id - /- Some clients (VSCode) also send requests *before* opening a file. We reply - with `contentModified` as that does not display a "request failed" popup. -/ - code := ErrorCode.contentModified - message := s!"Cannot process request to closed file '{uri}'" } - return + (←read).hOut.writeLspResponseError + { id := id + /- Some clients (VSCode) also send requests *before* opening a file. We reply + with `contentModified` as that does not display a "request failed" popup. -/ + code := ErrorCode.contentModified + message := s!"Cannot process request to closed file '{uri}'" } + return let r := Request.mk id method params - fw.pendingRequestsRef.modify (·.insert id r) tryWriteMessage uri r def handleRequest (id : RequestID) (method : String) (params : Json) : ServerM Unit := do @@ -951,8 +1034,6 @@ section MessageHandling handle DidChangeWatchedFilesParams handleDidChangeWatchedFiles | "$/cancelRequest" => handle CancelParams handleCancelRequest - | "$/lean/rpc/connect" => - handle RpcConnectParams (forwardNotification method) | "$/lean/rpc/release" => handle RpcReleaseParams (forwardNotification method) | "$/lean/rpc/keepAlive" => @@ -982,7 +1063,7 @@ section MainLoop def shutdown : ServerM Unit := do let fileWorkers ← (←read).fileWorkersRef.get for ⟨uri, _⟩ in fileWorkers do - terminateFileWorker uri + try terminateFileWorker uri catch _ => pure () for ⟨_, fw⟩ in fileWorkers do -- TODO: Wait for process group to finish instead try let _ ← fw.killProcAndWait catch _ => pure () @@ -1008,19 +1089,10 @@ section MainLoop let workers ← st.fileWorkersRef.get let mut workerTasks := #[] for (_, fw) in workers do - -- When the forwarding task crashes, its return value will be stuck at - -- `WorkerEvent.crashed _`. - -- We want to handle this event only once, not over and over again, - -- so once the state becomes `WorkerState.crashed _ .fileWorkerToClientForwarding` - -- as a result of `WorkerEvent.crashed _`, we stop handling this event until - -- eventually the file worker is restarted by a notification from the client. - -- We do not want to filter the forwarding task in case of - -- `WorkerState.crashed _ .clientToFileWorkerForwarding`, since the forwarding task - -- exit code may still contain valuable information in this case (e.g. that the imports changed). - if !(fw.state matches WorkerState.crashed _ .fileWorkerToClientForwarding) then + if !((← getWorkerState fw) matches WorkerState.crashed) then workerTasks := workerTasks.push <| fw.commTask.mapCheap (ServerEvent.workerEvent fw) - let ev ← ServerTask.waitAny (clientTask :: workerTasks.toList) + let ev ← ServerTask.waitAny (workerTasks.toList ++ [clientTask]) (by simp) match ev with | ServerEvent.clientMsg msg => match msg with @@ -1042,20 +1114,27 @@ section MainLoop | _ => throwServerError "Got invalid JSON-RPC message" | ServerEvent.clientError e => throw e | ServerEvent.workerEvent fw ev => + let doc := fw.doc + let uri := doc.uri match ev with | WorkerEvent.ioError e => - throwServerError s!"IO error while processing events for {fw.doc.uri}: {e}" - | WorkerEvent.crashed _ => - handleCrash fw.doc.uri fw.queuedMsgs .fileWorkerToClientForwarding + throwServerError s!"IO error while processing events for {uri}: {e}" + | WorkerEvent.crashed exitCode => + let (errorCode, errorCausePointer) := + if exitCode = 1 then + (ErrorCode.workerExited, "see stderr for exception") + else + (ErrorCode.workerCrashed, "likely due to a stack overflow or a bug") + errorPendingRequests uri errorCode + s!"Server process for {uri} crashed, {errorCausePointer}." + (← read).hOut.writeLspMessage <| mkFileProgressAtPosNotification doc 0 (kind := LeanFileProgressKind.fatalError) + setWorkerState fw .crashed mainLoop clientTask | WorkerEvent.terminated => - throwServerError <| "Internal server error: got termination event for worker that " - ++ "should have been removed" + throwServerError + "Internal server error: Got termination event for worker that should have been removed" | .importsChanged => - let uri := fw.doc.uri - let queuedMsgs := fw.queuedMsgs - startFileWorker fw.doc - tryDischargeQueuedMessages uri queuedMsgs + startFileWorker doc mainLoop clientTask end MainLoop @@ -1132,7 +1211,7 @@ def initAndRunWatchdogAux : ServerM Unit := do catch _ => /- NOTE(WN): It looks like instead of sending the `exit` notification, - VSCode sometimes just closes the stream. In that case, pretend we got an `exit`. + VS Code sometimes just closes the stream. In that case, pretend we got an `exit`. -/ pure (Message.notification "exit" none) match msg with @@ -1191,6 +1270,7 @@ def initAndRunWatchdog (args : List String) (i o e : FS.Stream) : IO Unit := do freshServerRequestID := 0 } let importData ← IO.mkRef ⟨RBMap.empty, RBMap.empty⟩ + let requestData ← RequestDataMutex.new let i ← maybeTee "wdIn.txt" false i let o ← maybeTee "wdOut.txt" true o let e ← maybeTee "wdErr.txt" true e @@ -1218,6 +1298,7 @@ def initAndRunWatchdog (args : List String) (i o e : FS.Stream) : IO Unit := do references serverRequestData importData + requestData : ServerContext }