feat: add pending requests tracking

This commit is contained in:
Marc Huisinga 2020-10-02 19:30:35 +02:00 committed by Sebastian Ullrich
parent 1f91f1e194
commit c83499a54e
3 changed files with 108 additions and 19 deletions

View file

@ -40,6 +40,56 @@ private partial def countDigitsAux (n digits : Nat) : Nat :=
private def countDigits (n : Nat) : Nat :=
countDigitsAux n 1
-- convert mantissa * 10^-exponent to 0.mantissa * 10^exponent
protected def normalize : JsonNumber → Int × Nat × Int
| ⟨m, e⟩ =>
if m = 0 then (0, 0, 0)
else
let sign : Int := if m > 0 then 1 else -1;
let mAbs := m.natAbs;
let nDigits := countDigits mAbs;
-- eliminate trailing zeros
let m' := (List.range nDigits).foldr
(fun _ acc => if acc % 10 = 0 then acc / 10 else acc)
mAbs;
(sign, m', -e + nDigits)
def lt (a b : JsonNumber) : Bool :=
let (as, am, ae) := a.normalize;
let (bs, bm, be) := b.normalize;
match (as, bs) with
| (-1, 1) => true
| (1, -1) => false
| _ =>
let ((am, ae), (bm, be)) :=
if as = -1 && bs = -1 then
((bm, be), (am, ae))
else
((am, ae), (bm, be));
let amDigits := countDigits am;
let bmDigits := countDigits bm;
-- align the mantissas
let (am, bm) :=
if amDigits < bmDigits then
(am * 10^(bmDigits - amDigits), bm)
else
(am, bm * 10^(amDigits - bmDigits));
if ae < be then true
else if ae > be then false
else am < bm
def ltProp : HasLess JsonNumber :=
⟨fun a b => lt a b = true⟩
instance hasLess : HasLess JsonNumber :=
ltProp
instance hasDecidableLess : DecidableRel (@HasLess.Less JsonNumber ltProp) :=
inferInstanceAs (DecidableRel (fun a b => lt a b = true))
protected def fromNat (n : Nat) : JsonNumber := ⟨n, 0⟩
protected def fromInt (n : Int) : JsonNumber := ⟨n, 0⟩
protected def toString : JsonNumber → String
| ⟨m, 0⟩ => m.repr
| ⟨m, e⟩ =>

View file

@ -95,6 +95,23 @@ structure Error where
instance : Coe String RequestID := ⟨RequestID.str⟩
instance : Coe JsonNumber RequestID := ⟨RequestID.num⟩
private def RequestID.lt : RequestID → RequestID → Bool
| RequestID.str a, RequestID.str b => a < b
| RequestID.num a, RequestID.num b => a < b
| RequestID.null, RequestID.num _ => true
| RequestID.null, RequestID.str _ => true
| RequestID.num _, RequestID.str _ => true
| _, _ /- str < *, num < null, null < null -/ => false
private def RequestID.ltProp : Less RequestID :=
⟨fun a b => RequestID.lt a b = true⟩
instance : Less RequestID :=
RequestID.ltProp
instance : DecidableRel (@Less.Less RequestID RequestID.ltProp) :=
inferInstanceAs (DecidableRel (fun a b => RequestID.lt a b = true))
instance : FromJson RequestID := ⟨fun j =>
match j with
| str s => RequestID.str s

View file

@ -86,11 +86,15 @@ inductive WorkerState
| crashed (queuedMsgs : Array JsonRpc.Message)
| running
abbrev PendingRequestMap := RBMap RequestID JsonRpc.Message (fun a b => Decidable.decide (a < b))
structure FileWorker :=
(doc : OpenDocument)
(proc : Process.Child workerCfg)
(commTask : Task WorkerEvent)
(state : WorkerState)
-- NOTE: this should not be mutated outside of namespace FileWorker
(pendingRequestsRef : IO.Ref PendingRequestMap)
namespace FileWorker
@ -103,6 +107,27 @@ FS.Stream.ofHandle fw.proc.stdout
def stderr (fw : FileWorker) : FS.Stream :=
FS.Stream.ofHandle fw.proc.stderr
variables {m : Type → Type} [Monad m] [MonadIO m]
def readMessage (fw : FileWorker) : m JsonRpc.Message := do
msg ← readLspMessage fw.stdin;
match msg with
| Message.request id method params? =>
liftIO $ fw.pendingRequestsRef.modify (fun pendingRequests => pendingRequests.erase id)
| _ => pure ();
pure msg
def writeMessage (fw : FileWorker) (msg : JsonRpc.Message) : m Unit :=
writeLspMessage fw.stdin msg
def writeNotification {α : Type*} [HasToJson α] (fw : FileWorker) (method : String) (param : α) : m Unit :=
writeLspNotification fw.stdin method param
def writeRequest {α : Type*} [HasToJson α] (fw : FileWorker) (id : RequestID) (method : String) (param : α) : m Unit := do
writeLspRequest fw.stdin id method param;
liftIO $ fw.pendingRequestsRef.modify (fun pendingRequests =>
pendingRequests.insert id (Message.request id method (fromJson? (toJson param))))
end FileWorker
abbrev FileWorkerMap := RBMap DocumentUri FileWorker (fun a b => Decidable.decide (a < b))
@ -129,32 +154,27 @@ match fileWorkers.find? uri with
def eraseFileWorker (uri : DocumentUri) : ServerM Unit :=
fun st => st.fileWorkersRef.modify (fun fileWorkers => fileWorkers.erase uri)
-- NOTE: not atomic
def modifyFileWorker (uri : DocumentUri) (f : FileWorker → FileWorker) : ServerM Unit := do
fw ← findFileWorker uri;
updateFileWorkers uri (f fw)
-- TODO: this creates a long-running Task, which should be okay with upcoming API changes.
partial def fwdMsgAux (workerProc : Process.Child workerCfg) (hWrk : FS.Stream) (hOut : FS.Stream) : Unit → IO WorkerEvent
partial def fwdMsgAux (fw : FileWorker) (hOut : FS.Stream) : Unit → IO WorkerEvent
| ⟨⟩ => catch
(do
msg ← readLspMessage hWrk;
msg ← fw.readMessage;
-- NOTE: Writes to Lean I/O channels are atomic, so these won't trample on each other.
writeLspMessage hOut msg;
fwdMsgAux ())
(fun err => do
-- NOTE: if writeLspMessage errors we will block here, but the main task will
-- quit eventually anyways if that happens
exitCode ← workerProc.wait;
exitCode ← fw.proc.wait;
pure $ if exitCode = 0 then WorkerEvent.terminated else WorkerEvent.crashed err)
/-- A Task which forwards a worker's messages into the output stream until an event
which must be handled in the main watchdog thread (e.g. an I/O error) happens. -/
def fwdMsgTask (workerProc : Process.Child workerCfg) (hWrk : FS.Stream) : ServerM (Task WorkerEvent) :=
def fwdMsgTask (fw : FileWorker) : ServerM (Task WorkerEvent) :=
fun st =>
(Task.map (fun either => match either with
| Except.ok ev => ev
| Except.error e => WorkerEvent.crashed e)) <$> (IO.asTask $ fwdMsgAux workerProc hWrk st.hOut ())
| Except.error e => WorkerEvent.crashed e)) <$> (IO.asTask $ fwdMsgAux fw st.hOut ())
private def parsedImportsEndPos (input : String) : IO String.Pos := do
emptyEnv ← mkEmptyEnvironment;
@ -167,11 +187,13 @@ st ← read;
pos ← monadLift $ parsedImportsEndPos text.source;
let doc : OpenDocument := ⟨version, text, pos⟩;
workerProc ← monadLift $ Process.spawn {workerCfg with cmd := st.workerPath};
writeLspRequest (FS.Stream.ofHandle workerProc.stdin) (0 : Nat) "initialize" st.initParams;
writeLspNotification (FS.Stream.ofHandle workerProc.stdin) "textDocument/didOpen"
(DidOpenTextDocumentParams.mk ⟨uri, "lean", version, text.source⟩);
commTask ← fwdMsgTask workerProc $ FS.Stream.ofHandle workerProc.stdout;
let fw : FileWorker := ⟨doc, workerProc, commTask, WorkerState.running⟩;
pendingRequests ← IO.mkRef (RBMap.empty : PendingRequestMap);
-- the task will never access itself, so this is fine
let commTaskFw : FileWorker := ⟨doc, workerProc, Task.pure WorkerEvent.terminated, WorkerState.running, pendingRequests⟩;
commTask ← fwdMsgTask commTaskFw;
let fw : FileWorker := {commTaskFw with commTask := commTask};
fw.writeRequest (0 : Nat) "initialize" st.initParams;
fw.writeNotification "textDocument/didOpen" (DidOpenTextDocumentParams.mk ⟨uri, "lean", version, text.source⟩);
updateFileWorkers uri fw
def terminateFileWorker (uri : DocumentUri) : ServerM Unit := do
@ -181,7 +203,7 @@ fw ← findFileWorker uri;
-- (on didClose we won't need the crashed file worker anymore
-- and when the header changed we'll start a new one right after
-- anyways)
catch (writeLspMessage fw.stdin (Message.notification "exit" none)) (fun err => pure ());
catch (fw.writeMessage (Message.notification "exit" none)) (fun err => pure ());
-- TODO(MH): error pending requests
eraseFileWorker uri
@ -201,7 +223,7 @@ newFw ← findFileWorker uri;
let tryDischargeQueuedMsgs : Array JsonRpc.Message → JsonRpc.Message → ServerM (Array JsonRpc.Message) :=
fun crashedMsgs m => catch
(do
writeLspMessage newFw.stdin m;
newFw.writeMessage m;
pure crashedMsgs)
(fun err => pure (crashedMsgs.push m));
crashedMsgs ← queuedMsgs.foldlM tryDischargeQueuedMsgs #[];
@ -261,7 +283,7 @@ else match changes.get? 0 with
| WorkerState.crashed queuedMsgs => restartCrashedFileWorker doc.uri newFw (queuedMsgs.push msg)
| WorkerState.running =>
-- looks like it crashed now!
catch (writeLspNotification fw.stdin "textDocument/didChange" p)
catch (fw.writeNotification "textDocument/didChange" p)
(fun err => handleCrash doc.uri newFw #[msg])
def handleDidClose (p : DidCloseTextDocumentParams) : ServerM Unit :=
@ -277,7 +299,7 @@ let h := (fun α [HasFromJson α] [HasToJson α] [HasFileSource α] => do
| WorkerState.crashed queuedMsgs => restartCrashedFileWorker uri fw (queuedMsgs.push msg)
| WorkerState.running => do
-- if this errors, the file worker crashed
catch (writeLspRequest fw.stdin id method parsedParams)
catch (fw.writeRequest id method parsedParams)
(fun err => handleCrash uri fw #[msg]));
match method with
| "textDocument/hover" => h HoverParams