diff --git a/extensions/components/Cargo.toml b/extensions/components/Cargo.toml index 582c0bc..b948966 100644 --- a/extensions/components/Cargo.toml +++ b/extensions/components/Cargo.toml @@ -3,6 +3,9 @@ name = "components" version = "0.1.0" edition = "2024" +[features] +logos-delivery = [] + [dependencies] # Workspace dependencies (sorted) crypto = { workspace = true } @@ -15,5 +18,6 @@ crossbeam-channel = { workspace = true } hex = "0.4.3" reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" thiserror = "2" tracing = "0.1" diff --git a/extensions/components/build.rs b/extensions/components/build.rs new file mode 100644 index 0000000..1f45a13 --- /dev/null +++ b/extensions/components/build.rs @@ -0,0 +1,28 @@ +fn main() { + println!("cargo:rerun-if-env-changed=LOGOS_DELIVERY_LIB_DIR"); + println!("cargo::rustc-check-cfg=cfg(logos_delivery)"); + + let feature_enabled = std::env::var("CARGO_FEATURE_LOGOS_DELIVERY").is_ok(); + let lib_dir = std::env::var("LOGOS_DELIVERY_LIB_DIR"); + + let lib_dir = match lib_dir { + Ok(dir) => dir, + Err(_) if !feature_enabled => return, + Err(_) => { + // Feature is on but no library path — enable compilation, skip linking. + println!("cargo:rustc-cfg=logos_delivery"); + return; + } + }; + + println!("cargo:rustc-cfg=logos_delivery"); + + println!("cargo:rustc-link-search=native={lib_dir}"); + println!("cargo:rustc-link-lib=dylib=logosdelivery"); + + let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap_or_default(); + match target_os.as_str() { + "macos" | "linux" => println!("cargo:rustc-link-arg=-Wl,-rpath,{lib_dir}"), + other => panic!("unsupported OS for logos-delivery transport: {other}"), + } +} diff --git a/extensions/components/src/delivery.rs b/extensions/components/src/delivery.rs index 627ac55..a59dd88 100644 --- a/extensions/components/src/delivery.rs +++ b/extensions/components/src/delivery.rs @@ -1,3 +1,9 @@ mod local_broadcaster; pub use local_broadcaster::LocalBroadcaster; + +#[cfg(logos_delivery)] +mod embedded_p2p_delivery; + +#[cfg(logos_delivery)] +pub use embedded_p2p_delivery::EmbeddedP2pDeliveryService; diff --git a/extensions/components/src/delivery/embedded_p2p_delivery.rs b/extensions/components/src/delivery/embedded_p2p_delivery.rs new file mode 100644 index 0000000..bb8fb1d --- /dev/null +++ b/extensions/components/src/delivery/embedded_p2p_delivery.rs @@ -0,0 +1,314 @@ +//! logos-delivery backed [`client::DeliveryService`] implementation. +//! +//! `LogosDeliveryService` wraps an embedded logos-delivery node running on a +//! dedicated `std::thread`. All interaction is via synchronous `std::sync::mpsc` +//! channels. +//! +//! ## Content topic mapping +//! +//! `AddressedEnvelope::delivery_address` maps to logos-delivery content topic +//! `/logos-chat/1/{delivery_address}/proto`. + +pub(crate) mod sys; +pub(crate) mod wrapper; + +use std::sync::{Arc, Mutex, mpsc}; +use std::thread; +use std::time::Duration; + +use base64::Engine; +use base64::engine::general_purpose::STANDARD as BASE64; +use crossbeam_channel::{Receiver, Sender}; +use libchat::{AddressedEnvelope, DeliveryService}; +use tracing::{error, info, warn}; + +use wrapper::LogosNodeCtx; + +pub fn content_topic_for(delivery_address: &str) -> String { + format!("/logos-chat/1/{delivery_address}/proto") +} + +// ── Error ──────────────────────────────────────────────────────────────────── + +#[derive(Debug, thiserror::Error)] +pub enum DeliveryError { + #[error("node startup failed: {0}")] + StartupFailed(String), + #[error("publish failed: {0}")] + PublishFailed(String), + #[error("send channel closed")] + ChannelClosed, +} + +// ── Internals ──────────────────────────────────────────────────────────────── + +struct OutboundCmd { + message_json: String, + reply: mpsc::SyncSender>, +} + +type SubscriberList = Arc>>>>; + +// ── Config ─────────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone)] +pub struct Config { + pub preset: String, + pub tcp_port: u16, + pub log_level: String, +} + +impl Default for Config { + fn default() -> Self { + Self { + preset: "logos.dev".into(), + tcp_port: 60000, + log_level: "ERROR".into(), + } + } +} + +// ── Wire types ────────────────────────────────────────────────────────────── + +/// Outbound message sent to the logos-delivery node. +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct WakuMessage { + #[serde(rename = "contentTopic")] + content_topic: String, + /// Base64-encoded payload. + payload: String, + ephemeral: bool, +} + +/// Top-level event envelope received from the logos-delivery node callback. +#[derive(Debug, serde::Deserialize)] +struct WakuEvent { + #[serde(rename = "eventType")] + event_type: String, + message: Option, +} + +/// Message payload from a `message_received` event. +#[derive(Debug, serde::Deserialize)] +struct ReceivedMessage { + #[serde(rename = "contentTopic")] + content_topic: String, + /// The node may deliver the payload as either a base64 string or a JSON + /// array of byte values. + payload: WakuPayload, +} + +/// Untagged union that handles both payload representations. +#[derive(Debug, serde::Deserialize)] +#[serde(untagged)] +enum WakuPayload { + Base64(String), + Bytes(Vec), +} + +impl WakuPayload { + fn decode(self) -> Option> { + match self { + WakuPayload::Base64(s) => BASE64.decode(s).ok(), + WakuPayload::Bytes(b) => Some(b), + } + } +} + +// ── EmbeddedP2pDeliveryService ────────────────────────────────────────────────── + +/// logos-delivery backed delivery service. Cheap to clone — all clones share +/// the same background node. +#[derive(Clone, Debug)] +pub struct EmbeddedP2pDeliveryService { + outbound: mpsc::SyncSender, + #[allow(dead_code)] + subscribers: SubscriberList, + inbound_rx: Option>>, +} + +impl EmbeddedP2pDeliveryService { + /// Start the embedded logos-delivery node. The client drains inbound + /// payloads via [`Transport::inbound`]. + pub fn start(cfg: Config) -> Result { + let (out_tx, out_rx) = mpsc::sync_channel::(256); + let subscribers: SubscriberList = Arc::new(Mutex::new(Vec::new())); + let (ready_tx, ready_rx) = mpsc::channel::>(); + // Create the inbound channel before spawning so the receiver is + // registered inside the thread, before any event callback fires. + let (inbound_tx, inbound_rx) = crossbeam_channel::bounded::>(1024); + + let subs_for_thread = subscribers.clone(); + + let handle = thread::Builder::new() + .name("logos-node".into()) + .spawn(move || { + if let Err(panic) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + Self::node_thread(cfg, out_rx, subs_for_thread, inbound_tx, ready_tx); + })) { + let msg = panic + .downcast_ref::<&str>() + .map(|s| s.to_string()) + .or_else(|| panic.downcast_ref::().cloned()) + .unwrap_or_else(|| "unknown panic".into()); + error!("logos-node thread panicked: {msg}"); + } + }) + .map_err(|e| DeliveryError::StartupFailed(e.to_string()))?; + + // On failure, the node thread drops LogosNodeCtx (stop+destroy against + // a half-initialized Nim node). Join it so the process doesn't begin + // teardown mid-destroy — that race SIGSEGVs inside the Nim async loop. + let ready = ready_rx.recv().unwrap_or_else(|_| { + Err(DeliveryError::StartupFailed( + "node thread exited before ready".into(), + )) + }); + if let Err(e) = ready { + let _ = handle.join(); + return Err(e); + } + + Ok(Self { + outbound: out_tx, + subscribers, + inbound_rx: Some(inbound_rx), + }) + } + + fn node_thread( + cfg: Config, + out_rx: mpsc::Receiver, + subscribers: SubscriberList, + inbound_tx: Sender>, + ready_tx: mpsc::Sender>, + ) { + // discv5UdpPort defaults to 9000 in libwaku, so a second instance with + // a distinct --port still collides on UDP. Bind it to tcp_port so a + // single --port knob keeps both ports distinct across instances. + let config_json = serde_json::json!({ + "logLevel": cfg.log_level, + "mode": "Core", + "preset": cfg.preset, + "tcpPort": cfg.tcp_port, + "discv5UdpPort": cfg.tcp_port, + }) + .to_string(); + + let mut node = match LogosNodeCtx::new(&config_json) { + Ok(n) => n, + Err(e) => { + let _ = ready_tx.send(Err(DeliveryError::StartupFailed(e))); + return; + } + }; + + // Register the inbound sender before installing the event callback so + // there is no window where the callback is live but the channel is not + // yet in the subscriber list. + subscribers.lock().unwrap().push(inbound_tx); + + let subs_for_cb = subscribers.clone(); + let event_closure = move |_ret: i32, data: &str| { + if let Some(payload) = Self::parse_message_received(data) { + let mut guard = match subs_for_cb.lock() { + Ok(g) => g, + Err(e) => { + error!("subscriber mutex poisoned: {e}"); + return; + } + }; + guard.retain(|tx| match tx.try_send(payload.clone()) { + Ok(()) => true, + Err(crossbeam_channel::TrySendError::Full(_)) => true, + Err(crossbeam_channel::TrySendError::Disconnected(_)) => false, + }); + } + }; + node.set_event_callback(event_closure); + + if let Err(e) = node.start() { + let _ = ready_tx.send(Err(DeliveryError::StartupFailed(e))); + return; + } + info!("logos-delivery node started (preset={})", cfg.preset); + + // FIXME: This unconditional sleep is a stand-in for proper + // peer-connectivity detection. The right approach is to listen for a + // `peer_connected` (or equivalent status-change) event from the node + // callback and only proceed once at least one peer is reachable, + // falling back to a configurable timeout. logos-delivery would need to + // surface such an event via its callback mechanism for this to work. + thread::sleep(Duration::from_secs(3)); + + let default_topic = content_topic_for("delivery_address"); + if let Err(e) = node.subscribe(&default_topic) { + warn!("subscribe to {default_topic}: {e}"); + } else { + info!("subscribed to {default_topic}"); + } + + let _ = ready_tx.send(Ok(())); + + while let Ok(cmd) = out_rx.recv() { + let result = node + .send(&cmd.message_json) + .map(|_| ()) + .map_err(DeliveryError::PublishFailed); + let _ = cmd.reply.try_send(result); + } + + info!("logos-node outbound loop finished"); + } + + fn parse_message_received(data: &str) -> Option> { + let event: WakuEvent = serde_json::from_str(data).ok()?; + + if event.event_type != "message_received" { + return None; + } + + let msg = event.message?; + + if !msg.content_topic.starts_with("/logos-chat/1/") { + return None; + } + + msg.payload.decode() + } + + pub fn inbound_queue(&mut self) -> Receiver> { + self.inbound_rx + .take() + .expect("inbound_queue called more than once") + } +} + +impl DeliveryService for EmbeddedP2pDeliveryService { + type Error = DeliveryError; + + fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), DeliveryError> { + let msg = WakuMessage { + content_topic: content_topic_for(&envelope.delivery_address), + payload: BASE64.encode(&envelope.data), + ephemeral: false, + }; + let message_json = + serde_json::to_string(&msg).map_err(|e| DeliveryError::PublishFailed(e.to_string()))?; + + let (reply_tx, reply_rx) = mpsc::sync_channel(1); + self.outbound + .send(OutboundCmd { + message_json, + reply: reply_tx, + }) + .map_err(|_| DeliveryError::ChannelClosed)?; + + reply_rx.recv().map_err(|_| DeliveryError::ChannelClosed)? + } + + fn subscribe(&mut self, _: &str) -> Result<(), ::Error> { + // This Service does not support filtering + Ok(()) + } +} diff --git a/extensions/components/src/delivery/embedded_p2p_delivery/sys.rs b/extensions/components/src/delivery/embedded_p2p_delivery/sys.rs new file mode 100644 index 0000000..2036116 --- /dev/null +++ b/extensions/components/src/delivery/embedded_p2p_delivery/sys.rs @@ -0,0 +1,102 @@ +//! Raw FFI declarations matching liblogosdelivery.h (trampoline pattern). +//! +//! No `#[link]` attribute — build.rs handles linking to liblogosdelivery. +#![allow(unused)] + +use std::os::raw::{c_char, c_int, c_void}; +use std::slice; + +pub const RET_OK: i32 = 0; + +pub type FFICallBack = unsafe extern "C" fn(c_int, *const c_char, usize, *const c_void); + +unsafe extern "C" { + pub fn logosdelivery_create_node( + config_json: *const c_char, + cb: FFICallBack, + user_data: *const c_void, + ) -> *mut c_void; + + pub fn logosdelivery_start_node( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + ) -> c_int; + + pub fn logosdelivery_stop_node( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + ) -> c_int; + + pub fn logosdelivery_destroy( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + ) -> c_int; + + pub fn logosdelivery_subscribe( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + content_topic: *const c_char, + ) -> c_int; + + pub fn logosdelivery_unsubscribe( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + content_topic: *const c_char, + ) -> c_int; + + /// `message_json`: `{"contentTopic": "...", "payload": "", "ephemeral": false}` + pub fn logosdelivery_send( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + message_json: *const c_char, + ) -> c_int; + + pub fn logosdelivery_set_event_callback( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + ); + + pub fn logosdelivery_get_node_info( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + node_info_id: *const c_char, + ) -> c_int; +} + +// ── Trampoline ─────────────────────────────────────────────────────────────── + +pub unsafe extern "C" fn trampoline( + return_val: c_int, + buffer: *const c_char, + buffer_len: usize, + data: *const c_void, +) where + C: FnMut(i32, &str), +{ + if data.is_null() { + return; + } + let closure = unsafe { &mut *(data as *mut C) }; + if buffer.is_null() || buffer_len == 0 { + closure(return_val, ""); + return; + } + let bytes = unsafe { slice::from_raw_parts(buffer as *const u8, buffer_len) }; + let s = String::from_utf8_lossy(bytes); + closure(return_val, &s); +} + +pub fn get_trampoline(_: &C) -> FFICallBack +where + C: FnMut(i32, &str), +{ + trampoline:: +} diff --git a/extensions/components/src/delivery/embedded_p2p_delivery/wrapper.rs b/extensions/components/src/delivery/embedded_p2p_delivery/wrapper.rs new file mode 100644 index 0000000..04296a7 --- /dev/null +++ b/extensions/components/src/delivery/embedded_p2p_delivery/wrapper.rs @@ -0,0 +1,227 @@ +//! Safe synchronous wrapper around the raw liblogosdelivery FFI. +//! +//! # Why Box::into_raw for one-shot callbacks? +//! +//! `sendRequestToFFIThread` (nim-ffi) signals the caller as soon as the FFI +//! thread *receives* the request, before it processes it. The actual result +//! callback fires later, from the Nim async event loop, after the Rust call +//! frame has returned and its stack variables are gone. Passing `&mut closure` +//! as `user_data` therefore produces a dangling pointer by the time the +//! callback fires — a use-after-free that manifests as a SIGSEGV when the +//! operation fails and the callback tries to write an error into captured +//! stack memory. +//! +//! Fix: heap-allocate each one-shot closure with `Box::into_raw`, synchronise +//! via an `mpsc` channel (blocking until the callback fires), then drop the +//! box. The pointer is valid for the entire async lifetime of the request. +//! +//! # Why store the event callback inside LogosNodeCtx? +//! +//! Rust drops locals in reverse declaration order. If the event-callback box +//! were held by the caller (outside the node), it would be freed before the +//! node's Drop runs stop+destroy. During stop/destroy the Nim async event +//! loop can still fire the event callback, which would access freed memory. +//! +//! By storing the box as `_event_cb` inside `LogosNodeCtx`, Rust's field-drop +//! order guarantees it is freed *after* Drop::drop returns (i.e. after +//! stop+destroy complete), so the pointer is always valid when Nim calls it. + +use std::ffi::CString; +use std::os::raw::c_void; +use std::sync::mpsc; + +use super::sys::{self as ffi, RET_OK, get_trampoline}; + +/// Opaque handle to a logos-delivery node context. +pub struct LogosNodeCtx { + ctx: *mut c_void, + /// Keeps the event-callback closure alive for the lifetime of the node. + _event_cb: Option>, +} + +// The logos-delivery ctx pointer is thread-safe (serialized calls inside C/Nim). +unsafe impl Send for LogosNodeCtx {} +unsafe impl Sync for LogosNodeCtx {} + +impl LogosNodeCtx { + pub fn new(config_json: &str) -> Result { + let config_cstr = CString::new(config_json).map_err(|e| e.to_string())?; + + let (tx, rx) = mpsc::sync_channel::>(1); + let closure = move |ret: i32, data: &str| { + let _ = tx.send(if ret == RET_OK { + Ok(()) + } else { + Err(data.to_string()) + }); + }; + let raw = Box::into_raw(Box::new(closure)); + let cb = get_trampoline(unsafe { &*raw }); + + let ctx = unsafe { + ffi::logosdelivery_create_node(config_cstr.as_ptr(), cb, raw as *const c_void) + }; + + // create_node may call the callback synchronously (try_recv) or + // asynchronously (recv). Handle both. + let callback_result: Result<(), String> = if ctx.is_null() { + rx.try_recv() + .unwrap_or(Err("logosdelivery_create_node returned null".into())) + } else { + rx.recv() + .unwrap_or(Err("callback channel disconnected".into())) + }; + drop(unsafe { Box::from_raw(raw) }); + + callback_result.map(|_| Self { + ctx, + _event_cb: None, + }) + } + + pub fn start(&self) -> Result<(), String> { + let (tx, rx) = mpsc::sync_channel::>(1); + let closure = move |ret: i32, data: &str| { + let _ = tx.send(if ret == RET_OK { + Ok(()) + } else { + Err(data.to_string()) + }); + }; + let raw = Box::into_raw(Box::new(closure)); + let cb = get_trampoline(unsafe { &*raw }); + + let ret = unsafe { ffi::logosdelivery_start_node(self.ctx, cb, raw as *const c_void) }; + + if ret != RET_OK { + drop(unsafe { Box::from_raw(raw) }); + return Err(format!("logosdelivery_start_node returned {ret}")); + } + let result = rx + .recv() + .unwrap_or(Err("callback channel disconnected".into())); + drop(unsafe { Box::from_raw(raw) }); + result + } + + pub fn subscribe(&self, content_topic: &str) -> Result<(), String> { + let topic_cstr = CString::new(content_topic).map_err(|e| e.to_string())?; + + let (tx, rx) = mpsc::sync_channel::>(1); + let closure = move |ret: i32, data: &str| { + let _ = tx.send(if ret == RET_OK { + Ok(()) + } else { + Err(data.to_string()) + }); + }; + let raw = Box::into_raw(Box::new(closure)); + let cb = get_trampoline(unsafe { &*raw }); + + let ret = unsafe { + ffi::logosdelivery_subscribe(self.ctx, cb, raw as *const c_void, topic_cstr.as_ptr()) + }; + + if ret != RET_OK { + drop(unsafe { Box::from_raw(raw) }); + return Err(format!("logosdelivery_subscribe returned {ret}")); + } + let result = rx + .recv() + .unwrap_or(Err("callback channel disconnected".into())); + drop(unsafe { Box::from_raw(raw) }); + result + } + + /// Returns the request ID on success. + pub fn send(&self, message_json: &str) -> Result { + let msg_cstr = CString::new(message_json).map_err(|e| e.to_string())?; + + let (tx, rx) = mpsc::sync_channel::>(1); + let closure = move |ret: i32, data: &str| { + let _ = tx.send(if ret == RET_OK { + Ok(data.to_string()) + } else { + Err(data.to_string()) + }); + }; + let raw = Box::into_raw(Box::new(closure)); + let cb = get_trampoline(unsafe { &*raw }); + + let ret = unsafe { + ffi::logosdelivery_send(self.ctx, cb, raw as *const c_void, msg_cstr.as_ptr()) + }; + + if ret != RET_OK { + drop(unsafe { Box::from_raw(raw) }); + return Err(format!("logosdelivery_send returned {ret}")); + } + let result = rx + .recv() + .unwrap_or(Err("callback channel disconnected".into())); + drop(unsafe { Box::from_raw(raw) }); + result + } + + /// Stores the event callback inside the node so it is dropped *after* + /// stop+destroy in Drop, keeping the pointer valid for the node's lifetime. + pub fn set_event_callback(&mut self, closure: C) + where + C: FnMut(i32, &str) + Send + 'static, + { + let mut boxed = Box::new(closure); + let cb = get_trampoline(&*boxed); + let user_data = &mut *boxed as *mut C as *const c_void; + unsafe { + ffi::logosdelivery_set_event_callback(self.ctx, cb, user_data); + } + // Move the box into self; the heap address (user_data) is unaffected. + self._event_cb = Some(boxed); + } + + pub fn stop(&self) -> Result<(), String> { + let (tx, rx) = mpsc::sync_channel::>(1); + let closure = move |ret: i32, data: &str| { + let _ = tx.send(if ret == RET_OK { + Ok(()) + } else { + Err(data.to_string()) + }); + }; + let raw = Box::into_raw(Box::new(closure)); + let cb = get_trampoline(unsafe { &*raw }); + + let ret = unsafe { ffi::logosdelivery_stop_node(self.ctx, cb, raw as *const c_void) }; + + if ret != RET_OK { + drop(unsafe { Box::from_raw(raw) }); + return Err(format!("logosdelivery_stop_node returned {ret}")); + } + let result = rx + .recv() + .unwrap_or(Err("callback channel disconnected".into())); + drop(unsafe { Box::from_raw(raw) }); + result + } +} + +impl Drop for LogosNodeCtx { + fn drop(&mut self) { + // stop+destroy must complete before _event_cb is freed. + // Rust drops fields after Drop::drop returns, so _event_cb outlives + // everything below — the event callback pointer stays valid throughout. + if let Err(e) = self.stop() { + tracing::warn!("logosdelivery_stop_node failed during drop: {e}"); + } + + let (tx, rx) = mpsc::sync_channel::<()>(1); + let closure = move |_: i32, _: &str| { + let _ = tx.send(()); + }; + let raw = Box::into_raw(Box::new(closure)); + let cb = get_trampoline(unsafe { &*raw }); + unsafe { ffi::logosdelivery_destroy(self.ctx, cb, raw as *const c_void) }; + let _ = rx.recv(); + drop(unsafe { Box::from_raw(raw) }); + } +}