feat: initial multiprocess watchdog arch

This commit is contained in:
mhuisi 2020-09-02 21:15:28 +02:00 committed by Sebastian Ullrich
parent ef51087138
commit 8decfb6b1d
8 changed files with 727 additions and 6 deletions

View file

@ -207,7 +207,16 @@ def readNotificationAs (h : FS.Stream) (nBytes : Nat) (expectedMethod : String)
def writeMessage (h : FS.Stream) (m : Message) : IO Unit :=
h.writeJson (toJson m)
def writeResponse {α} [ToJson α] (h : FS.Stream) (id : RequestID) (r : α) : IO Unit :=
def writeRequest {α : Type} [ToJson α] (h : FS.Stream) (id : RequestID) (method : String) (params : α) : IO Unit :=
h.writeMessage (Message.request id method (fromJson? (toJson params)))
def writeNotification {α : Type} [ToJson α] (h : FS.Stream) (method : String) (params : α) : IO Unit :=
h.writeMessage (Message.notification method (fromJson? (toJson params)))
def writeResponse {α : Type} [ToJson α] (h : FS.Stream) (id : RequestID) (r : α) : IO Unit :=
h.writeMessage (Message.response id (toJson r))
def writeResponseError {α : Type} [HasToJson α] (h : FS.Stream) (id : RequestID) (code : ErrorCode) (message : String) (data : α) : IO Unit :=
h.writeMessage (Message.responseError id code message (toJson data))
end IO.FS.Stream

View file

@ -23,6 +23,9 @@ inductive ClientCapabilities where
instance : FromJson ClientCapabilities :=
⟨fun j => ClientCapabilities.mk⟩
instance ClientCapabilities.hasToJson : HasToJson ClientCapabilities :=
⟨fun o => mkObj []⟩
-- TODO largely unimplemented
structure ServerCapabilities where
textDocumentSync? : Option TextDocumentSyncOptions := none

View file

@ -67,14 +67,17 @@ def writeLspMessage (h : FS.Stream) (m : Message) : IO Unit := do
h.putStr (header ++ j)
h.flush
def writeLspRequest {α : Type} [ToJson α] (h : FS.Stream) (id : RequestID) (method : String) (params : α) : IO Unit :=
writeLspMessage h (Message.request id method (fromJson? (toJson params)))
def writeLspNotification {α : Type} [ToJson α] (h : FS.Stream) (method : String) (r : α) : IO Unit :=
writeLspMessage h (Message.notification method (fromJson? (toJson r)))
def writeLspResponse {α : Type} [ToJson α] (h : FS.Stream) (id : RequestID) (r : α) : IO Unit :=
writeLspMessage h (Message.response id (toJson r))
def writeLspNotification {α : Type} [ToJson α] (h : FS.Stream) (method : String) (r : α) : IO Unit :=
match toJson r with
| Json.obj o => writeLspMessage h (Message.notification method o)
| Json.arr a => writeLspMessage h (Message.notification method a)
| _ => throw (userError "internal server error in Lean.Lsp.writeLspNotification: tried to write LSP notification that is neither a JSON object nor a JSON array")
def writeLspResponseError {α : Type} [ToJson α] (h : FS.Stream) (id : RequestID) (code : ErrorCode) (message : String) (data : α) : IO Unit :=
writeLspMessage h (Message.responseError id code message (toJson data))
end Lsp
end Lean

View file

@ -25,6 +25,9 @@ instance : FromJson ClientInfo := ⟨fun j => do
let version? := j.getObjValAs? String "version"
pure ⟨name, version?⟩⟩
instance ClientInfo.hasToJson : ToJson ClientInfo :=
⟨fun o => mkObj $ ⟨"name", o.name⟩ :: opt "version" o.version?⟩
inductive Trace where
| off
| messages
@ -37,6 +40,12 @@ instance : FromJson Trace := ⟨fun j =>
| some "verbose" => Trace.verbose
| _ => none⟩
instance Trace.hasToJson : ToJson Trace :=
⟨fun o => match o with
| Trace.off => "off"
| Trace.messages => "messages"
| Trace.verbose => "verbose"⟩
structure InitializeParams where
processId? : Option Int := none
clientInfo? : Option ClientInfo := none
@ -66,6 +75,16 @@ instance : FromJson InitializeParams := ⟨fun j => do
let workspaceFolders? := j.getObjValAs? (Array WorkspaceFolder) "workspaceFolders"
pure ⟨processId?, clientInfo?, rootUri?, initializationOptions?, capabilities, trace, workspaceFolders?⟩⟩
instance InitializeParams.hasToJson : HasToJson InitializeParams :=
⟨fun o => mkObj $
opt "processId" o.processId? ++
opt "clientInfo" o.clientInfo? ++
opt "rootUri" o.rootUri? ++
opt "initializationOptions" o.initializationOptions? ++
[⟨"capabilities", toJson o.capabilities⟩] ++
[⟨"trace", toJson o.trace⟩] ++
opt "workspaceFolders" o.workspaceFolders?⟩
inductive InitializedParams where
| mk

