This PR is a follow up to #8055 and implements a `Selector` for async TCP in order to allow IO multiplexing using TCP sockets. As we must not commit to actually fetching data from the socket buffer this cannot be implemented by just racing on `recv?`. Instead we perform a call to `uv_read_start` and pass an `alloc_cb` that allocates no memory at all. According to the docs of [`uv_alloc_cb`](https://docs.libuv.org/en/v1.x/handle.html#c.uv_alloc_cb) this is guaranteed to give us a `UV_ENOBUFS` in the relevant callback. Thus we can first run this "zero read" and then go into one of three cases: 1. We get cancelled before the zero read completes, in this case just cancel the zero read and give up. 2. The zero read completes and we loose the race for completing the `select`, in this case just don't do anything anymore 3. The zero read completes and we win the race for completing the `select`, in this case we perform the actual read on the socket. As we know that data is available already (since the read callback of the zero read is only triggered if data actually is available) we know that the subsequent actual read should complete right away. In this way we avoid any data loss if we loose the race.
50 lines
1.7 KiB
Text
50 lines
1.7 KiB
Text
import Std.Internal.Async.Timer
|
|
import Std.Internal.Async.TCP
|
|
|
|
open Std Internal IO Async
|
|
|
|
def testClient (addr : Net.SocketAddress) : IO (AsyncTask String) := do
|
|
let client ← TCP.Socket.Client.mk
|
|
(← client.connect addr).bindIO fun _ => do
|
|
Selectable.one #[
|
|
.case (← Selector.sleep 1000) fun _ => return AsyncTask.pure "Timeout",
|
|
.case (← client.recvSelector 4096) fun data? => do
|
|
if let some data := data? then
|
|
return AsyncTask.pure <| String.fromUTF8! data
|
|
else
|
|
return AsyncTask.pure "Closed"
|
|
]
|
|
|
|
def test (serverFn : TCP.Socket.Server → IO (AsyncTask Unit)) (addr : Net.SocketAddress) :
|
|
IO Unit := do
|
|
let server ← TCP.Socket.Server.mk
|
|
server.bind addr
|
|
server.listen 1
|
|
let serverTask ← serverFn server
|
|
let clientTask ← testClient addr
|
|
serverTask.block
|
|
IO.println (← clientTask.block)
|
|
|
|
def testServerSend (server : TCP.Socket.Server) : IO (AsyncTask Unit) := do
|
|
(← server.accept).bindIO fun client => do
|
|
client.send (String.toUTF8 "Success")
|
|
|
|
def testServerTimeout (server : TCP.Socket.Server) : IO (AsyncTask Unit) := do
|
|
(← server.accept).bindIO fun client => do
|
|
(← Async.sleep 1500).bindIO fun _ => do
|
|
client.shutdown
|
|
|
|
def testServerClose (server : TCP.Socket.Server) : IO (AsyncTask Unit) := do
|
|
(← server.accept).bindIO fun client => client.shutdown
|
|
|
|
/-- info: Success -/
|
|
#guard_msgs in
|
|
#eval test testServerSend (Net.SocketAddressV4.mk (.ofParts 127 0 0 1) 7070)
|
|
|
|
/-- info: Closed -/
|
|
#guard_msgs in
|
|
#eval test testServerClose (Net.SocketAddressV4.mk (.ofParts 127 0 0 1) 7071)
|
|
|
|
/-- info: Timeout -/
|
|
#guard_msgs in
|
|
#eval test testServerTimeout (Net.SocketAddressV4.mk (.ofParts 127 0 0 1) 7072)
|