feat: UDP socket support using LibUV (#7574)

This PR introduces UDP socket support using the LibUV library, enabling
asynchronous I/O operations with it.
This commit is contained in:
Sofia Rodrigues 2025-03-26 12:04:25 -03:00 committed by GitHub
parent 149b6423f8
commit 74b1c29a48
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 911 additions and 1 deletions

View file

@ -7,3 +7,4 @@ prelude
import Std.Internal.Async.Basic
import Std.Internal.Async.Timer
import Std.Internal.Async.TCP
import Std.Internal.Async.UDP

View file

@ -0,0 +1,144 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
prelude
import Std.Time
import Std.Internal.UV
import Std.Internal.Async.Basic
import Std.Net.Addr
namespace Std
namespace Internal
namespace IO
namespace Async
namespace UDP
open Std.Net
/--
Represents a UDP socket.
-/
structure Socket where
private ofNative ::
native : Internal.UV.UDP.Socket
/--
Membership type for multicast operations.
-/
inductive Membership
| leaveGroup
| enterGroup
namespace Socket
/--
Creates a new UDP socket.
-/
@[inline]
def mk : IO Socket := do
let native ← Internal.UV.UDP.Socket.new
return Socket.ofNative native
/--
Binds the UDP socket to the given address. Address reuse is enabled to allow rebinding the
same address.
-/
@[inline]
def bind (s : Socket) (addr : SocketAddress) : IO Unit :=
s.native.bind addr
/--
Associates the UDP socket with the given address and port, so every message sent by this socket is
automatically sent to that destination.
-/
@[inline]
def connect (s : Socket) (addr : SocketAddress) : IO Unit :=
s.native.connect addr
/--
Sends data through an UDP socket. The `addr` parameter specifies the destination address. If `addr`
is `none`, the data is sent to the default peer address set by `connect`.
-/
@[inline]
def send (s : Socket) (data : ByteArray) (addr : Option SocketAddress := none) : IO (AsyncTask Unit) :=
AsyncTask.ofPromise <$> s.native.send data addr
/--
Receives data from an UDP socket. `size` is for the maximum bytes to receive.
The promise resolves when some data is available or an error occurs. If the socket
has not been previously bound with `bind`, it is automatically bound to `0.0.0.0`
(all interfaces) with a random port.
-/
@[inline]
def recv (s : Socket) (size : UInt64) : IO (AsyncTask (ByteArray × Option SocketAddress)) :=
AsyncTask.ofPromise <$> s.native.recv size
/--
Gets the local address of the UDP socket.
-/
@[inline]
def getSockName (s : Socket) : IO SocketAddress :=
s.native.getSockName
/--
Gets the remote address of the UDP socket. On unconnected handles, it throws the `.invalidArgument`.
error.
-/
@[inline]
def getPeerName (s : Socket) : IO SocketAddress :=
s.native.getPeerName
/--
Enables or disables broadcasting for the UDP socket.
-/
@[inline]
def setBroadcast (s : Socket) (enable : Bool) : IO Unit :=
s.native.setBroadcast enable
/--
Enables or disables multicast loopback for the UDP socket.
-/
@[inline]
def setMulticastLoop (s : Socket) (enable : Bool) : IO Unit :=
s.native.setMulticastLoop enable
/--
Sets the time-to-live (TTL) for multicast packets.
-/
@[inline]
def setMulticastTTL (s : Socket) (ttl : UInt32) : IO Unit :=
s.native.setMulticastTTL ttl
/--
Sets the membership for joining or leaving a multicast group.
-/
@[inline]
def setMembership (s : Socket) (multicastAddr : IPAddr) (interfaceAddr : Option IPAddr) (membership : Membership) : IO Unit :=
let membership := match membership with
| .leaveGroup => 0
| .enterGroup => 1
s.native.setMembership multicastAddr interfaceAddr membership
/--
Sets the multicast interface for sending packets.
-/
@[inline]
def setMulticastInterface (s : Socket) (interfaceAddr : IPAddr) : IO Unit :=
s.native.setMulticastInterface interfaceAddr
/--
Sets the TTL for outgoing packets.
-/
@[inline]
def setTTL (s : Socket) (ttl : UInt32) : IO Unit :=
s.native.setTTL ttl
end Socket
end UDP
end Async
end IO
end Internal
end Std

View file

@ -10,3 +10,4 @@ import Init.System.Promise
import Std.Internal.UV.Loop
import Std.Internal.UV.Timer
import Std.Internal.UV.TCP
import Std.Internal.UV.UDP

View file

@ -0,0 +1,118 @@
/-
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Sofia Rodrigues
-/
prelude
import Init.System.IO
import Init.System.Promise
import Std.Net
namespace Std
namespace Internal
namespace UV
namespace UDP
open Std.Net
private opaque SocketImpl : NonemptyType.{0}
/--
Represents a UDP socket.
-/
def Socket : Type := SocketImpl.type
instance : Nonempty Socket := SocketImpl.property
namespace Socket
/--
Creates a new UDP socket.
-/
@[extern "lean_uv_udp_new"]
opaque new : IO Socket
/--
Binds an UDP socket to a specific address. Address reuse is enabled to allow rebinding the
same address.
-/
@[extern "lean_uv_udp_bind"]
opaque bind (socket : @& Socket) (addr : @& SocketAddress) : IO Unit
/--
Associates the UDP socket with the given address and port, so every message sent by this socket is
automatically sent to that destination.
-/
@[extern "lean_uv_udp_connect"]
opaque connect (socket : @& Socket) (addr : @& SocketAddress) : IO Unit
/--
Sends data through an UDP socket. The `addr` parameter specifies the destination address. If `addr`
is `none`, the data is sent to the default peer address set by `connect`.
-/
@[extern "lean_uv_udp_send"]
opaque send (socket : @& Socket) (data : ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit))
/--
Receives data from an UDP socket. `size` is for the maximum bytes to receive. The promise
resolves when some data is available or an error occurs.
-/
@[extern "lean_uv_udp_recv"]
opaque recv (socket : @& Socket) (size : UInt64) : IO (IO.Promise (Except IO.Error (ByteArray × Option SocketAddress)))
/--
Receives data from an UDP socket. `size` is for the maximum bytes to receive. The promise resolves
when some data is available or an error occurs. If the socket has not been previously bound with `bind`,
it is automatically bound to `0.0.0.0` (all interfaces) with a random port.
-/
@[extern "lean_uv_udp_getpeername"]
opaque getPeerName (socket : @& Socket) : IO SocketAddress
/--
Gets the local address of a bound UDP socket.
-/
@[extern "lean_uv_udp_getsockname"]
opaque getSockName (socket : @& Socket) : IO SocketAddress
/--
Enables or disables broadcasting on a UDP socket.
-/
@[extern "lean_uv_udp_set_broadcast"]
opaque setBroadcast (socket : @& Socket) (on : Bool) : IO Unit
/--
Enables or disables multicast loopback for a UDP socket.
-/
@[extern "lean_uv_udp_set_multicast_loop"]
opaque setMulticastLoop (socket : @& Socket) (on : Bool) : IO Unit
/--
Sets the time-to-live (TTL) value for multicast packets.
-/
@[extern "lean_uv_udp_set_multicast_ttl"]
opaque setMulticastTTL (socket : @& Socket) (ttl : UInt32) : IO Unit
/--
Sets the membership for joining or leaving a multicast group. If `interfaceAddr` is `none`, the
default network interface is used.
-/
@[extern "lean_uv_udp_set_membership"]
opaque setMembership (socket : @& Socket) (multicastAddr : @& IPAddr) (interfaceAddr : @& Option IPAddr) (membership : UInt8) : IO Unit
/--
Sets the multicast interface for sending packets.
-/
@[extern "lean_uv_udp_set_multicast_interface"]
opaque setMulticastInterface (socket : @& Socket) (interfaceAddr : @& IPAddr) : IO Unit
/--
Sets the TTL value for outgoing packets.
-/
@[extern "lean_uv_udp_set_ttl"]
opaque setTTL (socket : @& Socket) (ttl : UInt32) : IO Unit
end Socket
end UDP
end UV
end Internal
end Std