View file

@ -62,6 +62,11 @@ instance : FromJson TextDocumentContentChangeEvent := ⟨fun j =>
pure $ TextDocumentContentChangeEvent.rangeChange range text) <|>
(TextDocumentContentChangeEvent.fullChange <$> j.getObjValAs? String "text")⟩
instance TextDocumentContentChangeEvent.hasToJson : ToJson TextDocumentContentChangeEvent :=
⟨fun o => mkObj $ match o with
| TextDocumentContentChangeEvent.rangeChange range text => [⟨"range", toJson range⟩, ⟨"text", toJson text⟩]
| TextDocumentContentChangeEvent.fullChange text => [⟨"text", toJson text⟩]⟩
structure DidChangeTextDocumentParams where
textDocument : VersionedTextDocumentIdentifier
contentChanges : Array TextDocumentContentChangeEvent
@ -71,6 +76,9 @@ instance : FromJson DidChangeTextDocumentParams := ⟨fun j => do
let contentChanges ← j.getObjValAs? (Array TextDocumentContentChangeEvent) "contentChanges"
pure ⟨textDocument, contentChanges⟩⟩
instance DidChangeTextDocumentParams.hasToJson : HasToJson DidChangeTextDocumentParams :=
⟨fun o => mkObj $ [⟨"textDocument", toJson o.textDocument⟩, ⟨"contentChanges", toJson o.contentChanges⟩]⟩
-- TODO: missing:
-- WillSaveTextDocumentParams, TextDocumentSaveReason,
-- TextDocumentSaveRegistrationOptions, DidSaveTextDocumentParams
@ -87,6 +95,9 @@ structure DidCloseTextDocumentParams where
instance : FromJson DidCloseTextDocumentParams := ⟨fun j =>
DidCloseTextDocumentParams.mk <$> j.getObjValAs? TextDocumentIdentifier "textDocument"⟩
instance DidCloseTextDocumentParams.hasToJson : HasToJson DidCloseTextDocumentParams :=
⟨fun o => mkObj $ [⟨"textDocument", toJson o.textDocument⟩]⟩
-- TODO: TextDocumentSyncClientCapabilities
/- NOTE: This is defined twice in the spec. The latter version has more fields. -/

View file

