fix: missing wakeup in bounded channel try receive (#8052)
This PR fixes a small oversight in the wakeup mechanism of blocked bounded channel senders that occurs when calling `tryRecv`. Marked as `changelog-no` as this isn't released yet.
This commit is contained in:
parent
be117c4738
commit
8e1b9abb7a
1 changed files with 9 additions and 6 deletions
|
|
@ -391,17 +391,24 @@ private def isClosed (ch : Bounded α) : BaseIO Bool :=
|
|||
return (← get).closed
|
||||
|
||||
private def tryRecv' : AtomicT (Bounded.State α) BaseIO (Option α) := do
|
||||
let st ← get
|
||||
let mut st ← get
|
||||
if st.bufCount == 0 then
|
||||
return none
|
||||
else
|
||||
let val ← st.buf[st.recvIdx]'st.hrecv |>.swap none
|
||||
let nextRecvIdx := incMod st.recvIdx st.capacity
|
||||
set { st with
|
||||
st := { st with
|
||||
bufCount := st.bufCount - 1
|
||||
recvIdx := nextRecvIdx,
|
||||
hrecv := incMod_lt st.hrecv
|
||||
}
|
||||
|
||||
if let some (producer, producers) := st.producers.dequeue? then
|
||||
producer.resolve true
|
||||
st := { st with producers }
|
||||
|
||||
set st
|
||||
|
||||
return val
|
||||
|
||||
private def tryRecv (ch : Bounded α) : BaseIO (Option α) :=
|
||||
|
|
@ -411,10 +418,6 @@ private def tryRecv (ch : Bounded α) : BaseIO (Option α) :=
|
|||
private partial def recv (ch : Bounded α) : BaseIO (Task (Option α)) := do
|
||||
ch.state.atomically do
|
||||
if let some val ← tryRecv' then
|
||||
let st ← get
|
||||
if let some (producer, producers) := (← get).producers.dequeue? then
|
||||
producer.resolve true
|
||||
set { st with producers }
|
||||
return .pure <| some val
|
||||
else if (← get).closed then
|
||||
return .pure none
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue