diff --git a/src/Std/Internal/Async.lean b/src/Std/Internal/Async.lean index 88fedb0938..ea25c8ead2 100644 --- a/src/Std/Internal/Async.lean +++ b/src/Std/Internal/Async.lean @@ -7,3 +7,4 @@ prelude import Std.Internal.Async.Basic import Std.Internal.Async.Timer import Std.Internal.Async.TCP +import Std.Internal.Async.UDP diff --git a/src/Std/Internal/Async/UDP.lean b/src/Std/Internal/Async/UDP.lean new file mode 100644 index 0000000000..74366af5cc --- /dev/null +++ b/src/Std/Internal/Async/UDP.lean @@ -0,0 +1,144 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +prelude +import Std.Time +import Std.Internal.UV +import Std.Internal.Async.Basic +import Std.Net.Addr + +namespace Std +namespace Internal +namespace IO +namespace Async +namespace UDP + +open Std.Net + +/-- +Represents a UDP socket. +-/ +structure Socket where + private ofNative :: + native : Internal.UV.UDP.Socket + +/-- +Membership type for multicast operations. +-/ +inductive Membership + | leaveGroup + | enterGroup + +namespace Socket + +/-- +Creates a new UDP socket. +-/ +@[inline] +def mk : IO Socket := do + let native ← Internal.UV.UDP.Socket.new + return Socket.ofNative native + +/-- +Binds the UDP socket to the given address. Address reuse is enabled to allow rebinding the +same address. +-/ +@[inline] +def bind (s : Socket) (addr : SocketAddress) : IO Unit := + s.native.bind addr + +/-- +Associates the UDP socket with the given address and port, so every message sent by this socket is +automatically sent to that destination. +-/ +@[inline] +def connect (s : Socket) (addr : SocketAddress) : IO Unit := + s.native.connect addr + +/-- +Sends data through an UDP socket. The `addr` parameter specifies the destination address. If `addr` +is `none`, the data is sent to the default peer address set by `connect`. +-/ +@[inline] +def send (s : Socket) (data : ByteArray) (addr : Option SocketAddress := none) : IO (AsyncTask Unit) := + AsyncTask.ofPromise <$> s.native.send data addr + +/-- +Receives data from an UDP socket. `size` is for the maximum bytes to receive. +The promise resolves when some data is available or an error occurs. If the socket +has not been previously bound with `bind`, it is automatically bound to `0.0.0.0` +(all interfaces) with a random port. +-/ +@[inline] +def recv (s : Socket) (size : UInt64) : IO (AsyncTask (ByteArray × Option SocketAddress)) := + AsyncTask.ofPromise <$> s.native.recv size + +/-- +Gets the local address of the UDP socket. +-/ +@[inline] +def getSockName (s : Socket) : IO SocketAddress := + s.native.getSockName + +/-- +Gets the remote address of the UDP socket. On unconnected handles, it throws the `.invalidArgument`. +error. +-/ +@[inline] +def getPeerName (s : Socket) : IO SocketAddress := + s.native.getPeerName + +/-- +Enables or disables broadcasting for the UDP socket. +-/ +@[inline] +def setBroadcast (s : Socket) (enable : Bool) : IO Unit := + s.native.setBroadcast enable + +/-- +Enables or disables multicast loopback for the UDP socket. +-/ +@[inline] +def setMulticastLoop (s : Socket) (enable : Bool) : IO Unit := + s.native.setMulticastLoop enable + +/-- +Sets the time-to-live (TTL) for multicast packets. +-/ +@[inline] +def setMulticastTTL (s : Socket) (ttl : UInt32) : IO Unit := + s.native.setMulticastTTL ttl + +/-- +Sets the membership for joining or leaving a multicast group. +-/ +@[inline] +def setMembership (s : Socket) (multicastAddr : IPAddr) (interfaceAddr : Option IPAddr) (membership : Membership) : IO Unit := + let membership := match membership with + | .leaveGroup => 0 + | .enterGroup => 1 + s.native.setMembership multicastAddr interfaceAddr membership + +/-- +Sets the multicast interface for sending packets. +-/ +@[inline] +def setMulticastInterface (s : Socket) (interfaceAddr : IPAddr) : IO Unit := + s.native.setMulticastInterface interfaceAddr + +/-- +Sets the TTL for outgoing packets. +-/ +@[inline] +def setTTL (s : Socket) (ttl : UInt32) : IO Unit := + s.native.setTTL ttl + +end Socket + +end UDP +end Async +end IO +end Internal +end Std diff --git a/src/Std/Internal/UV.lean b/src/Std/Internal/UV.lean index 2b47820200..8d9f524a57 100644 --- a/src/Std/Internal/UV.lean +++ b/src/Std/Internal/UV.lean @@ -10,3 +10,4 @@ import Init.System.Promise import Std.Internal.UV.Loop import Std.Internal.UV.Timer import Std.Internal.UV.TCP +import Std.Internal.UV.UDP diff --git a/src/Std/Internal/UV/UDP.lean b/src/Std/Internal/UV/UDP.lean new file mode 100644 index 0000000000..b5350fa8aa --- /dev/null +++ b/src/Std/Internal/UV/UDP.lean @@ -0,0 +1,118 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +prelude +import Init.System.IO +import Init.System.Promise +import Std.Net + +namespace Std +namespace Internal +namespace UV +namespace UDP + +open Std.Net + +private opaque SocketImpl : NonemptyType.{0} + +/-- +Represents a UDP socket. +-/ +def Socket : Type := SocketImpl.type + +instance : Nonempty Socket := SocketImpl.property + +namespace Socket + +/-- +Creates a new UDP socket. +-/ +@[extern "lean_uv_udp_new"] +opaque new : IO Socket + +/-- +Binds an UDP socket to a specific address. Address reuse is enabled to allow rebinding the +same address. +-/ +@[extern "lean_uv_udp_bind"] +opaque bind (socket : @& Socket) (addr : @& SocketAddress) : IO Unit + +/-- +Associates the UDP socket with the given address and port, so every message sent by this socket is +automatically sent to that destination. +-/ +@[extern "lean_uv_udp_connect"] +opaque connect (socket : @& Socket) (addr : @& SocketAddress) : IO Unit + +/-- +Sends data through an UDP socket. The `addr` parameter specifies the destination address. If `addr` +is `none`, the data is sent to the default peer address set by `connect`. +-/ +@[extern "lean_uv_udp_send"] +opaque send (socket : @& Socket) (data : ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) + +/-- +Receives data from an UDP socket. `size` is for the maximum bytes to receive. The promise +resolves when some data is available or an error occurs. +-/ +@[extern "lean_uv_udp_recv"] +opaque recv (socket : @& Socket) (size : UInt64) : IO (IO.Promise (Except IO.Error (ByteArray × Option SocketAddress))) + +/-- +Receives data from an UDP socket. `size` is for the maximum bytes to receive. The promise resolves +when some data is available or an error occurs. If the socket has not been previously bound with `bind`, +it is automatically bound to `0.0.0.0` (all interfaces) with a random port. +-/ +@[extern "lean_uv_udp_getpeername"] +opaque getPeerName (socket : @& Socket) : IO SocketAddress + +/-- +Gets the local address of a bound UDP socket. +-/ +@[extern "lean_uv_udp_getsockname"] +opaque getSockName (socket : @& Socket) : IO SocketAddress + +/-- +Enables or disables broadcasting on a UDP socket. +-/ +@[extern "lean_uv_udp_set_broadcast"] +opaque setBroadcast (socket : @& Socket) (on : Bool) : IO Unit + +/-- +Enables or disables multicast loopback for a UDP socket. +-/ +@[extern "lean_uv_udp_set_multicast_loop"] +opaque setMulticastLoop (socket : @& Socket) (on : Bool) : IO Unit + +/-- +Sets the time-to-live (TTL) value for multicast packets. +-/ +@[extern "lean_uv_udp_set_multicast_ttl"] +opaque setMulticastTTL (socket : @& Socket) (ttl : UInt32) : IO Unit + +/-- +Sets the membership for joining or leaving a multicast group. If `interfaceAddr` is `none`, the +default network interface is used. +-/ +@[extern "lean_uv_udp_set_membership"] +opaque setMembership (socket : @& Socket) (multicastAddr : @& IPAddr) (interfaceAddr : @& Option IPAddr) (membership : UInt8) : IO Unit + +/-- +Sets the multicast interface for sending packets. +-/ +@[extern "lean_uv_udp_set_multicast_interface"] +opaque setMulticastInterface (socket : @& Socket) (interfaceAddr : @& IPAddr) : IO Unit + +/-- +Sets the TTL value for outgoing packets. +-/ +@[extern "lean_uv_udp_set_ttl"] +opaque setTTL (socket : @& Socket) (ttl : UInt32) : IO Unit + +end Socket +end UDP +end UV +end Internal +end Std diff --git a/src/runtime/CMakeLists.txt b/src/runtime/CMakeLists.txt index 7d4a9fbaab..da43fc085b 100644 --- a/src/runtime/CMakeLists.txt +++ b/src/runtime/CMakeLists.txt @@ -3,7 +3,7 @@ object.cpp apply.cpp exception.cpp interrupt.cpp memory.cpp stackinfo.cpp compact.cpp init_module.cpp io.cpp hash.cpp platform.cpp alloc.cpp allocprof.cpp sharecommon.cpp stack_overflow.cpp process.cpp object_ref.cpp mpn.cpp mutex.cpp libuv.cpp uv/net_addr.cpp uv/event_loop.cpp -uv/timer.cpp uv/tcp.cpp) +uv/timer.cpp uv/tcp.cpp uv/udp.cpp) add_library(leanrt_initial-exec STATIC ${RUNTIME_OBJS}) set_target_properties(leanrt_initial-exec PROPERTIES ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/src/runtime/libuv.cpp b/src/runtime/libuv.cpp index 58fffa4006..feeb16aab0 100644 --- a/src/runtime/libuv.cpp +++ b/src/runtime/libuv.cpp @@ -16,6 +16,7 @@ namespace lean { extern "C" void initialize_libuv() { initialize_libuv_timer(); initialize_libuv_tcp_socket(); + initialize_libuv_udp_socket(); initialize_libuv_loop(); lthread([]() { event_loop_run_loop(&global_ev); }); diff --git a/src/runtime/libuv.h b/src/runtime/libuv.h index f158ce024c..f994a324d8 100644 --- a/src/runtime/libuv.h +++ b/src/runtime/libuv.h @@ -9,6 +9,7 @@ Author: Markus Himmel, Sofia Rodrigues #include "runtime/uv/event_loop.h" #include "runtime/uv/timer.h" #include "runtime/uv/tcp.h" +#include "runtime/uv/udp.h" #include "runtime/alloc.h" #include "runtime/io.h" #include "runtime/utf8.h" diff --git a/src/runtime/uv/net_addr.cpp b/src/runtime/uv/net_addr.cpp index d8aaec8897..df88cda131 100644 --- a/src/runtime/uv/net_addr.cpp +++ b/src/runtime/uv/net_addr.cpp @@ -28,6 +28,28 @@ void lean_ipv6_addr_to_in6_addr(b_obj_arg ipv6_addr, in6_addr* out) { } } +void lean_ip_addr_to_in_addr_storage(b_obj_arg ip_addr, int* type, in_addr_storage* out) { + lean_object* ip_obj = lean_ctor_get(ip_addr, 0); + + if (lean_ptr_tag(ip_addr) == 0) { + lean_ipv4_addr_to_in_addr(ip_obj, (in_addr*) out); + *type = AF_INET; + } else { + lean_ipv6_addr_to_in6_addr(ip_obj, (in6_addr*) out); + *type = AF_INET6; + } +} + +void lean_ip_addr_ntop(b_obj_arg ip_addr, char *buffer, size_t buffer_size) { + int ip_type; + in_addr_storage ip_addr_storage; + + lean_ip_addr_to_in_addr_storage(ip_addr, &ip_type, &ip_addr_storage); + int ret = uv_inet_ntop(ip_type, &ip_addr_storage, buffer, buffer_size); + + lean_always_assert(ret == 0); +} + void lean_socket_address_to_sockaddr_storage(b_obj_arg ip_addr, sockaddr_storage* out) { memset(out, 0, sizeof(*out)); diff --git a/src/runtime/uv/net_addr.h b/src/runtime/uv/net_addr.h index b03337075a..ba36cd84c1 100644 --- a/src/runtime/uv/net_addr.h +++ b/src/runtime/uv/net_addr.h @@ -19,7 +19,9 @@ typedef union in_addr_storage { void lean_ipv4_addr_to_in_addr(b_obj_arg ipv4_addr, struct in_addr* out); void lean_ipv6_addr_to_in6_addr(b_obj_arg ipv6_addr, struct in6_addr* out); +void lean_ip_addr_to_in_addr_storage(b_obj_arg ip_addr, int* type, in_addr_storage* out); void lean_socket_address_to_sockaddr_storage(b_obj_arg ip_addr, struct sockaddr_storage* out); +void lean_ip_addr_ntop(b_obj_arg ip_addr, char *buffer, size_t buffer_size); lean_obj_res lean_phys_addr_to_mac_addr(char phys_addr[6]); lean_obj_res lean_in_addr_to_ipv4_addr(const struct in_addr* ipv4_addr); diff --git a/src/runtime/uv/udp.cpp b/src/runtime/uv/udp.cpp new file mode 100644 index 0000000000..1f0fcb9d55 --- /dev/null +++ b/src/runtime/uv/udp.cpp @@ -0,0 +1,508 @@ +/* +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Author: Sofia Rodrigues +*/ + +#include "runtime/uv/udp.h" +#include + +namespace lean { + +#ifndef LEAN_EMSCRIPTEN + +// Stores all the things needed to send data to a UDP socket. +typedef struct { + lean_object *promise; + lean_object *data; + lean_object *socket; +} udp_send_data; + +void lean_uv_udp_socket_finalizer(void* ptr) { + lean_uv_udp_socket_object* udp_socket = (lean_uv_udp_socket_object*)ptr; + + lean_always_assert(udp_socket->m_promise_read == nullptr); + lean_always_assert(udp_socket->m_byte_array == nullptr); + + /// It's changing here because the object is being freed in the finalizer, and we need the data + /// inside of it. + udp_socket->m_uv_udp->data = ptr; + + event_loop_lock(&global_ev); + + uv_close((uv_handle_t*)udp_socket->m_uv_udp, [](uv_handle_t* handle) { + lean_uv_udp_socket_object* udp_socket = (lean_uv_udp_socket_object*)handle->data; + free(udp_socket->m_uv_udp); + free(udp_socket); + }); + + event_loop_unlock(&global_ev); +} + +void initialize_libuv_udp_socket() { + g_uv_udp_socket_external_class = lean_register_external_class(lean_uv_udp_socket_finalizer, [](void* obj, lean_object* f) { + lean_uv_udp_socket_object* udp_socket = (lean_uv_udp_socket_object*)obj; + + if (udp_socket->m_promise_read != nullptr) { + lean_inc(f); + lean_apply_1(f, udp_socket->m_promise_read); + } + + if (udp_socket->m_byte_array != nullptr) { + lean_inc(f); + lean_apply_1(f, udp_socket->m_byte_array); + } + }); +} + +// ======================================= +// UDP Socket Operations + +/* Std.Internal.UV.UDP.Socket.new : IO Socket */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_new(obj_arg /* w */) { + lean_uv_udp_socket_object* udp_socket = (lean_uv_udp_socket_object*)malloc(sizeof(lean_uv_udp_socket_object)); + + udp_socket->m_promise_read = nullptr; + udp_socket->m_byte_array = nullptr; + + uv_udp_t* uv_udp = (uv_udp_t*)malloc(sizeof(uv_udp_t)); + + event_loop_lock(&global_ev); + int result = uv_udp_init(global_ev.loop, uv_udp); + event_loop_unlock(&global_ev); + + if (result != 0) { + free(uv_udp); + free(udp_socket); + + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + lean_object* obj = lean_uv_udp_socket_new(udp_socket); + lean_mark_mt(obj); + + udp_socket->m_uv_udp = uv_udp; + udp_socket->m_uv_udp->data = obj; + + return lean_io_result_mk_ok(obj); +} + +/* Std.Internal.UV.UDP.Socket.bind (socket : @& Socket) (addr : @& SocketAddress) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_bind(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */) { + lean_uv_udp_socket_object* udp_socket = lean_to_uv_udp_socket(socket); + + sockaddr_storage addr_ptr; + lean_socket_address_to_sockaddr_storage(addr, &addr_ptr); + + event_loop_lock(&global_ev); + int result = uv_udp_bind(udp_socket->m_uv_udp, (sockaddr*)&addr_ptr, UV_UDP_REUSEADDR); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(lean_box(0)); +} + +/* Std.Internal.UV.UDP.Socket.connect (socket : @& Socket) (addr : @& SocketAddress) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_connect(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */) { + lean_uv_udp_socket_object* udp_socket = lean_to_uv_udp_socket(socket); + + sockaddr_storage addr_ptr; + lean_socket_address_to_sockaddr_storage(addr, &addr_ptr); + + event_loop_lock(&global_ev); + int result = uv_udp_connect(udp_socket->m_uv_udp, (sockaddr*)&addr_ptr); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(lean_box(0)); +} + +/* Std.Internal.UV.UDP.Socket.send (socket : @& Socket) (data : ByteArray) (addr : @& Option SocketAddress) : IO (IO.Promise (Except IO.Error Unit)) */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data, b_obj_arg opt_addr, obj_arg /* w */) { + lean_uv_udp_socket_object* udp_socket = lean_to_uv_udp_socket(socket); + + size_t data_len = lean_sarray_size(data); + char* data_str = (char*)lean_sarray_cptr(data); + + uv_buf_t buf = uv_buf_init(data_str, data_len); + + lean_object* promise = lean_promise_new(); + mark_mt(promise); + + uv_udp_send_t* send_uv = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); + send_uv->data = (udp_send_data*)malloc(sizeof(udp_send_data)); + + udp_send_data* send_data = (udp_send_data*)send_uv->data; + send_data->promise = promise; + send_data->data = data; + send_data->socket = socket; + + // These objects are going to enter the loop and be owned by it + lean_inc(promise); + lean_inc(socket); + + sockaddr_storage* addr_ptr = nullptr; + + if (lean_obj_tag(opt_addr) == 1) { + lean_object* addr = lean_ctor_get(opt_addr, 0); + addr_ptr = (sockaddr_storage*)malloc(sizeof(sockaddr_storage)); + lean_socket_address_to_sockaddr_storage(addr, addr_ptr); + } + + event_loop_lock(&global_ev); + + int result = uv_udp_send(send_uv, udp_socket->m_uv_udp, &buf, 1, (sockaddr*)addr_ptr, [](uv_udp_send_t* req, int status) { + udp_send_data* tup = (udp_send_data*) req->data; + lean_promise_resolve_with_code(status, tup->promise); + + lean_dec(tup->promise); + lean_dec(tup->socket); + lean_dec(tup->data); + + free(req->data); + free(req); + }); + + event_loop_unlock(&global_ev); + + if (addr_ptr != nullptr) { + free(addr_ptr); + } + + if (result < 0) { + lean_dec(promise); // The structure does not own it. + lean_dec(promise); // We are not going to return it. + lean_dec(socket); // The loop does not own the object. + lean_dec(data); // The data is owned. + + free(send_uv->data); + free(send_uv); + + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(promise); +} + +/* Std.Internal.UV.UDP.Socket.recv (socket : @& Socket) (size : UInt64) : IO (IO.Promise (Except IO.Error (ByteArray × SocketAddress))) */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_recv(b_obj_arg socket, uint64_t buffer_size, obj_arg /* w */) { + lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket); + + // Locking earlier to avoid parallelism issues with m_promise_read. + event_loop_lock(&global_ev); + + if (udp_socket->m_promise_read != nullptr) { + event_loop_unlock(&global_ev); + return lean_io_result_mk_error(lean_decode_uv_error(UV_EALREADY, nullptr)); + } + + lean_object* byte_array = lean_alloc_sarray(1, 0, buffer_size); + lean_object* promise = lean_promise_new(); + mark_mt(promise); + + udp_socket->m_byte_array = byte_array; + udp_socket->m_promise_read = promise; + + // The event loop owns the socket. + lean_inc(promise); + lean_inc(socket); + + int result = uv_udp_recv_start(udp_socket->m_uv_udp, [](uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { + lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket((lean_object*)handle->data); + + buf->base = (char*)lean_sarray_cptr(udp_socket->m_byte_array); + buf->len = lean_sarray_capacity(udp_socket->m_byte_array); + }, [](uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { + uv_udp_recv_stop(handle); + + lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket((lean_object*)handle->data); + lean_object* promise = udp_socket->m_promise_read; + lean_object* byte_array = udp_socket->m_byte_array; + + udp_socket->m_promise_read = nullptr; + udp_socket->m_byte_array = nullptr; + + if (nread >= 0) { + lean_sarray_set_size(byte_array, nread); + + lean_object* addr_obj; + + if (addr != NULL) { + addr_obj = lean::mk_option_some(lean_sockaddr_to_socketaddress(addr)); + } else { + addr_obj = lean::mk_option_none(); + } + + lean_object* prod = lean_alloc_ctor(1, 2, 0); + lean_ctor_set(prod, 0, byte_array); + lean_ctor_set(prod, 1, addr_obj); + + lean_promise_resolve(mk_except_ok(prod), promise); + } else if (nread < 0) { + lean_dec(byte_array); + lean_promise_resolve(mk_except_err(lean_decode_uv_error(nread, nullptr)), promise); + } + + lean_dec(promise); + + // The event loop does not own the object anymore. + lean_dec((lean_object*)handle->data); + }); + + if (result < 0) { + udp_socket->m_byte_array = nullptr; + udp_socket->m_promise_read = nullptr; + + event_loop_unlock(&global_ev); + + lean_dec(byte_array); + lean_dec(promise); // The structure does not own it. + lean_dec(promise); // We are not going to return it. + lean_dec(socket); + + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + event_loop_unlock(&global_ev); + + return lean_io_result_mk_ok(promise); +} + + +// ======================================= +// UDP Socket Utility Functions + +/* Std.Internal.UV.UDP.Socket.getPeerName (socket : @& Socket) : IO SocketAddress */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_getpeername(b_obj_arg socket, obj_arg /* w */) { + lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket); + + struct sockaddr_storage addr_storage; + int addr_len = sizeof(addr_storage); + + event_loop_lock(&global_ev); + int result = uv_udp_getpeername(udp_socket->m_uv_udp, (struct sockaddr*)&addr_storage, &addr_len); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + lean_object *lean_addr = lean_sockaddr_to_socketaddress((struct sockaddr*)&addr_storage); + + return lean_io_result_mk_ok(lean_addr); +} + +/* Std.Internal.UV.UDP.Socket.getSockName (socket : @& Socket) : IO SocketAddress */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_getsockname(b_obj_arg socket) { + lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket); + + struct sockaddr_storage addr_storage; + int addr_len = sizeof(addr_storage); + + event_loop_lock(&global_ev); + int result = uv_udp_getsockname(udp_socket->m_uv_udp, (struct sockaddr*)&addr_storage, &addr_len); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + lean_object *lean_addr = lean_sockaddr_to_socketaddress((struct sockaddr*)&addr_storage); + return lean_io_result_mk_ok(lean_addr); +} + +/* Std.Internal.UV.UDP.Socket.setBroadcast (socket : @& Socket) (on : Bool) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_broadcast(b_obj_arg socket, uint8_t enable, obj_arg /* w */) { + lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket); + + event_loop_lock(&global_ev); + int result = uv_udp_set_broadcast(udp_socket->m_uv_udp, enable); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(lean_box(0)); +} + +/* Std.Internal.UV.UDP.Socket.setMulticastLoop (socket : @& Socket) (on : Bool) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_loop(b_obj_arg socket, uint8_t enable, obj_arg /* w */) { + lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket); + + event_loop_lock(&global_ev); + int result = uv_udp_set_multicast_loop(udp_socket->m_uv_udp, enable); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(lean_box(0)); +} + +/* Std.Internal.UV.UDP.Socket.setMulticastTTL (socket : @& Socket) (ttl : UInt32) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_ttl(b_obj_arg socket, uint32_t ttl, obj_arg /* w */) { + lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket); + + event_loop_lock(&global_ev); + int result = uv_udp_set_multicast_ttl(udp_socket->m_uv_udp, ttl); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(lean_box(0)); +} + +/* Std.Internal.UV.UDP.Socket.setMembership (socket : @& Socket) (multicastAddr : @& IpAddr) (interfaceAddr : @& Option IpAddr) (membership : UInt8) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_membership(b_obj_arg socket, b_obj_arg multicast_addr, b_obj_arg interface_addr, uint8_t membership, obj_arg /* w */) { + lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket); + + char multicast_addr_str[INET_ADDRSTRLEN]; + lean_ip_addr_ntop(multicast_addr, multicast_addr_str, sizeof(multicast_addr_str)); + + bool is_interface_null = is_scalar(interface_addr); + char interface_addr_str[INET_ADDRSTRLEN]; + + if (!is_interface_null) { + lean_object* interface_addr_obj = lean_ctor_get(interface_addr, 0); + lean_ip_addr_ntop(interface_addr_obj, interface_addr_str, sizeof(interface_addr_str)); + } + + event_loop_lock(&global_ev); + int result = uv_udp_set_membership(udp_socket->m_uv_udp, multicast_addr_str, is_interface_null ? nullptr : interface_addr_str, (uv_membership)membership); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(lean_box(0)); +} + +/* Std.Internal.UV.UDP.Socket.setMulticastInterface (socket : @& Socket) (interfaceAddr : @& IPAddr) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_interface(b_obj_arg socket, b_obj_arg interface_addr, obj_arg /* w */) { + lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket); + + char interface_addr_str[INET_ADDRSTRLEN]; + lean_ip_addr_ntop(interface_addr, interface_addr_str, sizeof(interface_addr_str)); + + event_loop_lock(&global_ev); + int result = uv_udp_set_multicast_interface(udp_socket->m_uv_udp, interface_addr_str); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(lean_box(0)); +} + +/* Std.Internal.UV.UDP.Socket.setTTL (socket : @& Socket) (ttl : UInt32) : IO Unit */ +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_ttl(b_obj_arg socket, uint32_t ttl, obj_arg /* w */) { + lean_uv_udp_socket_object *udp_socket = lean_to_uv_udp_socket(socket); + + event_loop_lock(&global_ev); + int result = uv_udp_set_ttl(udp_socket->m_uv_udp, ttl); + event_loop_unlock(&global_ev); + + if (result < 0) { + return lean_io_result_mk_error(lean_decode_uv_error(result, nullptr)); + } + + return lean_io_result_mk_ok(lean_box(0)); +} + +#else + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_new(obj_arg /* w */) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_bind(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_connect(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data, b_obj_arg opt_addr, obj_arg /* w */) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_recv(b_obj_arg socket, uint64_t buffer_size, obj_arg /* w */) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +// ======================================= +// UDP Socket Utility Functions + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_getpeername(b_obj_arg socket, obj_arg /* w */) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_getsockname(b_obj_arg socket) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_broadcast(b_obj_arg socket, uint8_t enable, obj_arg /* w */) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_loop(b_obj_arg socket, uint8_t enable, obj_arg /* w */) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_ttl(b_obj_arg socket, uint32_t ttl, obj_arg /* w */) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_membership(b_obj_arg socket, b_obj_arg multicast_addr, b_obj_arg interface_addr, uint8_t membership, obj_arg /* w */) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_interface(b_obj_arg socket, b_obj_arg interface_addr, obj_arg /* w */) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_ttl(b_obj_arg socket, uint32_t ttl, obj_arg /* w */) { + lean_always_assert( + false && ("Please build a version of Lean4 with libuv to invoke this.") + ); +} + +#endif +} \ No newline at end of file diff --git a/src/runtime/uv/udp.h b/src/runtime/uv/udp.h new file mode 100644 index 0000000000..74fa6b0fa3 --- /dev/null +++ b/src/runtime/uv/udp.h @@ -0,0 +1,56 @@ +/* +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Author: Sofia Rodrigues +*/ +#pragma once +#include +#include "runtime/uv/event_loop.h" +#include "runtime/uv/net_addr.h" +#include "runtime/object_ref.h" + +namespace lean { + +static lean_external_class * g_uv_udp_socket_external_class = NULL; +void initialize_libuv_udp_socket(); + +#ifndef LEAN_EMSCRIPTEN +#include + +// Structure for managing a single UDP socket object, including promise handling, +// connection state, and read/write buffers. +typedef struct { + uv_udp_t * m_uv_udp; // LibUV UDP handle. + lean_object * m_promise_read; // The associated promise for asynchronous results for reading from the socket. + lean_object * m_byte_array; // The received data stored. +} lean_uv_udp_socket_object; + +// ======================================= +// UDP socket object manipulation functions. +static inline lean_object* lean_uv_udp_socket_new(lean_uv_udp_socket_object * s) { return lean_alloc_external(g_uv_udp_socket_external_class, s); } +static inline lean_uv_udp_socket_object* lean_to_uv_udp_socket(lean_object * o) { return (lean_uv_udp_socket_object*)(lean_get_external_data(o)); } + +#endif + +// ======================================= +// UDP Socket Operations + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_new(obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_bind(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_connect(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_send(b_obj_arg socket, obj_arg data, b_obj_arg opt_addr, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_recv(b_obj_arg socket, uint64_t buffer_size, obj_arg /* w */); + +// ======================================= +// UDP Socket Utility Functions + +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_getpeername(b_obj_arg socket, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_getsockname(b_obj_arg socket); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_broadcast(b_obj_arg socket, uint8_t enable, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_loop(b_obj_arg socket, uint8_t enable, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_ttl(b_obj_arg socket, uint32_t ttl, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_membership(b_obj_arg socket, b_obj_arg multicast_addr, b_obj_arg interface_addr, uint8_t membership, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_multicast_interface(b_obj_arg socket, b_obj_arg interface_addr, obj_arg /* w */); +extern "C" LEAN_EXPORT lean_obj_res lean_uv_udp_set_ttl(b_obj_arg socket, uint32_t ttl, obj_arg /* w */); + +} \ No newline at end of file diff --git a/tests/lean/run/async_udp_sockets.lean b/tests/lean/run/async_udp_sockets.lean new file mode 100644 index 0000000000..9e316be0f9 --- /dev/null +++ b/tests/lean/run/async_udp_sockets.lean @@ -0,0 +1,56 @@ +import Std.Internal.Async +import Std.Internal.UV +import Std.Net.Addr + +open Std.Internal.IO.Async.UDP +open Std.Internal.IO.Async +open Std.Net + +-- Define the Async monad +structure Async (α : Type) where + run : IO (AsyncTask α) + +namespace Async + +-- Monad instance for Async +instance : Monad Async where + pure x := Async.mk (pure (AsyncTask.pure x)) + bind ma f := Async.mk do + let task ← ma.run + task.bindIO fun a => (f a).run + +-- Await function to simplify AsyncTask handling +def await (task : IO (AsyncTask α)) : Async α := + Async.mk task + +instance : MonadLift IO Async where + monadLift io := Async.mk (io >>= (pure ∘ AsyncTask.pure)) + +/-- Joe is another client. -/ +def runJoe (addr : UInt16 → SocketAddress) (first second : UInt16) : Async Unit := do + let client ← UDP.Socket.mk + + client.bind (addr second) + client.connect (addr first) + + await (client.send (String.toUTF8 "hello robert!")) + + +def acceptClose (addr : UInt16 → SocketAddress) (first second : UInt16) : IO Unit := do + + let server ← UDP.Socket.mk + server.bind (addr first) + + let res ← (runJoe addr first second).run + res.block + + let res ← server.recv 1024 + let (msg, addr) ← res.block + + if let some addr := addr then + assert! addr.port == second + + assert! ("hello robert!" == String.fromUTF8! msg) + +#eval acceptClose (SocketAddress.v4 ∘ SocketAddressV4.mk (.ofParts 127 0 0 1)) 9001 9002 +#eval acceptClose (SocketAddress.v6 ∘ SocketAddressV6.mk (.ofParts 0 0 0 0 0 0 0 1)) 9003 9004