@ -0,0 +1,318 @@
/-
Copyright (c) 2020 Marc Huisinga. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Marc Huisinga, Wojciech Nawrocki
-/
import Init.System.IO
import Std.Data.RBMap
import Lean.Environment
import Lean.Server.Snapshots
import Lean.Data.Lsp
import Lean.Data.Json.FromToJson
namespace Lean
namespace Server
/-
Each file worker process manages a single file. File workers are launched and terminated
by a watchdog process. See Watchdog.lean for a description of how file workers are expected
to interact with the watchdog process.
File processing and requests+notifications against a file should be concurrent for two reasons:
- By the LSP standard, requests should be cancellable.
- Since Lean allows arbitrary user code to be executed during elaboration via the tactic framework,
elaboration can be extremely slow and even not halt in some cases. Users should be able to
work with the file while this is happening, e.g. make new changes to the file or send requests.
To achieve these goals, elaboration is executed in a chain of tasks, where each task corresponds to
the elaboration of one command. When the elaboration of one command is done, the next task is spawned.
On didChange notifications, we search for the task in which the change occured. If we stumble across
a task that has not yet finished before finding the task we're looking for, we terminate it
and start the elaboration there, otherwise we start the elaboration at the task where the change occured.
Requests iterate over tasks until they find the command that they need to answer the request.
In order to not block the main thread, this is done in a request task.
If a task that the request task waits for is terminated, a change occured somewhere before the
command that the request is looking for and the request sends a "content changed" error.
-/
open Lsp
open IO
open Snapshots
private def sendDiagnosticsCore (h : FS.Stream) (uri : DocumentUri) (version : Nat) (text : FileMap) (log : MessageLog)
: IO Unit :=
let diagnostics := log.msgs.map (msgToDiagnostic text);
Lsp.writeLspNotification h "textDocument/publishDiagnostics"
{ uri := uri,
version? := version,
diagnostics := diagnostics.toArray : PublishDiagnosticsParams }
inductive TaskError
| aborted
| eof
| ioError (e : IO.Error)
-- TODO(MH): use proper standard library version
constant asTask {α : Type} (act : IO α) : IO (Task (Except IO.Error α)) := arbitrary _
inductive ElabTask
-- TODO(MH): Sebastian said something about wrapping next in Thunk but i did not have time to look into it yet
| mk (snap : Snapshot) (next : Task (Except TaskError ElabTask)) : ElabTask
namespace ElabTask
private def runTask (act : IO (Except TaskError ElabTask)) : IO (Task (Except TaskError ElabTask)) := do
t ← asTask act;
pure $ t.map $ fun error => match error with
| Except.ok e => e
| Except.error ioError => Except.error (TaskError.ioError ioError)
private partial def runCore (h : FS.Stream) (uri : DocumentUri) (version : Nat) (contents : FileMap) : Snapshot → IO (Except TaskError ElabTask)
| parent => do
result ← compileNextCmd contents.source parent;
match result with
| Sum.inl snap => do
sendDiagnosticsCore h uri version contents snap.msgLog;
-- TODO(MH): check for interrupt (maybe with increased precision even in compileNextCmd, but not after runTask!)
t ← runTask (runCore snap);
pure (Except.ok ⟨snap, t⟩)
| Sum.inr msgLog => pure (Except.error TaskError.eof)
def run (h : FS.Stream) (uri : DocumentUri) (version : Nat) (contents : FileMap) (parent : Snapshot) : IO ElabTask := do
t ← runTask (runCore h uri version contents parent);
pure ⟨parent, t⟩
-- TODO(MH)
def nextHasFinished (t : ElabTask) : IO Bool :=
pure true
-- TODO(MH)
def interruptNext (t : ElabTask) : IO Unit :=
pure ()
partial def interruptAfter : ElabTask → IO Unit
| task@⟨snap, nextTask⟩ => do
task.interruptNext; -- assumption: interrupting a finished task does not cause problems
-- it may happen that we interrupt the task
-- but it still finishes and launches a next task.
-- hence we need to chase down the chain of tasks until
-- one of them errors.
match nextTask.get with
| Except.ok next => interruptAfter next
| Except.error _ => pure ()
partial def branchOffAt (h : FS.Stream) (uri : DocumentUri) (version : Nat) (contents : FileMap) : ElabTask → String.Pos → IO ElabTask
| task@⟨snap, nextTask⟩, changePos => do
finished ← task.nextHasFinished;
if finished then
match nextTask.get with
| Except.ok (next@⟨nextSnap, _⟩) =>
-- if next contains the change ...
-- (it will never be the header snap because the
-- watchdog will never send didChange notifs with
-- header changes to the file worker)
if changePos < nextSnap.endPos then do
newNext ← run h uri version contents snap;
-- interruptAfter task; -- TODO(MH): we may not need to interrupt since tasks without refs are marked as cancelled by the GC
pure newNext
else do
newNext ← branchOffAt next changePos;
pure ⟨snap, Task.pure (Except.ok newNext)⟩
| Except.error e => match e with
-- this case should not be possible. only the main task aborts tasks and ensures that aborted tasks
-- do not show up in `snapshots` of EditableDocument below.
| TaskError.aborted => throw (userError "reached case that should not be possible during server file worker task branching")
| TaskError.eof => do
newNext ← run h uri version contents snap;
pure newNext
-- TODO(MH): can this ever occur in reasonable cases?
| TaskError.ioError ioError => throw ioError
else do
newNext ← run h uri version contents snap;
-- next might finish before it sees the interrupt, so we must chase down the chain of tasks
-- interruptAfter task; -- TODO(MH): we may not need to interrupt since tasks without refs are marked as cancelled by the GC
pure newNext
end ElabTask
/-- A document editable in the sense that we track the environment
and parser state after each command so that edits can be applied
without recompiling code appearing earlier in the file. -/
structure EditableDocument :=
(version : Nat)
(text : FileMap)
/- Subsequent snapshots occur after each command. -/
(snapshots : ElabTask)
namespace EditableDocument
open Elab
/-- Compiles the contents of a Lean file. -/
def compileDocument (h : FS.Stream) (uri : DocumentUri) (version : Nat) (text : FileMap) : IO EditableDocument := do
headerSnap ← Snapshots.compileHeader text.source;
task ← ElabTask.run h uri version text headerSnap;
let docOut : EditableDocument := ⟨version, text, task⟩;
pure docOut
/-- Given `changePos`, the UTF-8 offset of a change into the pre-change source,
and the new document, updates editable doc state. -/
def updateDocument (h : FS.Stream) (uri : DocumentUri) (doc : EditableDocument) (changePos : String.Pos) (newVersion : Nat) (newText : FileMap) : IO EditableDocument := do
newSnapshots ← doc.snapshots.branchOffAt h uri newVersion newText changePos;
pure ⟨newVersion, newText, newSnapshots⟩
end EditableDocument
open EditableDocument
open IO
open Std (RBMap RBMap.empty)
open JsonRpc
structure ServerContext :=
(hIn hOut : FS.Stream)
(docRef : IO.Ref EditableDocument)
(pendingRequestsRef : IO.Ref (Array (Task (Except IO.Error Unit))))
abbrev ServerM := ReaderT ServerContext IO
def setDocument (val : EditableDocument) : ServerM Unit :=
fun st => st.docRef.set val
def getDocument : ServerM EditableDocument :=
fun st => st.docRef.get
def updatePendingRequests (map : Array (Task (Except IO.Error Unit)) → Array (Task (Except IO.Error Unit))) : ServerM Unit :=
fun st => st.pendingRequestsRef.modify map
def readLspMessage : ServerM JsonRpc.Message :=
fun st => monadLift $ readLspMessage st.hIn
def readLspRequestAs (expectedMethod : String) (α : Type*) [HasFromJson α] : ServerM (Request α) :=
fun st => monadLift $ readLspRequestAs st.hIn expectedMethod α
def readLspNotificationAs (expectedMethod : String) (α : Type*) [HasFromJson α] : ServerM α :=
fun st => monadLift $ readLspNotificationAs st.hIn expectedMethod α
def writeLspNotification {α : Type*} [HasToJson α] (method : String) (params : α) : ServerM Unit :=
fun st => monadLift $ writeLspNotification st.hOut method params
def writeLspResponse {α : Type*} [HasToJson α] (id : RequestID) (params : α) : ServerM Unit :=
fun st => monadLift $ writeLspResponse st.hOut id params
/-- Clears diagnostics for the document version 'version'. -/
-- TODO(WN): how to clear all diagnostics? Sending version 'none' doesn't seem to work
def clearDiagnostics (uri : DocumentUri) (version : Nat) : ServerM Unit :=
writeLspNotification "textDocument/publishDiagnostics"
{ uri := uri,
version? := version,
diagnostics := #[] : PublishDiagnosticsParams }
def sendDiagnostics (uri : DocumentUri) (doc : EditableDocument) (log : MessageLog)
: ServerM Unit := do
fun st => monadLift $ sendDiagnosticsCore st.hOut uri doc.version doc.text log
def openDocument (h : FS.Stream) (p : DidOpenTextDocumentParams) : IO EditableDocument := do
let doc := p.textDocument;
-- NOTE(WN): `toFileMap` marks line beginnings as immediately following
-- "\n", which should be enough to handle both LF and CRLF correctly.
-- This is because LSP always refers to characters by (line, column),
-- so if we get the line number correct it shouldn't matter that there
-- is a CR there.
let text := doc.text.toFileMap;
newDoc ← compileDocument h doc.uri doc.version text;
pure newDoc
private def replaceLspRange (text : FileMap) (r : Lsp.Range) (newText : String) : FileMap :=
let start := text.lspPosToUtf8Pos r.start;
let «end» := text.lspPosToUtf8Pos r.«end»;
let pre := text.source.extract 0 start;
let post := text.source.extract «end» text.source.bsize;
(pre ++ newText ++ post).toFileMap
def handleDidChange (p : DidChangeTextDocumentParams) : ServerM Unit := do
let docId := p.textDocument;
let changes := p.contentChanges;
oldDoc ← getDocument;
some newVersion ← pure docId.version? | throw (userError "expected version number");
if newVersion <= oldDoc.version then do
throw (userError "got outdated version number")
else changes.forM $ fun change =>
match change with
| TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) => do
-- Clients don't have to clear diagnostics, so we clear them
-- for the *previous* version here.
clearDiagnostics docId.uri oldDoc.version;
let startOff := oldDoc.text.lspPosToUtf8Pos range.start;
let newDocText := replaceLspRange oldDoc.text range newText;
st ← read;
newDoc ← monadLift $
updateDocument st.hOut docId.uri oldDoc startOff newVersion newDocText;
setDocument newDoc
| TextDocumentContentChangeEvent.fullChange (text : String) =>
throw (userError "TODO impl computing the diff of two sources.")
-- TODO(MH): requests that need data from a certain command should traverse e.snapshots
-- by successively getting the next task, meaning that we might need to wait for elaboration.
-- Sebastian said something about a future function TaskIO.bind that ensures that the
-- request task will also stop waiting when the reference to the task is released by handleDidChange.
-- when that happens, the request should send a "content changed" error to the user.
-- (this way, the server doesn't get bogged down in requests for an old state of the document)
def handleHover (p : HoverParams) (e : EditableDocument) : IO Unit := pure ⟨⟩
def parseParams (paramType : Type*) [HasFromJson paramType] (params : Json) : ServerM paramType :=
match fromJson? params with
| some parsed => pure parsed
| none => throw (userError "got param with wrong structure")
def handleNotification (method : String) (params : Json) : ServerM Unit := do
let h := (fun paramType [HasFromJson paramType] (handler : paramType → ServerM Unit) =>
parseParams paramType params >>= handler);
match method with
| "textDocument/didChange" => h DidChangeTextDocumentParams handleDidChange
| "$/cancelRequest" => pure () -- TODO when we're async
| _ => throw (userError "got unsupported notification method")
def queueRequest {α : Type*} (id : RequestID) (handler : α → EditableDocument → IO Unit) (params : α) : ServerM Unit := do
doc ← getDocument;
requestTask ← monadLift $ asTask (handler params doc);
updatePendingRequests (fun pendingRequests => Array.push pendingRequests requestTask)
def handleRequest (id : RequestID) (method : String) (params : Json)
: ServerM Unit := do
let h := (fun paramType [HasFromJson paramType]
(handler : paramType → EditableDocument → IO Unit) =>
parseParams paramType params >>= queueRequest id handler);
match method with
| "textDocument/hover" => h HoverParams handleHover
| _ => throw (userError $ "got unsupported request: " ++ method ++
"; params: " ++ toString params)
partial def mainLoop : Unit → ServerM Unit
| () => do
-- TODO(MH): gracefully terminate when stdin is closed by watchdog?
msg ← readLspMessage;
-- TODO(MH): updatePendingRequests ...: get rid of all requests that are finished
match msg with
| Message.request id method (some params) => do
handleRequest id method (toJson params);
mainLoop ()
| Message.notification method (some params) => do
handleNotification method (toJson params);
mainLoop ()
| _ => throw (userError "got invalid JSON-RPC message")
def initAndRunServer (i o : FS.Stream) : IO Unit := do
-- ignore InitializeParams for MWE
initRequest ← Lsp.readLspRequestAs i "initialize" InitializeParams;
docRequest ← Lsp.readLspRequestAs i "textDocument/didOpen" DidOpenTextDocumentParams;
doc ← openDocument o docRequest.param;
docRef ← IO.mkRef doc;
pendingRequestsRef ← IO.mkRef #[];
runReader (mainLoop ()) (⟨i, o, docRef, pendingRequestsRef⟩ : ServerContext)
end Server
end Lean

View file

@ -0,0 +1,47 @@
/-
Copyright (c) 2020 Marc Huisinga. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Marc Huisinga
-/
import Lean.Data.Lsp
namespace Lean
namespace Lsp
class HasFileSource (α : Type*) :=
(fileSource : α → DocumentUri)
export HasFileSource (fileSource)
instance Location.hasFileSource : HasFileSource Location :=
⟨fun l => l.uri⟩
instance TextDocumentIdentifier.hasFileSource : HasFileSource TextDocumentIdentifier :=
⟨fun i => i.uri⟩
instance VersionedTextDocumentIdentifier.hasFileSource : HasFileSource VersionedTextDocumentIdentifier :=
⟨fun i => i.uri⟩
instance TextDocumentEdit.hasFileSource : HasFileSource TextDocumentEdit :=
⟨fun e => fileSource e.textDocument⟩
instance TextDocumentItem.hasFileSource : HasFileSource TextDocumentItem :=
⟨fun i => i.uri⟩
instance TextDocumentPositionParams.hasFileSource : HasFileSource TextDocumentPositionParams :=
⟨fun p => fileSource p.textDocument⟩
instance DidOpenTextDocumentParams.hasFileSource : HasFileSource DidOpenTextDocumentParams :=
⟨fun p => fileSource p.textDocument⟩
instance DidChangeTextDocumentParams.hasFileSource : HasFileSource DidChangeTextDocumentParams :=
⟨fun p => fileSource p.textDocument⟩
instance DidCloseTextDocumentParams.hasFileSource : HasFileSource DidCloseTextDocumentParams :=
⟨fun p => fileSource p.textDocument⟩
instance HoverParams.hasFileSource : HasFileSource HoverParams :=
⟨fun h => fileSource h.toTextDocumentPositionParams⟩
end Lsp
end Lean

View file

@ -0,0 +1,311 @@
/-
Copyright (c) 2020 Marc Huisinga. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Marc Huisinga, Wojciech Nawrocki
-/
import Init.System.IO
import Std.Data.RBMap
import Lean.Elab.Import
import Lean.Data.Lsp
import Lean.Server.HasFileSource
namespace Lean
namespace Server
/-
The server architecture consists of a watchdog process that communicates with the user
and one file worker process for each open file.
This is because processing the header of a file creates objects that need to be freed manually.
Unfortunately, it is very difficult to ensure that these objects are not still in use by some user code,
potentially leading to segfaults when freeing these objects.
To contain this issue, each open file is processed by one dedicated process.
When the header is mutated, the process is killed and restarted by the watchdog process.
Lean elaboration can also be very expensive due to the tactic framework essentially allowing for arbitrary user
programs: If the user code for one file causes a stack overflow, we would not want the entire server to die.
Hence, isolating user code at a file-level and potentially restarting file worker processes upon error has the added
benefit of increased stability.
To communicate the file state to the file worker upon restarting, the watchdog needs to maintain
the current state of the file, which it can also use to detect changes to the header and thus terminate
the corresponding file worker.
The watchdog process and its file worker processes communicate via LSP. Most requests and notifications
are forwarded to the corresponding file worker process, with the exception of these notifications:
- didOpen: launch the file worker, create the associated watchdog state and launch a task to forward
incoming packets from the watchdog (e.g. request responses).
- didChange: update the local file state. if the header was mutated,
signal a shutdown to the file worker by closing the I/O channels and restart the file worker.
otherwise, forward the didChange notification.
- didClose: signal a shutdown to the file worker and remove the associated watchdog state.
Writes to Lean I/O channels are atomic, and hence we do not need additional synchronization for the tasks
that read file worker responses.
While the communication between the watchdog and its file workers uses LSP packets, it does not implement the
full protocol:
- Upon starting, the initialize request is forwarded to the file worker, but it must not respond with its server capabilities.
Consequently, the watchdog will not send an initialized notification to the file worker.
- After initialize, the watch dog sends the corresponding didOpen notification with the full current state of the file.
No additional didOpen notifications will be forwarded to the file worker process.
- File workers will never receive a shutdown request or an exit notification. File workers are always terminated
by closing their I/O channels. Similarly, they never receive a didClose notification.
-/
open IO
open Std (RBMap RBMap.empty)
open Lsp
open JsonRpc
private def replaceLspRange (text : FileMap) (r : Lsp.Range) (newText : String) : FileMap :=
let start := text.lspPosToUtf8Pos r.start;
let «end» := text.lspPosToUtf8Pos r.«end»;
let pre := text.source.extract 0 start;
let post := text.source.extract «end» text.source.bsize;
(pre ++ newText ++ post).toFileMap
def parsedImportsEndPos (input : String) : IO String.Pos := do
env ← mkEmptyEnvironment;
let fileName := "<input>";
let inputCtx := Parser.mkInputContext input fileName;
match Parser.parseHeader env inputCtx with
| (header, parserState, messages) => do
pure parserState.pos
structure EditableDocument :=
(version : Nat)
(text : FileMap)
(headerEndPos : String.Pos)
def workerCfg : Process.StdioConfig := ⟨Process.Stdio.piped, Process.Stdio.piped, Process.Stdio.piped⟩
structure FileWorker :=
(doc : EditableDocument)
(proc : Process.Child workerCfg)
namespace FileWorker
def spawnArgs : Process.SpawnArgs := {workerCfg with cmd := "fileworker"}
def spawn (doc : EditableDocument) : IO FileWorker := do
proc ← Process.spawn spawnArgs;
pure ⟨doc, proc⟩
def stdin (fw : FileWorker) : FS.Stream :=
FS.Stream.ofHandle fw.proc.stdin
def stdout (fw : FileWorker) : FS.Stream :=
FS.Stream.ofHandle fw.proc.stdout
def stderr (fw : FileWorker) : FS.Stream :=
FS.Stream.ofHandle fw.proc.stderr
def wait (w : FileWorker) : IO Nat := pure 0
end FileWorker
abbrev FileWorkerMap := RBMap DocumentUri FileWorker (fun a b => Decidable.decide (a < b))
structure ServerContext :=
(hIn hOut : FS.Stream)
(fileWorkersRef : IO.Ref FileWorkerMap)
(initParams : InitializeParams)
abbrev ServerM := ReaderT ServerContext IO
def updateFileWorkers (key : DocumentUri) (val : FileWorker) : ServerM Unit :=
fun st => st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.insert key val)
def findFileWorker (key : DocumentUri) : ServerM FileWorker :=
fun st => do
fileWorkers ← st.fileWorkersRef.get;
match fileWorkers.find? key with
| some fw => pure fw
| none => throw (userError $ "got unknown document URI (" ++ key ++ ")")
def readWorkerLspMessage (key : DocumentUri) : ServerM JsonRpc.Message :=
findFileWorker key >>= fun fw => monadLift $ readLspMessage fw.stdout
def readUserLspMessage : ServerM JsonRpc.Message :=
fun st => monadLift $ readLspMessage st.hIn
def readWorkerLspRequestAs (key : DocumentUri) (expectedMethod : String) (α : Type*) [HasFromJson α] : ServerM (Request α) :=
findFileWorker key >>= fun fw => monadLift $ readLspRequestAs fw.stdout expectedMethod α
def readUserLspRequestAs (expectedMethod : String) (α : Type*) [HasFromJson α] : ServerM (Request α) :=
fun st => monadLift $ readLspRequestAs st.hIn expectedMethod α
def readWorkerLspNotificationAs (key : DocumentUri) (expectedMethod : String) (α : Type*) [HasFromJson α] : ServerM α :=
findFileWorker key >>= fun fw => monadLift $ readLspNotificationAs fw.stdout expectedMethod α
def readUserLspNotificationAs (expectedMethod : String) (α : Type*) [HasFromJson α] : ServerM α :=
fun st => monadLift $ readLspNotificationAs st.hIn expectedMethod α
def writeWorkerLspMessage (key : DocumentUri) (msg : JsonRpc.Message) : ServerM Unit :=
findFileWorker key >>= fun fw => monadLift $ writeLspMessage fw.stdin msg
def writeUserLspMessage (msg : JsonRpc.Message) : ServerM Unit :=
fun st => monadLift $ writeLspMessage st.hOut msg
def writeWorkerLspRequest {α : Type*} [HasToJson α] (key : DocumentUri) (id : RequestID) (method : String) (params : α) : ServerM Unit :=
findFileWorker key >>= fun fw => monadLift $ writeLspRequest fw.stdin id method params
def writeUserLspRequest {α : Type*} [HasToJson α] (id : RequestID) (method : String) (params : α) : ServerM Unit :=
fun st => monadLift $ writeLspRequest st.hOut id method params
def writeWorkerLspNotification {α : Type*} [HasToJson α] (key : DocumentUri) (method : String) (params : α) : ServerM Unit :=
findFileWorker key >>= fun fw => monadLift $ writeLspNotification fw.stdin method params
def writeUserLspNotification {α : Type*} [HasToJson α] (method : String) (params : α) : ServerM Unit :=
fun st => monadLift $ writeLspNotification st.hOut method params
def writeWorkerLspResponse {α : Type*} [HasToJson α] (key : DocumentUri) (id : RequestID) (params : α) : ServerM Unit :=
findFileWorker key >>= fun fw => monadLift $ writeLspResponse fw.stdin id params
def writeUserLspResponse {α : Type*} [HasToJson α] (id : RequestID) (params : α) : ServerM Unit :=
fun st => monadLift $ writeLspResponse st.hOut id params
def writeWorkerInitializeParams (key : DocumentUri) : ServerM Unit := do
st ← read;
writeWorkerLspRequest key (0 : Nat) "initialize" st.initParams
def writeWorkerDidOpenNotification (key : DocumentUri) : ServerM Unit := do
findFileWorker key >>= fun fw => writeWorkerLspNotification key "textDocument/didOpen"
(DidOpenTextDocumentParams.mk ⟨key, "lean", fw.doc.version, fw.doc.text.source⟩)
def parseParams (paramType : Type*) [HasFromJson paramType] (params : Json) : ServerM paramType :=
match fromJson? params with
| some parsed => pure parsed
| none => throw (userError "got param with wrong structure")
-- NOTE(MH): forwardFileWorkerPackets needs to take a FileWorker, not a DocumentUri.
-- otherwise, it might occur that we update the list of file workers on the main task,
-- possibly yielding a race condition.
partial def forwardFileWorkerPackets (fw : FileWorker) : Unit → ServerM Unit
| ⟨⟩ => do
-- TODO(MH): detect closed stream somehow and terminate gracefully
-- TODO(MH): potentially catch unintended termination (e.g. due to stack overflow) and restart process
msg ← monadLift $ readLspMessage fw.stdout;
writeUserLspMessage msg;
forwardFileWorkerPackets ⟨⟩
def startFileWorker (key : DocumentUri) (version : Nat) (text : FileMap) : ServerM Unit := do
pos ← monadLift $ parsedImportsEndPos text.source;
fw ← monadLift $ FileWorker.spawn ⟨version, text, pos⟩;
updateFileWorkers key fw;
writeWorkerInitializeParams key;
writeWorkerDidOpenNotification key;
-- TODO(MH): replace with working IO variant
-- TODO(MH): Sebastian said something about this better being implemented as threads
-- (due to the long running nature of these tasks) but i did not yet have time to
-- look into this.
let _ := Task.mk (forwardFileWorkerPackets fw);
pure ⟨⟩
-- TODO(MH)
def terminateFileWorker (fw : FileWorker) : ServerM Unit := pure ()
def handleDidOpen (p : DidOpenTextDocumentParams) : ServerM Unit :=
let doc := p.textDocument;
-- NOTE(WN): `toFileMap` marks line beginnings as immediately following
-- "\n", which should be enough to handle both LF and CRLF correctly.
-- This is because LSP always refers to characters by (line, column),
-- so if we get the line number correct it shouldn't matter that there
-- is a CR there.
startFileWorker doc.uri doc.version doc.text.toFileMap
def handleDidChange (p : DidChangeTextDocumentParams) : ServerM Unit := do
let doc := p.textDocument;
let changes := p.contentChanges;
fw ← findFileWorker doc.uri;
let oldDoc := fw.doc;
some newVersion ← pure doc.version? | throw (userError "expected version number");
if newVersion <= oldDoc.version then do
throw (userError "got outdated version number")
else changes.forM $ fun change =>
match change with
| TextDocumentContentChangeEvent.rangeChange (range : Range) (newText : String) => do
let startOff := oldDoc.text.lspPosToUtf8Pos range.start;
let newDocText := replaceLspRange oldDoc.text range newText;
let oldHeaderEndPos := oldDoc.headerEndPos;
if startOff < oldHeaderEndPos then do
terminateFileWorker fw;
startFileWorker doc.uri newVersion newDocText
else
let newDoc : EditableDocument := ⟨newVersion, newDocText, oldHeaderEndPos⟩;
updateFileWorkers doc.uri { fw with doc := newDoc };
writeWorkerLspNotification doc.uri "textDocument/didChange" p
| TextDocumentContentChangeEvent.fullChange (text : String) =>
throw (userError "TODO impl computing the diff of two sources.")
def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit := do
let doc := p.textDocument;
fw ← findFileWorker doc.uri;
terminateFileWorker fw;
st ← read;
st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.erase doc.uri)
def handleRequest (id : RequestID) (method : String) (params : Json) : ServerM Unit := do
let h := (fun α [HasFromJson α] [HasToJson α] [HasFileSource α] => do
parsedParams ← parseParams α params;
writeWorkerLspRequest (fileSource parsedParams) id method parsedParams);
match method with
| "textDocument/hover" => h HoverParams
| _ => throw (userError $ "got unsupported request: " ++ method ++
"; params: " ++ toString params)
def handleNotification (method : String) (params : Json) : ServerM Unit := do
let forward := (fun α [HasFromJson α] [HasToJson α] [HasFileSource α] => do
parsedParams ← parseParams α params;
writeWorkerLspNotification (fileSource parsedParams) method parsedParams);
let handle := (fun α [HasFromJson α] (handler : α → ServerM Unit) => parseParams α params >>= handler);
match method with
| "textDocument/didOpen" => handle DidOpenTextDocumentParams handleDidOpen
| "textDocument/didChange" => handle DidChangeTextDocumentParams handleDidChange
| "textDocument/didClose" => handle DidCloseTextDocumentParams handleDidClose
| "$/cancelRequest" => pure () -- TODO when we're async
| _ => throw (userError "got unsupported notification method")
partial def mainLoop : Unit → ServerM Unit
| () => do
msg ← readUserLspMessage;
match msg with
| Message.request id "shutdown" _ =>
writeUserLspResponse id (Json.null)
| Message.request id method (some params) => do
handleRequest id method (toJson params);
mainLoop ()
| Message.notification method (some params) => do
handleNotification method (toJson params);
mainLoop ()
| _ => throw (userError "got invalid JSON-RPC message")
def mkLeanServerCapabilities : ServerCapabilities :=
{ textDocumentSync? := some
{ openClose := true,
change := TextDocumentSyncKind.incremental,
willSave := false,
willSaveWaitUntil := false,
save? := none },
hoverProvider := true }
def initAndRunServer (i o : FS.Stream) : IO Unit := do
fileWorkersRef ← IO.mkRef (RBMap.empty : FileWorkerMap);
-- ignore InitializeParams for MWE
initRequest ← readLspRequestAs i "initialize" InitializeParams;
runReader
(do
writeUserLspResponse initRequest.id
{ capabilities := mkLeanServerCapabilities,
serverInfo? := some { name := "Lean 4 server",
version? := "0.0.1" } : InitializeResult };
_ ← readUserLspNotificationAs "initialized" InitializedParams;
mainLoop ();
Message.notification "exit" none ← readUserLspMessage
| throw (userError "Expected an Exit Notification.");
pure ())
(⟨i, o, fileWorkersRef, initRequest.param⟩ : ServerContext)
end Server
end Lean