View file

@ -3,7 +3,7 @@ object.cpp apply.cpp exception.cpp interrupt.cpp memory.cpp
stackinfo.cpp compact.cpp init_module.cpp io.cpp hash.cpp
platform.cpp alloc.cpp allocprof.cpp sharecommon.cpp stack_overflow.cpp
process.cpp object_ref.cpp mpn.cpp mutex.cpp libuv.cpp uv/net_addr.cpp uv/event_loop.cpp
uv/timer.cpp uv/tcp.cpp)
uv/timer.cpp uv/tcp.cpp uv/udp.cpp)
add_library(leanrt_initial-exec STATIC ${RUNTIME_OBJS})
set_target_properties(leanrt_initial-exec PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})

View file

@ -16,6 +16,7 @@ namespace lean {
extern "C" void initialize_libuv() {
initialize_libuv_timer();
initialize_libuv_tcp_socket();
initialize_libuv_udp_socket();
initialize_libuv_loop();
lthread([]() { event_loop_run_loop(&global_ev); });

View file

@ -9,6 +9,7 @@ Author: Markus Himmel, Sofia Rodrigues
#include "runtime/uv/event_loop.h"
#include "runtime/uv/timer.h"
#include "runtime/uv/tcp.h"
#include "runtime/uv/udp.h"
#include "runtime/alloc.h"
#include "runtime/io.h"
#include "runtime/utf8.h"

View file

@ -28,6 +28,28 @@ void lean_ipv6_addr_to_in6_addr(b_obj_arg ipv6_addr, in6_addr* out) {
}
}
void lean_ip_addr_to_in_addr_storage(b_obj_arg ip_addr, int* type, in_addr_storage* out) {
lean_object* ip_obj = lean_ctor_get(ip_addr, 0);
if (lean_ptr_tag(ip_addr) == 0) {
lean_ipv4_addr_to_in_addr(ip_obj, (in_addr*) out);
*type = AF_INET;
} else {
lean_ipv6_addr_to_in6_addr(ip_obj, (in6_addr*) out);
*type = AF_INET6;
}
}
void lean_ip_addr_ntop(b_obj_arg ip_addr, char *buffer, size_t buffer_size) {
int ip_type;
in_addr_storage ip_addr_storage;
lean_ip_addr_to_in_addr_storage(ip_addr, &ip_type, &ip_addr_storage);
int ret = uv_inet_ntop(ip_type, &ip_addr_storage, buffer, buffer_size);
lean_always_assert(ret == 0);
}
void lean_socket_address_to_sockaddr_storage(b_obj_arg ip_addr, sockaddr_storage* out) {
memset(out, 0, sizeof(*out));

View file

@ -19,7 +19,9 @@ typedef union in_addr_storage {
void lean_ipv4_addr_to_in_addr(b_obj_arg ipv4_addr, struct in_addr* out);
void lean_ipv6_addr_to_in6_addr(b_obj_arg ipv6_addr, struct in6_addr* out);
void lean_ip_addr_to_in_addr_storage(b_obj_arg ip_addr, int* type, in_addr_storage* out);
void lean_socket_address_to_sockaddr_storage(b_obj_arg ip_addr, struct sockaddr_storage* out);
void lean_ip_addr_ntop(b_obj_arg ip_addr, char *buffer, size_t buffer_size);
lean_obj_res lean_phys_addr_to_mac_addr(char phys_addr[6]);
lean_obj_res lean_in_addr_to_ipv4_addr(const struct in_addr* ipv4_addr);

508
src/runtime/uv/udp.cpp Normal file
View file

@ -0,0 +1,508 @@
/*
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Author: Sofia Rodrigues
*/
#include "runtime/uv/udp.h"
#include <cstring>
namespace lean {
#ifndef LEAN_EMSCRIPTEN
// Stores all the things needed to send data to a UDP socket.
typedef struct {
lean_object *promise;
lean_object *data;
lean_object *socket;
} udp_send_data;
void lean_uv_udp_socket_finalizer(void* ptr) {
lean_uv_udp_socket_object* udp_socket = (lean_uv_udp_socket_object*)ptr;
lean_always_assert(udp_socket->m_promise_read == nullptr);
lean_always_assert(udp_socket->m_byte_array == nullptr);
/// It's changing here because the object is being freed in the finalizer, and we need the data
/// inside of it.
udp_socket->m_uv_udp->data = ptr;
event_loop_lock(&global_ev);
uv_close((uv_handle_t*)udp_socket->m_uv_udp, [](uv_handle_t* handle) {
lean_uv_udp_socket_object* udp_socket = (lean_uv_udp_socket_object*)handle->data;
free(udp_socket->m_uv_udp);
free(udp_socket);
});
event_loop_unlock(&global_ev);
}
void initialize_libuv_udp_socket() {
g_uv_udp_socket_external_class = lean_register_external_class(lean_uv_udp_socket_finalizer, [](void* obj, lean_object* f) {
lean_uv_udp_socket_object* udp_socket = (lean_uv_udp_socket_object*)obj;
if (udp_socket->m_promise_read != nullptr) {
lean_inc(f);
lean_apply_1(f, udp_socket->m_promise_read);
}
if (udp_socket->m_byte_array != nullptr) {
lean_inc(f);
lean_apply_1(f, udp_socket->m_byte_array);
}
});
}
// =======================================
// UDP Socket Operations
/* Std.Internal.UV.UDP.Socket.new : IO Socket */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_new(obj_arg /* w */) {
lean_uv_udp_socket_object* udp_socket = (lean_uv_udp_socket_object*)malloc(sizeof(lean_uv_udp_socket_object));
udp_socket->m_promise_read = nullptr;
udp_socket->m_byte_array = nullptr;
uv_udp_t* uv_udp = (uv_udp_t*)malloc(sizeof(uv_udp_t));
event_loop_lock(&global_ev);
int result = uv_udp_init(global_ev.loop, uv_udp);
event_loop_unlock(&global_ev);
if (result != 0) {
free(uv_udp);
free(udp_socket);
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
lean_object* obj = lean_uv_udp_socket_new(udp_socket);
lean_mark_mt(obj);
udp_socket->m_uv_udp = uv_udp;
udp_socket->m_uv_udp->data = obj;
return lean_io_result_mk_ok(obj);
}
/* Std.Internal.UV.UDP.Socket.bind (socket : @& Socket) (addr : @& SocketAddress) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_bind(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */) {
lean_uv_udp_socket_object* udp_socket = lean_to_uv_udp_socket(socket);
sockaddr_storage addr_ptr;
lean_socket_address_to_sockaddr_storage(addr, &addr_ptr);
event_loop_lock(&global_ev);
int result = uv_udp_bind(udp_socket->m_uv_udp, (sockaddr*)&addr_ptr, UV_UDP_REUSEADDR);
event_loop_unlock(&global_ev);
if (result < 0) {
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
return lean_io_result_mk_ok(lean_box(0));
}
/* Std.Internal.UV.UDP.Socket.connect (socket : @& Socket) (addr : @& SocketAddress) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_connect(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */) {
lean_uv_udp_socket_object* udp_socket = lean_to_uv_udp_socket(socket);
sockaddr_storage addr_ptr;
lean_socket_address_to_sockaddr_storage(addr, &addr_ptr);
event_loop_lock(&global_ev);
int result = uv_udp_connect(udp_socket->m_uv_udp, (sockaddr*)&addr_ptr);
event_loop_unlock(&global_ev);
if (result < 0) {
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
return lean_io_result_mk_ok(lean_box(0));
}
/* Std.Internal.UV.UDP.Socket.send (socket : @& Socket) (data : ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data, b_obj_arg opt_addr, obj_arg /* w */) {
lean_uv_udp_socket_object* udp_socket = lean_to_uv_udp_socket(socket);
size_t data_len = lean_sarray_size(data);
char* data_str = (char*)lean_sarray_cptr(data);
uv_buf_t buf = uv_buf_init(data_str, data_len);
lean_object* promise = lean_promise_new();
mark_mt(promise);
uv_udp_send_t* send_uv = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
send_uv->data = (udp_send_data*)malloc(sizeof(udp_send_data));
udp_send_data* send_data = (udp_send_data*)send_uv->data;
send_data->promise = promise;
send_data->data = data;
send_data->socket = socket;
// These objects are going to enter the loop and be owned by it
lean_inc(promise);
lean_inc(socket);
sockaddr_storage* addr_ptr = nullptr;
if (lean_obj_tag(opt_addr) == 1) {
lean_object* addr = lean_ctor_get(opt_addr, 0);
addr_ptr = (sockaddr_storage*)malloc(sizeof(sockaddr_storage));
lean_socket_address_to_sockaddr_storage(addr, addr_ptr);
}
event_loop_lock(&global_ev);
int result = uv_udp_send(send_uv, udp_socket->m_uv_udp, &buf, 1, (sockaddr*)addr_ptr, [](uv_udp_send_t* req, int status) {
udp_send_data* tup = (udp_send_data*) req->data;
lean_promise_resolve_with_code(status, tup->promise);
lean_dec(tup->promise);
lean_dec(tup->socket);
lean_dec(tup->data);
free(req->data);
free(req);
});
event_loop_unlock(&global_ev);
if (addr_ptr != nullptr) {
free(addr_ptr);
}
if (result < 0) {
lean_dec(promise); // The structure does not own it.
lean_dec(promise); // We are not going to return it.
lean_dec(socket); // The loop does not own the object.
lean_dec(data); // The data is owned.
free(send_uv->data);
free(send_uv);
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
return lean_io_result_mk_ok(promise);
}
/* Std.Internal.UV.UDP.Socket.recv (socket : @& Socket) (size : UInt64) : IO (IO.Promise (Except IO.Error (ByteArray × SocketAddress))) */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_recv(b_obj_arg socket, uint64_t buffer_size, obj_arg /* w */) {
lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket);
// Locking earlier to avoid parallelism issues with m_promise_read.
event_loop_lock(&global_ev);
if (udp_socket->m_promise_read != nullptr) {
event_loop_unlock(&global_ev);
return lean_io_result_mk_error(lean_decode_uv_error(UV_EALREADY, nullptr));
}
lean_object* byte_array = lean_alloc_sarray(1, 0, buffer_size);
lean_object* promise = lean_promise_new();
mark_mt(promise);
udp_socket->m_byte_array = byte_array;
udp_socket->m_promise_read = promise;
// The event loop owns the socket.
lean_inc(promise);
lean_inc(socket);
int result = uv_udp_recv_start(udp_socket->m_uv_udp, [](uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket((lean_object*)handle->data);
buf->base = (char*)lean_sarray_cptr(udp_socket->m_byte_array);
buf->len = lean_sarray_capacity(udp_socket->m_byte_array);
}, [](uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
uv_udp_recv_stop(handle);
lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket((lean_object*)handle->data);
lean_object* promise = udp_socket->m_promise_read;
lean_object* byte_array = udp_socket->m_byte_array;
udp_socket->m_promise_read = nullptr;
udp_socket->m_byte_array = nullptr;
if (nread >= 0) {
lean_sarray_set_size(byte_array, nread);
lean_object* addr_obj;
if (addr != NULL) {
addr_obj = lean::mk_option_some(lean_sockaddr_to_socketaddress(addr));
} else {
addr_obj = lean::mk_option_none();
}
lean_object* prod = lean_alloc_ctor(1, 2, 0);
lean_ctor_set(prod, 0, byte_array);
lean_ctor_set(prod, 1, addr_obj);
lean_promise_resolve(mk_except_ok(prod), promise);
} else if (nread < 0) {
lean_dec(byte_array);
lean_promise_resolve(mk_except_err(lean_decode_uv_error(nread, nullptr)), promise);
}
lean_dec(promise);
// The event loop does not own the object anymore.
lean_dec((lean_object*)handle->data);
});
if (result < 0) {
udp_socket->m_byte_array = nullptr;
udp_socket->m_promise_read = nullptr;
event_loop_unlock(&global_ev);
lean_dec(byte_array);
lean_dec(promise); // The structure does not own it.
lean_dec(promise); // We are not going to return it.
lean_dec(socket);
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
event_loop_unlock(&global_ev);
return lean_io_result_mk_ok(promise);
}
// =======================================
// UDP Socket Utility Functions
/* Std.Internal.UV.UDP.Socket.getPeerName (socket : @& Socket) : IO SocketAddress */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_getpeername(b_obj_arg socket, obj_arg /* w */) {
lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket);
struct sockaddr_storage addr_storage;
int addr_len = sizeof(addr_storage);
event_loop_lock(&global_ev);
int result = uv_udp_getpeername(udp_socket->m_uv_udp, (struct sockaddr*)&addr_storage, &addr_len);
event_loop_unlock(&global_ev);
if (result < 0) {
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
lean_object *lean_addr = lean_sockaddr_to_socketaddress((struct sockaddr*)&addr_storage);
return lean_io_result_mk_ok(lean_addr);
}
/* Std.Internal.UV.UDP.Socket.getSockName (socket : @& Socket) : IO SocketAddress */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_getsockname(b_obj_arg socket) {
lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket);
struct sockaddr_storage addr_storage;
int addr_len = sizeof(addr_storage);
event_loop_lock(&global_ev);
int result = uv_udp_getsockname(udp_socket->m_uv_udp, (struct sockaddr*)&addr_storage, &addr_len);
event_loop_unlock(&global_ev);
if (result < 0) {
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
lean_object *lean_addr = lean_sockaddr_to_socketaddress((struct sockaddr*)&addr_storage);
return lean_io_result_mk_ok(lean_addr);
}
/* Std.Internal.UV.UDP.Socket.setBroadcast (socket : @& Socket) (on : Bool) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_broadcast(b_obj_arg socket, uint8_t enable, obj_arg /* w */) {
lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket);
event_loop_lock(&global_ev);
int result = uv_udp_set_broadcast(udp_socket->m_uv_udp, enable);
event_loop_unlock(&global_ev);
if (result < 0) {
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
return lean_io_result_mk_ok(lean_box(0));
}
/* Std.Internal.UV.UDP.Socket.setMulticastLoop (socket : @& Socket) (on : Bool) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_loop(b_obj_arg socket, uint8_t enable, obj_arg /* w */) {
lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket);
event_loop_lock(&global_ev);
int result = uv_udp_set_multicast_loop(udp_socket->m_uv_udp, enable);
event_loop_unlock(&global_ev);
if (result < 0) {
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
return lean_io_result_mk_ok(lean_box(0));
}
/* Std.Internal.UV.UDP.Socket.setMulticastTTL (socket : @& Socket) (ttl : UInt32) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_ttl(b_obj_arg socket, uint32_t ttl, obj_arg /* w */) {
lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket);
event_loop_lock(&global_ev);
int result = uv_udp_set_multicast_ttl(udp_socket->m_uv_udp, ttl);
event_loop_unlock(&global_ev);
if (result < 0) {
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
return lean_io_result_mk_ok(lean_box(0));
}
/* Std.Internal.UV.UDP.Socket.setMembership (socket : @& Socket) (multicastAddr : @& IpAddr) (interfaceAddr : @& Option IpAddr) (membership : UInt8) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_membership(b_obj_arg socket, b_obj_arg multicast_addr, b_obj_arg interface_addr, uint8_t membership, obj_arg /* w */) {
lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket);
char multicast_addr_str[INET_ADDRSTRLEN];
lean_ip_addr_ntop(multicast_addr, multicast_addr_str, sizeof(multicast_addr_str));
bool is_interface_null = is_scalar(interface_addr);
char interface_addr_str[INET_ADDRSTRLEN];
if (!is_interface_null) {
lean_object* interface_addr_obj = lean_ctor_get(interface_addr, 0);
lean_ip_addr_ntop(interface_addr_obj, interface_addr_str, sizeof(interface_addr_str));
}
event_loop_lock(&global_ev);
int result = uv_udp_set_membership(udp_socket->m_uv_udp, multicast_addr_str, is_interface_null ? nullptr : interface_addr_str, (uv_membership)membership);
event_loop_unlock(&global_ev);
if (result < 0) {
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
return lean_io_result_mk_ok(lean_box(0));
}
/* Std.Internal.UV.UDP.Socket.setMulticastInterface (socket : @& Socket) (interfaceAddr : @& IPAddr) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_interface(b_obj_arg socket, b_obj_arg interface_addr, obj_arg /* w */) {
lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket);
char interface_addr_str[INET_ADDRSTRLEN];
lean_ip_addr_ntop(interface_addr, interface_addr_str, sizeof(interface_addr_str));
event_loop_lock(&global_ev);
int result = uv_udp_set_multicast_interface(udp_socket->m_uv_udp, interface_addr_str);
event_loop_unlock(&global_ev);
if (result < 0) {
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
return lean_io_result_mk_ok(lean_box(0));
}
/* Std.Internal.UV.UDP.Socket.setTTL (socket : @& Socket) (ttl : UInt32) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_ttl(b_obj_arg socket, uint32_t ttl, obj_arg /* w */) {
lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket);
event_loop_lock(&global_ev);
int result = uv_udp_set_ttl(udp_socket->m_uv_udp, ttl);
event_loop_unlock(&global_ev);
if (result < 0) {
return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr));
}
return lean_io_result_mk_ok(lean_box(0));
}
#else
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_new(obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_bind(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_connect(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data, b_obj_arg opt_addr, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_recv(b_obj_arg socket, uint64_t buffer_size, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
// =======================================
// UDP Socket Utility Functions
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_getpeername(b_obj_arg socket, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_getsockname(b_obj_arg socket) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_broadcast(b_obj_arg socket, uint8_t enable, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_loop(b_obj_arg socket, uint8_t enable, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_ttl(b_obj_arg socket, uint32_t ttl, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_membership(b_obj_arg socket, b_obj_arg multicast_addr, b_obj_arg interface_addr, uint8_t membership, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_interface(b_obj_arg socket, b_obj_arg interface_addr, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_ttl(b_obj_arg socket, uint32_t ttl, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}
#endif
}

56
src/runtime/uv/udp.h Normal file
View file

@ -0,0 +1,56 @@
/*
Copyright (c) 2025 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Author: Sofia Rodrigues
*/
#pragma once
#include <lean/lean.h>
#include "runtime/uv/event_loop.h"
#include "runtime/uv/net_addr.h"
#include "runtime/object_ref.h"
namespace lean {
static lean_external_class * g_uv_udp_socket_external_class = NULL;
void initialize_libuv_udp_socket();
#ifndef LEAN_EMSCRIPTEN
#include <uv.h>
// Structure for managing a single UDP socket object, including promise handling,
// connection state, and read/write buffers.
typedef struct {
uv_udp_t * m_uv_udp; // LibUV UDP handle.
lean_object * m_promise_read; // The associated promise for asynchronous results for reading from the socket.
lean_object * m_byte_array; // The received data stored.
} lean_uv_udp_socket_object;
// =======================================
// UDP socket object manipulation functions.
static inline lean_object* lean_uv_udp_socket_new(lean_uv_udp_socket_object * s) { return lean_alloc_external(g_uv_udp_socket_external_class, s); }
static inline lean_uv_udp_socket_object* lean_to_uv_udp_socket(lean_object * o) { return (lean_uv_udp_socket_object*)(lean_get_external_data(o)); }
#endif
// =======================================
// UDP Socket Operations
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_new(obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_bind(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_connect(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data, b_obj_arg opt_addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_recv(b_obj_arg socket, uint64_t buffer_size, obj_arg /* w */);
// =======================================
// UDP Socket Utility Functions
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_getpeername(b_obj_arg socket, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_getsockname(b_obj_arg socket);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_broadcast(b_obj_arg socket, uint8_t enable, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_loop(b_obj_arg socket, uint8_t enable, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_ttl(b_obj_arg socket, uint32_t ttl, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_membership(b_obj_arg socket, b_obj_arg multicast_addr, b_obj_arg interface_addr, uint8_t membership, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_interface(b_obj_arg socket, b_obj_arg interface_addr, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_ttl(b_obj_arg socket, uint32_t ttl, obj_arg /* w */);
}

View file

@ -0,0 +1,56 @@
import Std.Internal.Async
import Std.Internal.UV
import Std.Net.Addr
open Std.Internal.IO.Async.UDP
open Std.Internal.IO.Async
open Std.Net
-- Define the Async monad
structure Async (α : Type) where
run : IO (AsyncTask α)
namespace Async
-- Monad instance for Async
instance : Monad Async where
pure x := Async.mk (pure (AsyncTask.pure x))
bind ma f := Async.mk do
let task ← ma.run
task.bindIO fun a => (f a).run
-- Await function to simplify AsyncTask handling
def await (task : IO (AsyncTask α)) : Async α :=
Async.mk task
instance : MonadLift IO Async where
monadLift io := Async.mk (io >>= (pure ∘ AsyncTask.pure))
/-- Joe is another client. -/
def runJoe (addr : UInt16 → SocketAddress) (first second : UInt16) : Async Unit := do
let client ← UDP.Socket.mk
client.bind (addr second)
client.connect (addr first)
await (client.send (String.toUTF8 "hello robert!"))
def acceptClose (addr : UInt16 → SocketAddress) (first second : UInt16) : IO Unit := do
let server ← UDP.Socket.mk
server.bind (addr first)
let res ← (runJoe addr first second).run
res.block
let res ← server.recv 1024
let (msg, addr) ← res.block
if let some addr := addr then
assert! addr.port == second
assert! ("hello robert!" == String.fromUTF8! msg)
#eval acceptClose (SocketAddress.v4 ∘ SocketAddressV4.mk (.ofParts 127 0 0 1)) 9001 9002
#eval acceptClose (SocketAddress.v6 ∘ SocketAddressV6.mk (.ofParts 0 0 0 0 0 0 0 1)) 9003 9004