From 8decfb6b1d3b7b644c13842658dfeb0bc94aba06 Mon Sep 17 00:00:00 2001 From: mhuisi Date: Wed, 2 Sep 2020 21:15:28 +0200 Subject: [PATCH] feat: initial multiprocess watchdog arch --- src/Lean/Data/JsonRpc.lean | 11 +- src/Lean/Data/Lsp/Capabilities.lean | 3 + src/Lean/Data/Lsp/Communication.lean | 13 +- src/Lean/Data/Lsp/InitShutdown.lean | 19 ++ src/Lean/Data/Lsp/TextSync.lean | 11 + src/Lean/Server/FileWorker.lean | 318 +++++++++++++++++++++++++++ src/Lean/Server/HasFileSource.lean | 47 ++++ src/Lean/Server/Watchdog.lean | 311 ++++++++++++++++++++++++++ 8 files changed, 727 insertions(+), 6 deletions(-) create mode 100644 src/Lean/Server/FileWorker.lean create mode 100644 src/Lean/Server/HasFileSource.lean create mode 100644 src/Lean/Server/Watchdog.lean diff --git a/src/Lean/Data/JsonRpc.lean b/src/Lean/Data/JsonRpc.lean index b66296a204..7f4590cbb8 100644 --- a/src/Lean/Data/JsonRpc.lean +++ b/src/Lean/Data/JsonRpc.lean @@ -207,7 +207,16 @@ def readNotificationAs (h : FS.Stream) (nBytes : Nat) (expectedMethod : String) def writeMessage (h : FS.Stream) (m : Message) : IO Unit := h.writeJson (toJson m) -def writeResponse {α} [ToJson α] (h : FS.Stream) (id : RequestID) (r : α) : IO Unit := +def writeRequest {α : Type} [ToJson α] (h : FS.Stream) (id : RequestID) (method : String) (params : α) : IO Unit := + h.writeMessage (Message.request id method (fromJson? (toJson params))) + +def writeNotification {α : Type} [ToJson α] (h : FS.Stream) (method : String) (params : α) : IO Unit := + h.writeMessage (Message.notification method (fromJson? (toJson params))) + +def writeResponse {α : Type} [ToJson α] (h : FS.Stream) (id : RequestID) (r : α) : IO Unit := h.writeMessage (Message.response id (toJson r)) +def writeResponseError {α : Type} [HasToJson α] (h : FS.Stream) (id : RequestID) (code : ErrorCode) (message : String) (data : α) : IO Unit := + h.writeMessage (Message.responseError id code message (toJson data)) + end IO.FS.Stream diff --git a/src/Lean/Data/Lsp/Capabilities.lean b/src/Lean/Data/Lsp/Capabilities.lean index 0af417ffa2..398fc37d1f 100644 --- a/src/Lean/Data/Lsp/Capabilities.lean +++ b/src/Lean/Data/Lsp/Capabilities.lean @@ -23,6 +23,9 @@ inductive ClientCapabilities where instance : FromJson ClientCapabilities := ⟨fun j => ClientCapabilities.mk⟩ +instance ClientCapabilities.hasToJson : HasToJson ClientCapabilities := +⟨fun o => mkObj []⟩ + -- TODO largely unimplemented structure ServerCapabilities where textDocumentSync? : Option TextDocumentSyncOptions := none diff --git a/src/Lean/Data/Lsp/Communication.lean b/src/Lean/Data/Lsp/Communication.lean index 7a944a4b06..5f803e7563 100644 --- a/src/Lean/Data/Lsp/Communication.lean +++ b/src/Lean/Data/Lsp/Communication.lean @@ -67,14 +67,17 @@ def writeLspMessage (h : FS.Stream) (m : Message) : IO Unit := do h.putStr (header ++ j) h.flush +def writeLspRequest {α : Type} [ToJson α] (h : FS.Stream) (id : RequestID) (method : String) (params : α) : IO Unit := + writeLspMessage h (Message.request id method (fromJson? (toJson params))) + +def writeLspNotification {α : Type} [ToJson α] (h : FS.Stream) (method : String) (r : α) : IO Unit := + writeLspMessage h (Message.notification method (fromJson? (toJson r))) + def writeLspResponse {α : Type} [ToJson α] (h : FS.Stream) (id : RequestID) (r : α) : IO Unit := writeLspMessage h (Message.response id (toJson r)) -def writeLspNotification {α : Type} [ToJson α] (h : FS.Stream) (method : String) (r : α) : IO Unit := - match toJson r with - | Json.obj o => writeLspMessage h (Message.notification method o) - | Json.arr a => writeLspMessage h (Message.notification method a) - | _ => throw (userError "internal server error in Lean.Lsp.writeLspNotification: tried to write LSP notification that is neither a JSON object nor a JSON array") +def writeLspResponseError {α : Type} [ToJson α] (h : FS.Stream) (id : RequestID) (code : ErrorCode) (message : String) (data : α) : IO Unit := + writeLspMessage h (Message.responseError id code message (toJson data)) end Lsp end Lean diff --git a/src/Lean/Data/Lsp/InitShutdown.lean b/src/Lean/Data/Lsp/InitShutdown.lean index 397b8690e7..ef678fc58f 100644 --- a/src/Lean/Data/Lsp/InitShutdown.lean +++ b/src/Lean/Data/Lsp/InitShutdown.lean @@ -25,6 +25,9 @@ instance : FromJson ClientInfo := ⟨fun j => do let version? := j.getObjValAs? String "version" pure ⟨name, version?⟩⟩ +instance ClientInfo.hasToJson : ToJson ClientInfo := + ⟨fun o => mkObj $ ⟨"name", o.name⟩ :: opt "version" o.version?⟩ + inductive Trace where | off | messages @@ -37,6 +40,12 @@ instance : FromJson Trace := ⟨fun j => | some "verbose" => Trace.verbose | _ => none⟩ +instance Trace.hasToJson : ToJson Trace := +⟨fun o => match o with + | Trace.off => "off" + | Trace.messages => "messages" + | Trace.verbose => "verbose"⟩ + structure InitializeParams where processId? : Option Int := none clientInfo? : Option ClientInfo := none @@ -66,6 +75,16 @@ instance : FromJson InitializeParams := ⟨fun j => do let workspaceFolders? := j.getObjValAs? (Array WorkspaceFolder) "workspaceFolders" pure ⟨processId?, clientInfo?, rootUri?, initializationOptions?, capabilities, trace, workspaceFolders?⟩⟩ +instance InitializeParams.hasToJson : HasToJson InitializeParams := +⟨fun o => mkObj $ + opt "processId" o.processId? ++ + opt "clientInfo" o.clientInfo? ++ + opt "rootUri" o.rootUri? ++ + opt "initializationOptions" o.initializationOptions? ++ + [⟨"capabilities", toJson o.capabilities⟩] ++ + [⟨"trace", toJson o.trace⟩] ++ + opt "workspaceFolders" o.workspaceFolders?⟩ + inductive InitializedParams where | mk diff --git a/src/Lean/Data/Lsp/TextSync.lean b/src/Lean/Data/Lsp/TextSync.lean index 654211ed0b..77f219fdd2 100644 --- a/src/Lean/Data/Lsp/TextSync.lean +++ b/src/Lean/Data/Lsp/TextSync.lean @@ -62,6 +62,11 @@ instance : FromJson TextDocumentContentChangeEvent := ⟨fun j => pure $ TextDocumentContentChangeEvent.rangeChange range text) <|> (TextDocumentContentChangeEvent.fullChange <$> j.getObjValAs? String "text")⟩ +instance TextDocumentContentChangeEvent.hasToJson : ToJson TextDocumentContentChangeEvent := + ⟨fun o => mkObj $ match o with + | TextDocumentContentChangeEvent.rangeChange range text => [⟨"range", toJson range⟩, ⟨"text", toJson text⟩] + | TextDocumentContentChangeEvent.fullChange text => [⟨"text", toJson text⟩]⟩ + structure DidChangeTextDocumentParams where textDocument : VersionedTextDocumentIdentifier contentChanges : Array TextDocumentContentChangeEvent @@ -71,6 +76,9 @@ instance : FromJson DidChangeTextDocumentParams := ⟨fun j => do let contentChanges ← j.getObjValAs? (Array TextDocumentContentChangeEvent) "contentChanges" pure ⟨textDocument, contentChanges⟩⟩ +instance DidChangeTextDocumentParams.hasToJson : HasToJson DidChangeTextDocumentParams := +⟨fun o => mkObj $ [⟨"textDocument", toJson o.textDocument⟩, ⟨"contentChanges", toJson o.contentChanges⟩]⟩ + -- TODO: missing: -- WillSaveTextDocumentParams, TextDocumentSaveReason, -- TextDocumentSaveRegistrationOptions, DidSaveTextDocumentParams @@ -87,6 +95,9 @@ structure DidCloseTextDocumentParams where instance : FromJson DidCloseTextDocumentParams := ⟨fun j => DidCloseTextDocumentParams.mk <$> j.getObjValAs? TextDocumentIdentifier "textDocument"⟩ +instance DidCloseTextDocumentParams.hasToJson : HasToJson DidCloseTextDocumentParams := +⟨fun o => mkObj $ [⟨"textDocument", toJson o.textDocument⟩]⟩ + -- TODO: TextDocumentSyncClientCapabilities /- NOTE: This is defined twice in the spec. The latter version has more fields. -/ diff --git a/src/Lean/Server/FileWorker.lean b/src/Lean/Server/FileWorker.lean new file mode 100644 index 0000000000..a4a9b67bfe --- /dev/null +++ b/src/Lean/Server/FileWorker.lean @@ -0,0 +1,318 @@ +/- +Copyright (c) 2020 Marc Huisinga. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. + +Authors: Marc Huisinga, Wojciech Nawrocki +-/ +import Init.System.IO +import Std.Data.RBMap + +import Lean.Environment +import Lean.Server.Snapshots +import Lean.Data.Lsp +import Lean.Data.Json.FromToJson + +namespace Lean +namespace Server + +/- +Each file worker process manages a single file. File workers are launched and terminated +by a watchdog process. See Watchdog.lean for a description of how file workers are expected +to interact with the watchdog process. + +File processing and requests+notifications against a file should be concurrent for two reasons: +- By the LSP standard, requests should be cancellable. +- Since Lean allows arbitrary user code to be executed during elaboration via the tactic framework, + elaboration can be extremely slow and even not halt in some cases. Users should be able to + work with the file while this is happening, e.g. make new changes to the file or send requests. + +To achieve these goals, elaboration is executed in a chain of tasks, where each task corresponds to +the elaboration of one command. When the elaboration of one command is done, the next task is spawned. +On didChange notifications, we search for the task in which the change occured. If we stumble across +a task that has not yet finished before finding the task we're looking for, we terminate it +and start the elaboration there, otherwise we start the elaboration at the task where the change occured. + +Requests iterate over tasks until they find the command that they need to answer the request. +In order to not block the main thread, this is done in a request task. +If a task that the request task waits for is terminated, a change occured somewhere before the +command that the request is looking for and the request sends a "content changed" error. +-/ + +open Lsp +open IO +open Snapshots + +private def sendDiagnosticsCore (h : FS.Stream) (uri : DocumentUri) (version : Nat) (text : FileMap) (log : MessageLog) + : IO Unit := +let diagnostics := log.msgs.map (msgToDiagnostic text); +Lsp.writeLspNotification h "textDocument/publishDiagnostics" + { uri := uri, + version? := version, + diagnostics := diagnostics.toArray : PublishDiagnosticsParams } + +inductive TaskError +| aborted +| eof +| ioError (e : IO.Error) + +-- TODO(MH): use proper standard library version +constant asTask {α : Type} (act : IO α) : IO (Task (Except IO.Error α)) := arbitrary _ + +inductive ElabTask +-- TODO(MH): Sebastian said something about wrapping next in Thunk but i did not have time to look into it yet +| mk (snap : Snapshot) (next : Task (Except TaskError ElabTask)) : ElabTask + +namespace ElabTask + +private def runTask (act : IO (Except TaskError ElabTask)) : IO (Task (Except TaskError ElabTask)) := do +t ← asTask act; +pure $ t.map $ fun error => match error with +| Except.ok e => e +| Except.error ioError => Except.error (TaskError.ioError ioError) + +private partial def runCore (h : FS.Stream) (uri : DocumentUri) (version : Nat) (contents : FileMap) : Snapshot → IO (Except TaskError ElabTask) +| parent => do + result ← compileNextCmd contents.source parent; + match result with + | Sum.inl snap => do + sendDiagnosticsCore h uri version contents snap.msgLog; + -- TODO(MH): check for interrupt (maybe with increased precision even in compileNextCmd, but not after runTask!) + t ← runTask (runCore snap); + pure (Except.ok ⟨snap, t⟩) + | Sum.inr msgLog => pure (Except.error TaskError.eof) + +def run (h : FS.Stream) (uri : DocumentUri) (version : Nat) (contents : FileMap) (parent : Snapshot) : IO ElabTask := do +t ← runTask (runCore h uri version contents parent); +pure ⟨parent, t⟩ + +-- TODO(MH) +def nextHasFinished (t : ElabTask) : IO Bool := +pure true + +-- TODO(MH) +def interruptNext (t : ElabTask) : IO Unit := +pure () + +partial def interruptAfter : ElabTask → IO Unit +| task@⟨snap, nextTask⟩ => do + task.interruptNext; -- assumption: interrupting a finished task does not cause problems + -- it may happen that we interrupt the task + -- but it still finishes and launches a next task. + -- hence we need to chase down the chain of tasks until + -- one of them errors. + match nextTask.get with + | Except.ok next => interruptAfter next + | Except.error _ => pure () + +partial def branchOffAt (h : FS.Stream) (uri : DocumentUri) (version : Nat) (contents : FileMap) : ElabTask → String.Pos → IO ElabTask +| task@⟨snap, nextTask⟩, changePos => do + finished ← task.nextHasFinished; + if finished then + match nextTask.get with + | Except.ok (next@⟨nextSnap, _⟩) => + -- if next contains the change ... + -- (it will never be the header snap because the + -- watchdog will never send didChange notifs with + -- header changes to the file worker) + if changePos < nextSnap.endPos then do + newNext ← run h uri version contents snap; + -- interruptAfter task; -- TODO(MH): we may not need to interrupt since tasks without refs are marked as cancelled by the GC + pure newNext + else do + newNext ← branchOffAt next changePos; + pure ⟨snap, Task.pure (Except.ok newNext)⟩ + | Except.error e => match e with + -- this case should not be possible. only the main task aborts tasks and ensures that aborted tasks + -- do not show up in `snapshots` of EditableDocument below. + | TaskError.aborted => throw (userError "reached case that should not be possible during server file worker task branching") + | TaskError.eof => do + newNext ← run h uri version contents snap; + pure newNext + -- TODO(MH): can this ever occur in reasonable cases? + | TaskError.ioError ioError => throw ioError + else do + newNext ← run h uri version contents snap; + -- next might finish before it sees the interrupt, so we must chase down the chain of tasks + -- interruptAfter task; -- TODO(MH): we may not need to interrupt since tasks without refs are marked as cancelled by the GC + pure newNext + +end ElabTask + +/-- A document editable in the sense that we track the environment +and parser state after each command so that edits can be applied +without recompiling code appearing earlier in the file. -/ +structure EditableDocument := +(version : Nat) +(text : FileMap) +/- Subsequent snapshots occur after each command. -/ +(snapshots : ElabTask) + +namespace EditableDocument + +open Elab + +/-- Compiles the contents of a Lean file. -/ +def compileDocument (h : FS.Stream) (uri : DocumentUri) (version : Nat) (text : FileMap) : IO EditableDocument := do +headerSnap ← Snapshots.compileHeader text.source; +task ← ElabTask.run h uri version text headerSnap; +let docOut : EditableDocument := ⟨version, text, task⟩; +pure docOut + +/-- Given `changePos`, the UTF-8 offset of a change into the pre-change source, +and the new document, updates editable doc state. -/ +def updateDocument (h : FS.Stream) (uri : DocumentUri) (doc : EditableDocument) (changePos : String.Pos) (newVersion : Nat) (newText : FileMap) : IO EditableDocument := do +newSnapshots ← doc.snapshots.branchOffAt h uri newVersion newText changePos; +pure ⟨newVersion, newText, newSnapshots⟩ + +end EditableDocument + +open EditableDocument + +open IO +open Std (RBMap RBMap.empty) +open JsonRpc + +structure ServerContext := +(hIn hOut : FS.Stream) +(docRef : IO.Ref EditableDocument) +(pendingRequestsRef : IO.Ref (Array (Task (Except IO.Error Unit)))) + +abbrev ServerM := ReaderT ServerContext IO + +def setDocument (val : EditableDocument) : ServerM Unit := +fun st => st.docRef.set val + +def getDocument : ServerM EditableDocument := +fun st => st.docRef.get + +def updatePendingRequests (map : Array (Task (Except IO.Error Unit)) → Array (Task (Except IO.Error Unit))) : ServerM Unit := +fun st => st.pendingRequestsRef.modify map + +def readLspMessage : ServerM JsonRpc.Message := +fun st => monadLift $ readLspMessage st.hIn + +def readLspRequestAs (expectedMethod : String) (α : Type*) [HasFromJson α] : ServerM (Request α) := +fun st => monadLift $ readLspRequestAs st.hIn expectedMethod α + +def readLspNotificationAs (expectedMethod : String) (α : Type*) [HasFromJson α] : ServerM α := +fun st => monadLift $ readLspNotificationAs st.hIn expectedMethod α + +def writeLspNotification {α : Type*} [HasToJson α] (method : String) (params : α) : ServerM Unit := +fun st => monadLift $ writeLspNotification st.hOut method params + +def writeLspResponse {α : Type*} [HasToJson α] (id : RequestID) (params : α) : ServerM Unit := +fun st => monadLift $ writeLspResponse st.hOut id params + +/-- Clears diagnostics for the document version 'version'. -/ +-- TODO(WN): how to clear all diagnostics? Sending version 'none' doesn't seem to work +def clearDiagnostics (uri : DocumentUri) (version : Nat) : ServerM Unit := +writeLspNotification "textDocument/publishDiagnostics" + { uri := uri, + version? := version, + diagnostics := #[] : PublishDiagnosticsParams } + +def sendDiagnostics (uri : DocumentUri) (doc : EditableDocument) (log : MessageLog) + : ServerM Unit := do +fun st => monadLift $ sendDiagnosticsCore st.hOut uri doc.version doc.text log + +def openDocument (h : FS.Stream) (p : DidOpenTextDocumentParams) : IO EditableDocument := do +let doc := p.textDocument; +-- NOTE(WN): `toFileMap` marks line beginnings as immediately following +-- "\n", which should be enough to handle both LF and CRLF correctly. +-- This is because LSP always refers to characters by (line, column), +-- so if we get the line number correct it shouldn't matter that there +-- is a CR there. +let text := doc.text.toFileMap; +newDoc ← compileDocument h doc.uri doc.version text; +pure newDoc + +private def replaceLspRange (text : FileMap) (r : Lsp.Range) (newText : String) : FileMap := +let start := text.lspPosToUtf8Pos r.start; +let «end» := text.lspPosToUtf8Pos r.«end»; +let pre := text.source.extract 0 start; +let post := text.source.extract «end» text.source.bsize; +(pre ++ newText ++ post).toFileMap + +def handleDidChange (p : DidChangeTextDocumentParams) : ServerM Unit := do +let docId := p.textDocument; +let changes := p.contentChanges; +oldDoc ← getDocument; +some newVersion ← pure docId.version? | throw (userError "expected version number"); +if newVersion <= oldDoc.version then do + throw (userError "got outdated version number") +else changes.forM $ fun change => + match change with + | TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) => do + -- Clients don't have to clear diagnostics, so we clear them + -- for the *previous* version here. + clearDiagnostics docId.uri oldDoc.version; + let startOff := oldDoc.text.lspPosToUtf8Pos range.start; + let newDocText := replaceLspRange oldDoc.text range newText; + st ← read; + newDoc ← monadLift $ + updateDocument st.hOut docId.uri oldDoc startOff newVersion newDocText; + setDocument newDoc + | TextDocumentContentChangeEvent.fullChange (text : String) => + throw (userError "TODO impl computing the diff of two sources.") + +-- TODO(MH): requests that need data from a certain command should traverse e.snapshots +-- by successively getting the next task, meaning that we might need to wait for elaboration. +-- Sebastian said something about a future function TaskIO.bind that ensures that the +-- request task will also stop waiting when the reference to the task is released by handleDidChange. +-- when that happens, the request should send a "content changed" error to the user. +-- (this way, the server doesn't get bogged down in requests for an old state of the document) +def handleHover (p : HoverParams) (e : EditableDocument) : IO Unit := pure ⟨⟩ + +def parseParams (paramType : Type*) [HasFromJson paramType] (params : Json) : ServerM paramType := +match fromJson? params with +| some parsed => pure parsed +| none => throw (userError "got param with wrong structure") + +def handleNotification (method : String) (params : Json) : ServerM Unit := do +let h := (fun paramType [HasFromJson paramType] (handler : paramType → ServerM Unit) => + parseParams paramType params >>= handler); +match method with +| "textDocument/didChange" => h DidChangeTextDocumentParams handleDidChange +| "$/cancelRequest" => pure () -- TODO when we're async +| _ => throw (userError "got unsupported notification method") + +def queueRequest {α : Type*} (id : RequestID) (handler : α → EditableDocument → IO Unit) (params : α) : ServerM Unit := do +doc ← getDocument; +requestTask ← monadLift $ asTask (handler params doc); +updatePendingRequests (fun pendingRequests => Array.push pendingRequests requestTask) + +def handleRequest (id : RequestID) (method : String) (params : Json) + : ServerM Unit := do +let h := (fun paramType [HasFromJson paramType] + (handler : paramType → EditableDocument → IO Unit) => + parseParams paramType params >>= queueRequest id handler); +match method with +| "textDocument/hover" => h HoverParams handleHover +| _ => throw (userError $ "got unsupported request: " ++ method ++ + "; params: " ++ toString params) + +partial def mainLoop : Unit → ServerM Unit +| () => do + -- TODO(MH): gracefully terminate when stdin is closed by watchdog? + msg ← readLspMessage; + -- TODO(MH): updatePendingRequests ...: get rid of all requests that are finished + match msg with + | Message.request id method (some params) => do + handleRequest id method (toJson params); + mainLoop () + | Message.notification method (some params) => do + handleNotification method (toJson params); + mainLoop () + | _ => throw (userError "got invalid JSON-RPC message") + +def initAndRunServer (i o : FS.Stream) : IO Unit := do +-- ignore InitializeParams for MWE +initRequest ← Lsp.readLspRequestAs i "initialize" InitializeParams; +docRequest ← Lsp.readLspRequestAs i "textDocument/didOpen" DidOpenTextDocumentParams; +doc ← openDocument o docRequest.param; +docRef ← IO.mkRef doc; +pendingRequestsRef ← IO.mkRef #[]; +runReader (mainLoop ()) (⟨i, o, docRef, pendingRequestsRef⟩ : ServerContext) + +end Server +end Lean diff --git a/src/Lean/Server/HasFileSource.lean b/src/Lean/Server/HasFileSource.lean new file mode 100644 index 0000000000..fcdabed2ee --- /dev/null +++ b/src/Lean/Server/HasFileSource.lean @@ -0,0 +1,47 @@ +/- +Copyright (c) 2020 Marc Huisinga. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. + +Authors: Marc Huisinga +-/ +import Lean.Data.Lsp + +namespace Lean +namespace Lsp + +class HasFileSource (α : Type*) := +(fileSource : α → DocumentUri) +export HasFileSource (fileSource) + +instance Location.hasFileSource : HasFileSource Location := +⟨fun l => l.uri⟩ + +instance TextDocumentIdentifier.hasFileSource : HasFileSource TextDocumentIdentifier := +⟨fun i => i.uri⟩ + +instance VersionedTextDocumentIdentifier.hasFileSource : HasFileSource VersionedTextDocumentIdentifier := +⟨fun i => i.uri⟩ + +instance TextDocumentEdit.hasFileSource : HasFileSource TextDocumentEdit := +⟨fun e => fileSource e.textDocument⟩ + +instance TextDocumentItem.hasFileSource : HasFileSource TextDocumentItem := +⟨fun i => i.uri⟩ + +instance TextDocumentPositionParams.hasFileSource : HasFileSource TextDocumentPositionParams := +⟨fun p => fileSource p.textDocument⟩ + +instance DidOpenTextDocumentParams.hasFileSource : HasFileSource DidOpenTextDocumentParams := +⟨fun p => fileSource p.textDocument⟩ + +instance DidChangeTextDocumentParams.hasFileSource : HasFileSource DidChangeTextDocumentParams := +⟨fun p => fileSource p.textDocument⟩ + +instance DidCloseTextDocumentParams.hasFileSource : HasFileSource DidCloseTextDocumentParams := +⟨fun p => fileSource p.textDocument⟩ + +instance HoverParams.hasFileSource : HasFileSource HoverParams := +⟨fun h => fileSource h.toTextDocumentPositionParams⟩ + +end Lsp +end Lean diff --git a/src/Lean/Server/Watchdog.lean b/src/Lean/Server/Watchdog.lean new file mode 100644 index 0000000000..fb7ac5d273 --- /dev/null +++ b/src/Lean/Server/Watchdog.lean @@ -0,0 +1,311 @@ +/- +Copyright (c) 2020 Marc Huisinga. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. + +Authors: Marc Huisinga, Wojciech Nawrocki +-/ +import Init.System.IO +import Std.Data.RBMap + +import Lean.Elab.Import + +import Lean.Data.Lsp +import Lean.Server.HasFileSource + +namespace Lean +namespace Server + +/- +The server architecture consists of a watchdog process that communicates with the user +and one file worker process for each open file. +This is because processing the header of a file creates objects that need to be freed manually. +Unfortunately, it is very difficult to ensure that these objects are not still in use by some user code, +potentially leading to segfaults when freeing these objects. +To contain this issue, each open file is processed by one dedicated process. +When the header is mutated, the process is killed and restarted by the watchdog process. +Lean elaboration can also be very expensive due to the tactic framework essentially allowing for arbitrary user +programs: If the user code for one file causes a stack overflow, we would not want the entire server to die. +Hence, isolating user code at a file-level and potentially restarting file worker processes upon error has the added +benefit of increased stability. + +To communicate the file state to the file worker upon restarting, the watchdog needs to maintain +the current state of the file, which it can also use to detect changes to the header and thus terminate +the corresponding file worker. + +The watchdog process and its file worker processes communicate via LSP. Most requests and notifications +are forwarded to the corresponding file worker process, with the exception of these notifications: +- didOpen: launch the file worker, create the associated watchdog state and launch a task to forward + incoming packets from the watchdog (e.g. request responses). +- didChange: update the local file state. if the header was mutated, + signal a shutdown to the file worker by closing the I/O channels and restart the file worker. + otherwise, forward the didChange notification. +- didClose: signal a shutdown to the file worker and remove the associated watchdog state. +Writes to Lean I/O channels are atomic, and hence we do not need additional synchronization for the tasks +that read file worker responses. + +While the communication between the watchdog and its file workers uses LSP packets, it does not implement the +full protocol: +- Upon starting, the initialize request is forwarded to the file worker, but it must not respond with its server capabilities. + Consequently, the watchdog will not send an initialized notification to the file worker. +- After initialize, the watch dog sends the corresponding didOpen notification with the full current state of the file. + No additional didOpen notifications will be forwarded to the file worker process. +- File workers will never receive a shutdown request or an exit notification. File workers are always terminated + by closing their I/O channels. Similarly, they never receive a didClose notification. +-/ + +open IO +open Std (RBMap RBMap.empty) +open Lsp +open JsonRpc + +private def replaceLspRange (text : FileMap) (r : Lsp.Range) (newText : String) : FileMap := +let start := text.lspPosToUtf8Pos r.start; +let «end» := text.lspPosToUtf8Pos r.«end»; +let pre := text.source.extract 0 start; +let post := text.source.extract «end» text.source.bsize; +(pre ++ newText ++ post).toFileMap + +def parsedImportsEndPos (input : String) : IO String.Pos := do +env ← mkEmptyEnvironment; +let fileName := ""; +let inputCtx := Parser.mkInputContext input fileName; +match Parser.parseHeader env inputCtx with +| (header, parserState, messages) => do + pure parserState.pos + +structure EditableDocument := +(version : Nat) +(text : FileMap) +(headerEndPos : String.Pos) + +def workerCfg : Process.StdioConfig := ⟨Process.Stdio.piped, Process.Stdio.piped, Process.Stdio.piped⟩ + +structure FileWorker := +(doc : EditableDocument) +(proc : Process.Child workerCfg) + +namespace FileWorker + +def spawnArgs : Process.SpawnArgs := {workerCfg with cmd := "fileworker"} + +def spawn (doc : EditableDocument) : IO FileWorker := do +proc ← Process.spawn spawnArgs; +pure ⟨doc, proc⟩ + +def stdin (fw : FileWorker) : FS.Stream := +FS.Stream.ofHandle fw.proc.stdin + +def stdout (fw : FileWorker) : FS.Stream := +FS.Stream.ofHandle fw.proc.stdout + +def stderr (fw : FileWorker) : FS.Stream := +FS.Stream.ofHandle fw.proc.stderr + +def wait (w : FileWorker) : IO Nat := pure 0 + +end FileWorker + +abbrev FileWorkerMap := RBMap DocumentUri FileWorker (fun a b => Decidable.decide (a < b)) + +structure ServerContext := +(hIn hOut : FS.Stream) +(fileWorkersRef : IO.Ref FileWorkerMap) +(initParams : InitializeParams) + +abbrev ServerM := ReaderT ServerContext IO + +def updateFileWorkers (key : DocumentUri) (val : FileWorker) : ServerM Unit := +fun st => st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.insert key val) + +def findFileWorker (key : DocumentUri) : ServerM FileWorker := +fun st => do +fileWorkers ← st.fileWorkersRef.get; +match fileWorkers.find? key with +| some fw => pure fw +| none => throw (userError $ "got unknown document URI (" ++ key ++ ")") + +def readWorkerLspMessage (key : DocumentUri) : ServerM JsonRpc.Message := +findFileWorker key >>= fun fw => monadLift $ readLspMessage fw.stdout + +def readUserLspMessage : ServerM JsonRpc.Message := +fun st => monadLift $ readLspMessage st.hIn + +def readWorkerLspRequestAs (key : DocumentUri) (expectedMethod : String) (α : Type*) [HasFromJson α] : ServerM (Request α) := +findFileWorker key >>= fun fw => monadLift $ readLspRequestAs fw.stdout expectedMethod α + +def readUserLspRequestAs (expectedMethod : String) (α : Type*) [HasFromJson α] : ServerM (Request α) := +fun st => monadLift $ readLspRequestAs st.hIn expectedMethod α + +def readWorkerLspNotificationAs (key : DocumentUri) (expectedMethod : String) (α : Type*) [HasFromJson α] : ServerM α := +findFileWorker key >>= fun fw => monadLift $ readLspNotificationAs fw.stdout expectedMethod α + +def readUserLspNotificationAs (expectedMethod : String) (α : Type*) [HasFromJson α] : ServerM α := +fun st => monadLift $ readLspNotificationAs st.hIn expectedMethod α + +def writeWorkerLspMessage (key : DocumentUri) (msg : JsonRpc.Message) : ServerM Unit := +findFileWorker key >>= fun fw => monadLift $ writeLspMessage fw.stdin msg + +def writeUserLspMessage (msg : JsonRpc.Message) : ServerM Unit := +fun st => monadLift $ writeLspMessage st.hOut msg + +def writeWorkerLspRequest {α : Type*} [HasToJson α] (key : DocumentUri) (id : RequestID) (method : String) (params : α) : ServerM Unit := +findFileWorker key >>= fun fw => monadLift $ writeLspRequest fw.stdin id method params + +def writeUserLspRequest {α : Type*} [HasToJson α] (id : RequestID) (method : String) (params : α) : ServerM Unit := +fun st => monadLift $ writeLspRequest st.hOut id method params + +def writeWorkerLspNotification {α : Type*} [HasToJson α] (key : DocumentUri) (method : String) (params : α) : ServerM Unit := +findFileWorker key >>= fun fw => monadLift $ writeLspNotification fw.stdin method params + +def writeUserLspNotification {α : Type*} [HasToJson α] (method : String) (params : α) : ServerM Unit := +fun st => monadLift $ writeLspNotification st.hOut method params + +def writeWorkerLspResponse {α : Type*} [HasToJson α] (key : DocumentUri) (id : RequestID) (params : α) : ServerM Unit := +findFileWorker key >>= fun fw => monadLift $ writeLspResponse fw.stdin id params + +def writeUserLspResponse {α : Type*} [HasToJson α] (id : RequestID) (params : α) : ServerM Unit := +fun st => monadLift $ writeLspResponse st.hOut id params + +def writeWorkerInitializeParams (key : DocumentUri) : ServerM Unit := do +st ← read; +writeWorkerLspRequest key (0 : Nat) "initialize" st.initParams + +def writeWorkerDidOpenNotification (key : DocumentUri) : ServerM Unit := do +findFileWorker key >>= fun fw => writeWorkerLspNotification key "textDocument/didOpen" + (DidOpenTextDocumentParams.mk ⟨key, "lean", fw.doc.version, fw.doc.text.source⟩) + +def parseParams (paramType : Type*) [HasFromJson paramType] (params : Json) : ServerM paramType := +match fromJson? params with +| some parsed => pure parsed +| none => throw (userError "got param with wrong structure") + +-- NOTE(MH): forwardFileWorkerPackets needs to take a FileWorker, not a DocumentUri. +-- otherwise, it might occur that we update the list of file workers on the main task, +-- possibly yielding a race condition. +partial def forwardFileWorkerPackets (fw : FileWorker) : Unit → ServerM Unit +| ⟨⟩ => do + -- TODO(MH): detect closed stream somehow and terminate gracefully + -- TODO(MH): potentially catch unintended termination (e.g. due to stack overflow) and restart process + msg ← monadLift $ readLspMessage fw.stdout; + writeUserLspMessage msg; + forwardFileWorkerPackets ⟨⟩ + +def startFileWorker (key : DocumentUri) (version : Nat) (text : FileMap) : ServerM Unit := do +pos ← monadLift $ parsedImportsEndPos text.source; +fw ← monadLift $ FileWorker.spawn ⟨version, text, pos⟩; +updateFileWorkers key fw; +writeWorkerInitializeParams key; +writeWorkerDidOpenNotification key; +-- TODO(MH): replace with working IO variant +-- TODO(MH): Sebastian said something about this better being implemented as threads +-- (due to the long running nature of these tasks) but i did not yet have time to +-- look into this. +let _ := Task.mk (forwardFileWorkerPackets fw); +pure ⟨⟩ + +-- TODO(MH) +def terminateFileWorker (fw : FileWorker) : ServerM Unit := pure () + +def handleDidOpen (p : DidOpenTextDocumentParams) : ServerM Unit := +let doc := p.textDocument; +-- NOTE(WN): `toFileMap` marks line beginnings as immediately following +-- "\n", which should be enough to handle both LF and CRLF correctly. +-- This is because LSP always refers to characters by (line, column), +-- so if we get the line number correct it shouldn't matter that there +-- is a CR there. +startFileWorker doc.uri doc.version doc.text.toFileMap + +def handleDidChange (p : DidChangeTextDocumentParams) : ServerM Unit := do +let doc := p.textDocument; +let changes := p.contentChanges; +fw ← findFileWorker doc.uri; +let oldDoc := fw.doc; +some newVersion ← pure doc.version? | throw (userError "expected version number"); +if newVersion <= oldDoc.version then do + throw (userError "got outdated version number") +else changes.forM $ fun change => + match change with + | TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) => do + let startOff := oldDoc.text.lspPosToUtf8Pos range.start; + let newDocText := replaceLspRange oldDoc.text range newText; + let oldHeaderEndPos := oldDoc.headerEndPos; + if startOff < oldHeaderEndPos then do + terminateFileWorker fw; + startFileWorker doc.uri newVersion newDocText + else + let newDoc : EditableDocument := ⟨newVersion, newDocText, oldHeaderEndPos⟩; + updateFileWorkers doc.uri { fw with doc := newDoc }; + writeWorkerLspNotification doc.uri "textDocument/didChange" p + | TextDocumentContentChangeEvent.fullChange (text : String) => + throw (userError "TODO impl computing the diff of two sources.") + +def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit := do +let doc := p.textDocument; +fw ← findFileWorker doc.uri; +terminateFileWorker fw; +st ← read; +st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.erase doc.uri) + +def handleRequest (id : RequestID) (method : String) (params : Json) : ServerM Unit := do +let h := (fun α [HasFromJson α] [HasToJson α] [HasFileSource α] => do + parsedParams ← parseParams α params; + writeWorkerLspRequest (fileSource parsedParams) id method parsedParams); +match method with +| "textDocument/hover" => h HoverParams +| _ => throw (userError $ "got unsupported request: " ++ method ++ + "; params: " ++ toString params) + +def handleNotification (method : String) (params : Json) : ServerM Unit := do +let forward := (fun α [HasFromJson α] [HasToJson α] [HasFileSource α] => do + parsedParams ← parseParams α params; + writeWorkerLspNotification (fileSource parsedParams) method parsedParams); +let handle := (fun α [HasFromJson α] (handler : α → ServerM Unit) => parseParams α params >>= handler); +match method with +| "textDocument/didOpen" => handle DidOpenTextDocumentParams handleDidOpen +| "textDocument/didChange" => handle DidChangeTextDocumentParams handleDidChange +| "textDocument/didClose" => handle DidCloseTextDocumentParams handleDidClose +| "$/cancelRequest" => pure () -- TODO when we're async +| _ => throw (userError "got unsupported notification method") + +partial def mainLoop : Unit → ServerM Unit +| () => do + msg ← readUserLspMessage; + match msg with + | Message.request id "shutdown" _ => + writeUserLspResponse id (Json.null) + | Message.request id method (some params) => do + handleRequest id method (toJson params); + mainLoop () + | Message.notification method (some params) => do + handleNotification method (toJson params); + mainLoop () + | _ => throw (userError "got invalid JSON-RPC message") + +def mkLeanServerCapabilities : ServerCapabilities := +{ textDocumentSync? := some + { openClose := true, + change := TextDocumentSyncKind.incremental, + willSave := false, + willSaveWaitUntil := false, + save? := none }, + hoverProvider := true } + +def initAndRunServer (i o : FS.Stream) : IO Unit := do +fileWorkersRef ← IO.mkRef (RBMap.empty : FileWorkerMap); +-- ignore InitializeParams for MWE +initRequest ← readLspRequestAs i "initialize" InitializeParams; +runReader + (do + writeUserLspResponse initRequest.id + { capabilities := mkLeanServerCapabilities, + serverInfo? := some { name := "Lean 4 server", + version? := "0.0.1" } : InitializeResult }; + _ ← readUserLspNotificationAs "initialized" InitializedParams; + mainLoop (); + Message.notification "exit" none ← readUserLspMessage + | throw (userError "Expected an Exit Notification."); + pure ()) + (⟨i, o, fileWorkersRef, initRequest.param⟩ : ServerContext) + +end Server +end Lean