From 06e29b1dbfa8b33923c60b9330fab1dd418d8301 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Tue, 23 Jun 2026 13:52:29 -0700 Subject: [PATCH 01/13] Move logos_delivery to components --- extensions/components/Cargo.toml | 4 + extensions/components/build.rs | 28 ++ extensions/components/src/delivery.rs | 6 + .../src/delivery/embedded_p2p_delivery.rs | 314 ++++++++++++++++++ .../src/delivery/embedded_p2p_delivery/sys.rs | 102 ++++++ .../delivery/embedded_p2p_delivery/wrapper.rs | 227 +++++++++++++ 6 files changed, 681 insertions(+) create mode 100644 extensions/components/build.rs create mode 100644 extensions/components/src/delivery/embedded_p2p_delivery.rs create mode 100644 extensions/components/src/delivery/embedded_p2p_delivery/sys.rs create mode 100644 extensions/components/src/delivery/embedded_p2p_delivery/wrapper.rs diff --git a/extensions/components/Cargo.toml b/extensions/components/Cargo.toml index 582c0bc..b948966 100644 --- a/extensions/components/Cargo.toml +++ b/extensions/components/Cargo.toml @@ -3,6 +3,9 @@ name = "components" version = "0.1.0" edition = "2024" +[features] +logos-delivery = [] + [dependencies] # Workspace dependencies (sorted) crypto = { workspace = true } @@ -15,5 +18,6 @@ crossbeam-channel = { workspace = true } hex = "0.4.3" reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls"] } serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" thiserror = "2" tracing = "0.1" diff --git a/extensions/components/build.rs b/extensions/components/build.rs new file mode 100644 index 0000000..1f45a13 --- /dev/null +++ b/extensions/components/build.rs @@ -0,0 +1,28 @@ +fn main() { + println!("cargo:rerun-if-env-changed=LOGOS_DELIVERY_LIB_DIR"); + println!("cargo::rustc-check-cfg=cfg(logos_delivery)"); + + let feature_enabled = std::env::var("CARGO_FEATURE_LOGOS_DELIVERY").is_ok(); + let lib_dir = std::env::var("LOGOS_DELIVERY_LIB_DIR"); + + let lib_dir = match lib_dir { + Ok(dir) => dir, + Err(_) if !feature_enabled => return, + Err(_) => { + // Feature is on but no library path — enable compilation, skip linking. + println!("cargo:rustc-cfg=logos_delivery"); + return; + } + }; + + println!("cargo:rustc-cfg=logos_delivery"); + + println!("cargo:rustc-link-search=native={lib_dir}"); + println!("cargo:rustc-link-lib=dylib=logosdelivery"); + + let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap_or_default(); + match target_os.as_str() { + "macos" | "linux" => println!("cargo:rustc-link-arg=-Wl,-rpath,{lib_dir}"), + other => panic!("unsupported OS for logos-delivery transport: {other}"), + } +} diff --git a/extensions/components/src/delivery.rs b/extensions/components/src/delivery.rs index 627ac55..a59dd88 100644 --- a/extensions/components/src/delivery.rs +++ b/extensions/components/src/delivery.rs @@ -1,3 +1,9 @@ mod local_broadcaster; pub use local_broadcaster::LocalBroadcaster; + +#[cfg(logos_delivery)] +mod embedded_p2p_delivery; + +#[cfg(logos_delivery)] +pub use embedded_p2p_delivery::EmbeddedP2pDeliveryService; diff --git a/extensions/components/src/delivery/embedded_p2p_delivery.rs b/extensions/components/src/delivery/embedded_p2p_delivery.rs new file mode 100644 index 0000000..bb8fb1d --- /dev/null +++ b/extensions/components/src/delivery/embedded_p2p_delivery.rs @@ -0,0 +1,314 @@ +//! logos-delivery backed [`client::DeliveryService`] implementation. +//! +//! `LogosDeliveryService` wraps an embedded logos-delivery node running on a +//! dedicated `std::thread`. All interaction is via synchronous `std::sync::mpsc` +//! channels. +//! +//! ## Content topic mapping +//! +//! `AddressedEnvelope::delivery_address` maps to logos-delivery content topic +//! `/logos-chat/1/{delivery_address}/proto`. + +pub(crate) mod sys; +pub(crate) mod wrapper; + +use std::sync::{Arc, Mutex, mpsc}; +use std::thread; +use std::time::Duration; + +use base64::Engine; +use base64::engine::general_purpose::STANDARD as BASE64; +use crossbeam_channel::{Receiver, Sender}; +use libchat::{AddressedEnvelope, DeliveryService}; +use tracing::{error, info, warn}; + +use wrapper::LogosNodeCtx; + +pub fn content_topic_for(delivery_address: &str) -> String { + format!("/logos-chat/1/{delivery_address}/proto") +} + +// ── Error ──────────────────────────────────────────────────────────────────── + +#[derive(Debug, thiserror::Error)] +pub enum DeliveryError { + #[error("node startup failed: {0}")] + StartupFailed(String), + #[error("publish failed: {0}")] + PublishFailed(String), + #[error("send channel closed")] + ChannelClosed, +} + +// ── Internals ──────────────────────────────────────────────────────────────── + +struct OutboundCmd { + message_json: String, + reply: mpsc::SyncSender>, +} + +type SubscriberList = Arc>>>>; + +// ── Config ─────────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone)] +pub struct Config { + pub preset: String, + pub tcp_port: u16, + pub log_level: String, +} + +impl Default for Config { + fn default() -> Self { + Self { + preset: "logos.dev".into(), + tcp_port: 60000, + log_level: "ERROR".into(), + } + } +} + +// ── Wire types ────────────────────────────────────────────────────────────── + +/// Outbound message sent to the logos-delivery node. +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct WakuMessage { + #[serde(rename = "contentTopic")] + content_topic: String, + /// Base64-encoded payload. + payload: String, + ephemeral: bool, +} + +/// Top-level event envelope received from the logos-delivery node callback. +#[derive(Debug, serde::Deserialize)] +struct WakuEvent { + #[serde(rename = "eventType")] + event_type: String, + message: Option, +} + +/// Message payload from a `message_received` event. +#[derive(Debug, serde::Deserialize)] +struct ReceivedMessage { + #[serde(rename = "contentTopic")] + content_topic: String, + /// The node may deliver the payload as either a base64 string or a JSON + /// array of byte values. + payload: WakuPayload, +} + +/// Untagged union that handles both payload representations. +#[derive(Debug, serde::Deserialize)] +#[serde(untagged)] +enum WakuPayload { + Base64(String), + Bytes(Vec), +} + +impl WakuPayload { + fn decode(self) -> Option> { + match self { + WakuPayload::Base64(s) => BASE64.decode(s).ok(), + WakuPayload::Bytes(b) => Some(b), + } + } +} + +// ── EmbeddedP2pDeliveryService ────────────────────────────────────────────────── + +/// logos-delivery backed delivery service. Cheap to clone — all clones share +/// the same background node. +#[derive(Clone, Debug)] +pub struct EmbeddedP2pDeliveryService { + outbound: mpsc::SyncSender, + #[allow(dead_code)] + subscribers: SubscriberList, + inbound_rx: Option>>, +} + +impl EmbeddedP2pDeliveryService { + /// Start the embedded logos-delivery node. The client drains inbound + /// payloads via [`Transport::inbound`]. + pub fn start(cfg: Config) -> Result { + let (out_tx, out_rx) = mpsc::sync_channel::(256); + let subscribers: SubscriberList = Arc::new(Mutex::new(Vec::new())); + let (ready_tx, ready_rx) = mpsc::channel::>(); + // Create the inbound channel before spawning so the receiver is + // registered inside the thread, before any event callback fires. + let (inbound_tx, inbound_rx) = crossbeam_channel::bounded::>(1024); + + let subs_for_thread = subscribers.clone(); + + let handle = thread::Builder::new() + .name("logos-node".into()) + .spawn(move || { + if let Err(panic) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + Self::node_thread(cfg, out_rx, subs_for_thread, inbound_tx, ready_tx); + })) { + let msg = panic + .downcast_ref::<&str>() + .map(|s| s.to_string()) + .or_else(|| panic.downcast_ref::().cloned()) + .unwrap_or_else(|| "unknown panic".into()); + error!("logos-node thread panicked: {msg}"); + } + }) + .map_err(|e| DeliveryError::StartupFailed(e.to_string()))?; + + // On failure, the node thread drops LogosNodeCtx (stop+destroy against + // a half-initialized Nim node). Join it so the process doesn't begin + // teardown mid-destroy — that race SIGSEGVs inside the Nim async loop. + let ready = ready_rx.recv().unwrap_or_else(|_| { + Err(DeliveryError::StartupFailed( + "node thread exited before ready".into(), + )) + }); + if let Err(e) = ready { + let _ = handle.join(); + return Err(e); + } + + Ok(Self { + outbound: out_tx, + subscribers, + inbound_rx: Some(inbound_rx), + }) + } + + fn node_thread( + cfg: Config, + out_rx: mpsc::Receiver, + subscribers: SubscriberList, + inbound_tx: Sender>, + ready_tx: mpsc::Sender>, + ) { + // discv5UdpPort defaults to 9000 in libwaku, so a second instance with + // a distinct --port still collides on UDP. Bind it to tcp_port so a + // single --port knob keeps both ports distinct across instances. + let config_json = serde_json::json!({ + "logLevel": cfg.log_level, + "mode": "Core", + "preset": cfg.preset, + "tcpPort": cfg.tcp_port, + "discv5UdpPort": cfg.tcp_port, + }) + .to_string(); + + let mut node = match LogosNodeCtx::new(&config_json) { + Ok(n) => n, + Err(e) => { + let _ = ready_tx.send(Err(DeliveryError::StartupFailed(e))); + return; + } + }; + + // Register the inbound sender before installing the event callback so + // there is no window where the callback is live but the channel is not + // yet in the subscriber list. + subscribers.lock().unwrap().push(inbound_tx); + + let subs_for_cb = subscribers.clone(); + let event_closure = move |_ret: i32, data: &str| { + if let Some(payload) = Self::parse_message_received(data) { + let mut guard = match subs_for_cb.lock() { + Ok(g) => g, + Err(e) => { + error!("subscriber mutex poisoned: {e}"); + return; + } + }; + guard.retain(|tx| match tx.try_send(payload.clone()) { + Ok(()) => true, + Err(crossbeam_channel::TrySendError::Full(_)) => true, + Err(crossbeam_channel::TrySendError::Disconnected(_)) => false, + }); + } + }; + node.set_event_callback(event_closure); + + if let Err(e) = node.start() { + let _ = ready_tx.send(Err(DeliveryError::StartupFailed(e))); + return; + } + info!("logos-delivery node started (preset={})", cfg.preset); + + // FIXME: This unconditional sleep is a stand-in for proper + // peer-connectivity detection. The right approach is to listen for a + // `peer_connected` (or equivalent status-change) event from the node + // callback and only proceed once at least one peer is reachable, + // falling back to a configurable timeout. logos-delivery would need to + // surface such an event via its callback mechanism for this to work. + thread::sleep(Duration::from_secs(3)); + + let default_topic = content_topic_for("delivery_address"); + if let Err(e) = node.subscribe(&default_topic) { + warn!("subscribe to {default_topic}: {e}"); + } else { + info!("subscribed to {default_topic}"); + } + + let _ = ready_tx.send(Ok(())); + + while let Ok(cmd) = out_rx.recv() { + let result = node + .send(&cmd.message_json) + .map(|_| ()) + .map_err(DeliveryError::PublishFailed); + let _ = cmd.reply.try_send(result); + } + + info!("logos-node outbound loop finished"); + } + + fn parse_message_received(data: &str) -> Option> { + let event: WakuEvent = serde_json::from_str(data).ok()?; + + if event.event_type != "message_received" { + return None; + } + + let msg = event.message?; + + if !msg.content_topic.starts_with("/logos-chat/1/") { + return None; + } + + msg.payload.decode() + } + + pub fn inbound_queue(&mut self) -> Receiver> { + self.inbound_rx + .take() + .expect("inbound_queue called more than once") + } +} + +impl DeliveryService for EmbeddedP2pDeliveryService { + type Error = DeliveryError; + + fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), DeliveryError> { + let msg = WakuMessage { + content_topic: content_topic_for(&envelope.delivery_address), + payload: BASE64.encode(&envelope.data), + ephemeral: false, + }; + let message_json = + serde_json::to_string(&msg).map_err(|e| DeliveryError::PublishFailed(e.to_string()))?; + + let (reply_tx, reply_rx) = mpsc::sync_channel(1); + self.outbound + .send(OutboundCmd { + message_json, + reply: reply_tx, + }) + .map_err(|_| DeliveryError::ChannelClosed)?; + + reply_rx.recv().map_err(|_| DeliveryError::ChannelClosed)? + } + + fn subscribe(&mut self, _: &str) -> Result<(), ::Error> { + // This Service does not support filtering + Ok(()) + } +} diff --git a/extensions/components/src/delivery/embedded_p2p_delivery/sys.rs b/extensions/components/src/delivery/embedded_p2p_delivery/sys.rs new file mode 100644 index 0000000..2036116 --- /dev/null +++ b/extensions/components/src/delivery/embedded_p2p_delivery/sys.rs @@ -0,0 +1,102 @@ +//! Raw FFI declarations matching liblogosdelivery.h (trampoline pattern). +//! +//! No `#[link]` attribute — build.rs handles linking to liblogosdelivery. +#![allow(unused)] + +use std::os::raw::{c_char, c_int, c_void}; +use std::slice; + +pub const RET_OK: i32 = 0; + +pub type FFICallBack = unsafe extern "C" fn(c_int, *const c_char, usize, *const c_void); + +unsafe extern "C" { + pub fn logosdelivery_create_node( + config_json: *const c_char, + cb: FFICallBack, + user_data: *const c_void, + ) -> *mut c_void; + + pub fn logosdelivery_start_node( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + ) -> c_int; + + pub fn logosdelivery_stop_node( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + ) -> c_int; + + pub fn logosdelivery_destroy( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + ) -> c_int; + + pub fn logosdelivery_subscribe( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + content_topic: *const c_char, + ) -> c_int; + + pub fn logosdelivery_unsubscribe( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + content_topic: *const c_char, + ) -> c_int; + + /// `message_json`: `{"contentTopic": "...", "payload": "", "ephemeral": false}` + pub fn logosdelivery_send( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + message_json: *const c_char, + ) -> c_int; + + pub fn logosdelivery_set_event_callback( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + ); + + pub fn logosdelivery_get_node_info( + ctx: *mut c_void, + cb: FFICallBack, + user_data: *const c_void, + node_info_id: *const c_char, + ) -> c_int; +} + +// ── Trampoline ─────────────────────────────────────────────────────────────── + +pub unsafe extern "C" fn trampoline( + return_val: c_int, + buffer: *const c_char, + buffer_len: usize, + data: *const c_void, +) where + C: FnMut(i32, &str), +{ + if data.is_null() { + return; + } + let closure = unsafe { &mut *(data as *mut C) }; + if buffer.is_null() || buffer_len == 0 { + closure(return_val, ""); + return; + } + let bytes = unsafe { slice::from_raw_parts(buffer as *const u8, buffer_len) }; + let s = String::from_utf8_lossy(bytes); + closure(return_val, &s); +} + +pub fn get_trampoline(_: &C) -> FFICallBack +where + C: FnMut(i32, &str), +{ + trampoline:: +} diff --git a/extensions/components/src/delivery/embedded_p2p_delivery/wrapper.rs b/extensions/components/src/delivery/embedded_p2p_delivery/wrapper.rs new file mode 100644 index 0000000..04296a7 --- /dev/null +++ b/extensions/components/src/delivery/embedded_p2p_delivery/wrapper.rs @@ -0,0 +1,227 @@ +//! Safe synchronous wrapper around the raw liblogosdelivery FFI. +//! +//! # Why Box::into_raw for one-shot callbacks? +//! +//! `sendRequestToFFIThread` (nim-ffi) signals the caller as soon as the FFI +//! thread *receives* the request, before it processes it. The actual result +//! callback fires later, from the Nim async event loop, after the Rust call +//! frame has returned and its stack variables are gone. Passing `&mut closure` +//! as `user_data` therefore produces a dangling pointer by the time the +//! callback fires — a use-after-free that manifests as a SIGSEGV when the +//! operation fails and the callback tries to write an error into captured +//! stack memory. +//! +//! Fix: heap-allocate each one-shot closure with `Box::into_raw`, synchronise +//! via an `mpsc` channel (blocking until the callback fires), then drop the +//! box. The pointer is valid for the entire async lifetime of the request. +//! +//! # Why store the event callback inside LogosNodeCtx? +//! +//! Rust drops locals in reverse declaration order. If the event-callback box +//! were held by the caller (outside the node), it would be freed before the +//! node's Drop runs stop+destroy. During stop/destroy the Nim async event +//! loop can still fire the event callback, which would access freed memory. +//! +//! By storing the box as `_event_cb` inside `LogosNodeCtx`, Rust's field-drop +//! order guarantees it is freed *after* Drop::drop returns (i.e. after +//! stop+destroy complete), so the pointer is always valid when Nim calls it. + +use std::ffi::CString; +use std::os::raw::c_void; +use std::sync::mpsc; + +use super::sys::{self as ffi, RET_OK, get_trampoline}; + +/// Opaque handle to a logos-delivery node context. +pub struct LogosNodeCtx { + ctx: *mut c_void, + /// Keeps the event-callback closure alive for the lifetime of the node. + _event_cb: Option>, +} + +// The logos-delivery ctx pointer is thread-safe (serialized calls inside C/Nim). +unsafe impl Send for LogosNodeCtx {} +unsafe impl Sync for LogosNodeCtx {} + +impl LogosNodeCtx { + pub fn new(config_json: &str) -> Result { + let config_cstr = CString::new(config_json).map_err(|e| e.to_string())?; + + let (tx, rx) = mpsc::sync_channel::>(1); + let closure = move |ret: i32, data: &str| { + let _ = tx.send(if ret == RET_OK { + Ok(()) + } else { + Err(data.to_string()) + }); + }; + let raw = Box::into_raw(Box::new(closure)); + let cb = get_trampoline(unsafe { &*raw }); + + let ctx = unsafe { + ffi::logosdelivery_create_node(config_cstr.as_ptr(), cb, raw as *const c_void) + }; + + // create_node may call the callback synchronously (try_recv) or + // asynchronously (recv). Handle both. + let callback_result: Result<(), String> = if ctx.is_null() { + rx.try_recv() + .unwrap_or(Err("logosdelivery_create_node returned null".into())) + } else { + rx.recv() + .unwrap_or(Err("callback channel disconnected".into())) + }; + drop(unsafe { Box::from_raw(raw) }); + + callback_result.map(|_| Self { + ctx, + _event_cb: None, + }) + } + + pub fn start(&self) -> Result<(), String> { + let (tx, rx) = mpsc::sync_channel::>(1); + let closure = move |ret: i32, data: &str| { + let _ = tx.send(if ret == RET_OK { + Ok(()) + } else { + Err(data.to_string()) + }); + }; + let raw = Box::into_raw(Box::new(closure)); + let cb = get_trampoline(unsafe { &*raw }); + + let ret = unsafe { ffi::logosdelivery_start_node(self.ctx, cb, raw as *const c_void) }; + + if ret != RET_OK { + drop(unsafe { Box::from_raw(raw) }); + return Err(format!("logosdelivery_start_node returned {ret}")); + } + let result = rx + .recv() + .unwrap_or(Err("callback channel disconnected".into())); + drop(unsafe { Box::from_raw(raw) }); + result + } + + pub fn subscribe(&self, content_topic: &str) -> Result<(), String> { + let topic_cstr = CString::new(content_topic).map_err(|e| e.to_string())?; + + let (tx, rx) = mpsc::sync_channel::>(1); + let closure = move |ret: i32, data: &str| { + let _ = tx.send(if ret == RET_OK { + Ok(()) + } else { + Err(data.to_string()) + }); + }; + let raw = Box::into_raw(Box::new(closure)); + let cb = get_trampoline(unsafe { &*raw }); + + let ret = unsafe { + ffi::logosdelivery_subscribe(self.ctx, cb, raw as *const c_void, topic_cstr.as_ptr()) + }; + + if ret != RET_OK { + drop(unsafe { Box::from_raw(raw) }); + return Err(format!("logosdelivery_subscribe returned {ret}")); + } + let result = rx + .recv() + .unwrap_or(Err("callback channel disconnected".into())); + drop(unsafe { Box::from_raw(raw) }); + result + } + + /// Returns the request ID on success. + pub fn send(&self, message_json: &str) -> Result { + let msg_cstr = CString::new(message_json).map_err(|e| e.to_string())?; + + let (tx, rx) = mpsc::sync_channel::>(1); + let closure = move |ret: i32, data: &str| { + let _ = tx.send(if ret == RET_OK { + Ok(data.to_string()) + } else { + Err(data.to_string()) + }); + }; + let raw = Box::into_raw(Box::new(closure)); + let cb = get_trampoline(unsafe { &*raw }); + + let ret = unsafe { + ffi::logosdelivery_send(self.ctx, cb, raw as *const c_void, msg_cstr.as_ptr()) + }; + + if ret != RET_OK { + drop(unsafe { Box::from_raw(raw) }); + return Err(format!("logosdelivery_send returned {ret}")); + } + let result = rx + .recv() + .unwrap_or(Err("callback channel disconnected".into())); + drop(unsafe { Box::from_raw(raw) }); + result + } + + /// Stores the event callback inside the node so it is dropped *after* + /// stop+destroy in Drop, keeping the pointer valid for the node's lifetime. + pub fn set_event_callback(&mut self, closure: C) + where + C: FnMut(i32, &str) + Send + 'static, + { + let mut boxed = Box::new(closure); + let cb = get_trampoline(&*boxed); + let user_data = &mut *boxed as *mut C as *const c_void; + unsafe { + ffi::logosdelivery_set_event_callback(self.ctx, cb, user_data); + } + // Move the box into self; the heap address (user_data) is unaffected. + self._event_cb = Some(boxed); + } + + pub fn stop(&self) -> Result<(), String> { + let (tx, rx) = mpsc::sync_channel::>(1); + let closure = move |ret: i32, data: &str| { + let _ = tx.send(if ret == RET_OK { + Ok(()) + } else { + Err(data.to_string()) + }); + }; + let raw = Box::into_raw(Box::new(closure)); + let cb = get_trampoline(unsafe { &*raw }); + + let ret = unsafe { ffi::logosdelivery_stop_node(self.ctx, cb, raw as *const c_void) }; + + if ret != RET_OK { + drop(unsafe { Box::from_raw(raw) }); + return Err(format!("logosdelivery_stop_node returned {ret}")); + } + let result = rx + .recv() + .unwrap_or(Err("callback channel disconnected".into())); + drop(unsafe { Box::from_raw(raw) }); + result + } +} + +impl Drop for LogosNodeCtx { + fn drop(&mut self) { + // stop+destroy must complete before _event_cb is freed. + // Rust drops fields after Drop::drop returns, so _event_cb outlives + // everything below — the event callback pointer stays valid throughout. + if let Err(e) = self.stop() { + tracing::warn!("logosdelivery_stop_node failed during drop: {e}"); + } + + let (tx, rx) = mpsc::sync_channel::<()>(1); + let closure = move |_: i32, _: &str| { + let _ = tx.send(()); + }; + let raw = Box::into_raw(Box::new(closure)); + let cb = get_trampoline(unsafe { &*raw }); + unsafe { ffi::logosdelivery_destroy(self.ctx, cb, raw as *const c_void) }; + let _ = rx.recv(); + drop(unsafe { Box::from_raw(raw) }); + } +} From 07c528040f298ff34c1d5ff7368741e75a8b2e87 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Tue, 23 Jun 2026 17:31:07 -0700 Subject: [PATCH 02/13] Rename components --- Cargo.lock | 2 ++ extensions/components/Cargo.toml | 2 +- extensions/components/build.rs | 2 +- extensions/components/src/delivery.rs | 4 ++-- .../components/src/delivery/embedded_p2p_delivery.rs | 10 +++++----- extensions/components/src/lib.rs | 6 ++++-- flake.nix | 7 ++++++- 7 files changed, 21 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a28de64..4566a19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1315,6 +1315,7 @@ dependencies = [ "arboard", "base64", "clap", + "components", "crossbeam-channel", "crossterm 0.29.0", "logos-chat", @@ -1468,6 +1469,7 @@ dependencies = [ "libchat", "reqwest 0.12.28", "serde", + "serde_json", "storage", "thiserror", "tracing", diff --git a/extensions/components/Cargo.toml b/extensions/components/Cargo.toml index b948966..4de8dca 100644 --- a/extensions/components/Cargo.toml +++ b/extensions/components/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [features] -logos-delivery = [] +embedded_p2p_delivery = [] [dependencies] # Workspace dependencies (sorted) diff --git a/extensions/components/build.rs b/extensions/components/build.rs index 1f45a13..3b51fa8 100644 --- a/extensions/components/build.rs +++ b/extensions/components/build.rs @@ -2,7 +2,7 @@ fn main() { println!("cargo:rerun-if-env-changed=LOGOS_DELIVERY_LIB_DIR"); println!("cargo::rustc-check-cfg=cfg(logos_delivery)"); - let feature_enabled = std::env::var("CARGO_FEATURE_LOGOS_DELIVERY").is_ok(); + let feature_enabled = std::env::var("CARGO_FEATURE_EMBEDDED_P2P_DELIVERY").is_ok(); let lib_dir = std::env::var("LOGOS_DELIVERY_LIB_DIR"); let lib_dir = match lib_dir { diff --git a/extensions/components/src/delivery.rs b/extensions/components/src/delivery.rs index a59dd88..172c315 100644 --- a/extensions/components/src/delivery.rs +++ b/extensions/components/src/delivery.rs @@ -3,7 +3,7 @@ mod local_broadcaster; pub use local_broadcaster::LocalBroadcaster; #[cfg(logos_delivery)] -mod embedded_p2p_delivery; +pub mod embedded_p2p_delivery; #[cfg(logos_delivery)] -pub use embedded_p2p_delivery::EmbeddedP2pDeliveryService; +pub use embedded_p2p_delivery::{EmbeddedP2pDeliveryService, P2pConfig}; diff --git a/extensions/components/src/delivery/embedded_p2p_delivery.rs b/extensions/components/src/delivery/embedded_p2p_delivery.rs index bb8fb1d..e2b855d 100644 --- a/extensions/components/src/delivery/embedded_p2p_delivery.rs +++ b/extensions/components/src/delivery/embedded_p2p_delivery.rs @@ -49,16 +49,16 @@ struct OutboundCmd { type SubscriberList = Arc>>>>; -// ── Config ─────────────────────────────────────────────────────────────────── +// ── P2pConfig ─────────────────────────────────────────────────────────────────── #[derive(Debug, Clone)] -pub struct Config { +pub struct P2pConfig { pub preset: String, pub tcp_port: u16, pub log_level: String, } -impl Default for Config { +impl Default for P2pConfig { fn default() -> Self { Self { preset: "logos.dev".into(), @@ -130,7 +130,7 @@ pub struct EmbeddedP2pDeliveryService { impl EmbeddedP2pDeliveryService { /// Start the embedded logos-delivery node. The client drains inbound /// payloads via [`Transport::inbound`]. - pub fn start(cfg: Config) -> Result { + pub fn start(cfg: P2pConfig) -> Result { let (out_tx, out_rx) = mpsc::sync_channel::(256); let subscribers: SubscriberList = Arc::new(Mutex::new(Vec::new())); let (ready_tx, ready_rx) = mpsc::channel::>(); @@ -177,7 +177,7 @@ impl EmbeddedP2pDeliveryService { } fn node_thread( - cfg: Config, + cfg: P2pConfig, out_rx: mpsc::Receiver, subscribers: SubscriberList, inbound_tx: Sender>, diff --git a/extensions/components/src/lib.rs b/extensions/components/src/lib.rs index d7cb449..2ef540e 100644 --- a/extensions/components/src/lib.rs +++ b/extensions/components/src/lib.rs @@ -1,10 +1,12 @@ mod contact_registry; -mod delivery; +pub mod delivery; mod storage; mod wakeup; pub use contact_registry::ephemeral::EphemeralRegistry; pub use contact_registry::http::{HttpRegistry, HttpRegistryError}; -pub use delivery::*; pub use storage::*; pub use wakeup::*; + +#[cfg(logos_delivery)] +pub use delivery::{EmbeddedP2pDeliveryService, P2pConfig}; diff --git a/flake.nix b/flake.nix index 0929535..bffe3d2 100644 --- a/flake.nix +++ b/flake.nix @@ -37,9 +37,10 @@ } ); - devShells = forAllSystems ({ pkgs, ... }: + devShells = forAllSystems ({ pkgs, system, ... }: let rustToolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust_toolchain.toml; + logosDeliveryLib = self.packages.${system}.logos-delivery; in { default = pkgs.mkShell { @@ -50,6 +51,10 @@ pkgs.perl pkgs.protobuf ]; + buildInputs = [ logosDeliveryLib ]; + shellHook = '' + export LOGOS_DELIVERY_LIB_DIR="${logosDeliveryLib}/lib" + ''; }; } ); From 7e2a52b2bfef25875102b1f94a0d5ea3b4d50cf8 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Tue, 23 Jun 2026 17:52:16 -0700 Subject: [PATCH 03/13] update deps --- Cargo.lock | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 4566a19..fdc8384 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1315,7 +1315,6 @@ dependencies = [ "arboard", "base64", "clap", - "components", "crossbeam-channel", "crossterm 0.29.0", "logos-chat", From 7c1d691cc26778fab42cd13ea80a8cbe8720e9f0 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Tue, 23 Jun 2026 22:08:50 -0700 Subject: [PATCH 04/13] WIP --- Cargo.lock | 1 + bin/chat-cli/Cargo.toml | 2 + bin/chat-cli/build.rs | 7 +- bin/chat-cli/src/main.rs | 44 ++- bin/chat-cli/src/transport.rs | 2 - bin/chat-cli/src/transport/logos_delivery.rs | 316 ------------------ .../src/transport/logos_delivery/sys.rs | 102 ------ .../src/transport/logos_delivery/wrapper.rs | 227 ------------- extensions/components/Cargo.toml | 1 + extensions/components/build.rs | 62 +++- extensions/components/src/lib.rs | 2 +- 11 files changed, 90 insertions(+), 676 deletions(-) delete mode 100644 bin/chat-cli/src/transport/logos_delivery.rs delete mode 100644 bin/chat-cli/src/transport/logos_delivery/sys.rs delete mode 100644 bin/chat-cli/src/transport/logos_delivery/wrapper.rs diff --git a/Cargo.lock b/Cargo.lock index fdc8384..4566a19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1315,6 +1315,7 @@ dependencies = [ "arboard", "base64", "clap", + "components", "crossbeam-channel", "crossterm 0.29.0", "logos-chat", diff --git a/bin/chat-cli/Cargo.toml b/bin/chat-cli/Cargo.toml index d089f05..542c8d7 100644 --- a/bin/chat-cli/Cargo.toml +++ b/bin/chat-cli/Cargo.toml @@ -9,9 +9,11 @@ path = "src/main.rs" [dependencies] # Workspace dependencies (sorted) +components = { workspace = true , features = ["embedded_p2p_delivery"]} crossbeam-channel = { workspace = true } logos-chat = { workspace = true } + # External dependencies (sorted) anyhow = "1.0" arboard = "3" diff --git a/bin/chat-cli/build.rs b/bin/chat-cli/build.rs index 766add3..88db6de 100644 --- a/bin/chat-cli/build.rs +++ b/bin/chat-cli/build.rs @@ -1,18 +1,15 @@ fn main() { - println!("cargo:rerun-if-env-changed=LOGOS_DELIVERY_LIB_DIR"); println!("cargo::rustc-check-cfg=cfg(logos_delivery)"); - let Ok(lib_dir) = std::env::var("LOGOS_DELIVERY_LIB_DIR") else { + let Some(lib_dir) = std::env::var("DEP_LOGOSDELIVERY_LIB_DIR").ok() else { return; }; println!("cargo:rustc-cfg=logos_delivery"); - println!("cargo:rustc-link-search=native={lib_dir}"); - println!("cargo:rustc-link-lib=dylib=logosdelivery"); let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap_or_default(); match target_os.as_str() { "macos" | "linux" => println!("cargo:rustc-link-arg=-Wl,-rpath,{lib_dir}"), - other => panic!("unsupported OS for logos-delivery transport: {other}"), + _ => {} } } diff --git a/bin/chat-cli/src/main.rs b/bin/chat-cli/src/main.rs index 9a35b45..bd75484 100644 --- a/bin/chat-cli/src/main.rs +++ b/bin/chat-cli/src/main.rs @@ -13,13 +13,33 @@ use logos_chat::{ RegistrationService, StorageConfig, Transport, }; +use components::{EmbeddedP2pDeliveryService, P2pConfig}; + +#[derive(Debug)] +struct P2pTransport(EmbeddedP2pDeliveryService); + +impl logos_chat::DeliveryService for P2pTransport { + type Error = ::Error; + fn publish(&mut self, envelope: logos_chat::AddressedEnvelope) -> Result<(), Self::Error> { + self.0.publish(envelope) + } + fn subscribe(&mut self, addr: &str) -> Result<(), Self::Error> { + self.0.subscribe(addr) + } +} + +impl logos_chat::Transport for P2pTransport { + fn inbound(&mut self) -> crossbeam_channel::Receiver> { + self.0.inbound_queue() + } +} + use app::ChatApp; #[derive(Copy, Clone, Debug, ValueEnum)] #[value(rename_all = "kebab-case")] enum TransportKind { File, - #[cfg(logos_delivery)] LogosDelivery, } @@ -79,22 +99,21 @@ fn main() -> Result<()> { .context("failed to create file transport")?; run(transport, &cli) } - #[cfg(logos_delivery)] TransportKind::LogosDelivery => { - use transport::logos_delivery::{Config, Service}; - println!("Starting logos-delivery node (preset={})...", cli.preset); println!("This may take a few seconds while connecting to the network."); - let cfg = Config { + let cfg = P2pConfig { preset: cli.preset.clone(), tcp_port: cli.port, ..Default::default() }; - let transport = Service::start(cfg).context("failed to start logos-delivery")?; + let transport = P2pTransport( + EmbeddedP2pDeliveryService::start(cfg).context("failed to start logos-delivery")?, + ); println!("Node connected. Initializing chat client..."); - run(transport, &cli) + return run(transport, &cli); } } } @@ -160,21 +179,20 @@ where result } -#[cfg_attr(not(logos_delivery), allow(dead_code, unused_variables))] fn run_logos_delivery(cli: Cli) -> Result<()> { - #[cfg(logos_delivery)] { - use transport::logos_delivery::{Config, Service}; - eprintln!("Starting logos-delivery node (preset={})...", cli.preset); eprintln!("This may take a few seconds while connecting to the network."); - let logos_cfg = Config { + let logos_cfg = P2pConfig { preset: cli.preset.clone(), tcp_port: cli.port, ..Default::default() }; - let delivery = Service::start(logos_cfg).context("failed to start logos-delivery")?; + let delivery = P2pTransport( + EmbeddedP2pDeliveryService::start(logos_cfg) + .context("failed to start logos-delivery")?, + ); eprintln!("Node connected. Initializing chat client..."); diff --git a/bin/chat-cli/src/transport.rs b/bin/chat-cli/src/transport.rs index 80f9b78..2e172cd 100644 --- a/bin/chat-cli/src/transport.rs +++ b/bin/chat-cli/src/transport.rs @@ -1,3 +1 @@ pub mod file; -#[cfg(logos_delivery)] -pub mod logos_delivery; diff --git a/bin/chat-cli/src/transport/logos_delivery.rs b/bin/chat-cli/src/transport/logos_delivery.rs deleted file mode 100644 index e99a40b..0000000 --- a/bin/chat-cli/src/transport/logos_delivery.rs +++ /dev/null @@ -1,316 +0,0 @@ -//! logos-delivery backed [`client::DeliveryService`] implementation. -//! -//! `LogosDeliveryService` wraps an embedded logos-delivery node running on a -//! dedicated `std::thread`. All interaction is via synchronous `std::sync::mpsc` -//! channels. -//! -//! ## Content topic mapping -//! -//! `AddressedEnvelope::delivery_address` maps to logos-delivery content topic -//! `/logos-chat/1/{delivery_address}/proto`. - -pub(crate) mod sys; -pub(crate) mod wrapper; - -use std::sync::{Arc, Mutex, mpsc}; -use std::thread; -use std::time::Duration; - -use base64::Engine; -use base64::engine::general_purpose::STANDARD as BASE64; -use crossbeam_channel::{Receiver, Sender}; -use logos_chat::{AddressedEnvelope, DeliveryService, Transport}; -use tracing::{error, info, warn}; - -use wrapper::LogosNodeCtx; - -pub fn content_topic_for(delivery_address: &str) -> String { - format!("/logos-chat/1/{delivery_address}/proto") -} - -// ── Error ──────────────────────────────────────────────────────────────────── - -#[derive(Debug, thiserror::Error)] -pub enum DeliveryError { - #[error("node startup failed: {0}")] - StartupFailed(String), - #[error("publish failed: {0}")] - PublishFailed(String), - #[error("send channel closed")] - ChannelClosed, -} - -// ── Internals ──────────────────────────────────────────────────────────────── - -struct OutboundCmd { - message_json: String, - reply: mpsc::SyncSender>, -} - -type SubscriberList = Arc>>>>; - -// ── Config ─────────────────────────────────────────────────────────────────── - -#[derive(Debug, Clone)] -pub struct Config { - pub preset: String, - pub tcp_port: u16, - pub log_level: String, -} - -impl Default for Config { - fn default() -> Self { - Self { - preset: "logos.dev".into(), - tcp_port: 60000, - log_level: "ERROR".into(), - } - } -} - -// ── Wire types ────────────────────────────────────────────────────────────── - -/// Outbound message sent to the logos-delivery node. -#[derive(Debug, serde::Serialize, serde::Deserialize)] -struct WakuMessage { - #[serde(rename = "contentTopic")] - content_topic: String, - /// Base64-encoded payload. - payload: String, - ephemeral: bool, -} - -/// Top-level event envelope received from the logos-delivery node callback. -#[derive(Debug, serde::Deserialize)] -struct WakuEvent { - #[serde(rename = "eventType")] - event_type: String, - message: Option, -} - -/// Message payload from a `message_received` event. -#[derive(Debug, serde::Deserialize)] -struct ReceivedMessage { - #[serde(rename = "contentTopic")] - content_topic: String, - /// The node may deliver the payload as either a base64 string or a JSON - /// array of byte values. - payload: WakuPayload, -} - -/// Untagged union that handles both payload representations. -#[derive(Debug, serde::Deserialize)] -#[serde(untagged)] -enum WakuPayload { - Base64(String), - Bytes(Vec), -} - -impl WakuPayload { - fn decode(self) -> Option> { - match self { - WakuPayload::Base64(s) => BASE64.decode(s).ok(), - WakuPayload::Bytes(b) => Some(b), - } - } -} - -// ── Service ────────────────────────────────────────────────────────────────── - -/// logos-delivery backed delivery service. Cheap to clone — all clones share -/// the same background node. -#[derive(Clone, Debug)] -pub struct Service { - outbound: mpsc::SyncSender, - #[allow(dead_code)] - subscribers: SubscriberList, - inbound_rx: Option>>, -} - -impl Service { - /// Start the embedded logos-delivery node. The client drains inbound - /// payloads via [`Transport::inbound`]. - pub fn start(cfg: Config) -> Result { - let (out_tx, out_rx) = mpsc::sync_channel::(256); - let subscribers: SubscriberList = Arc::new(Mutex::new(Vec::new())); - let (ready_tx, ready_rx) = mpsc::channel::>(); - // Create the inbound channel before spawning so the receiver is - // registered inside the thread, before any event callback fires. - let (inbound_tx, inbound_rx) = crossbeam_channel::bounded::>(1024); - - let subs_for_thread = subscribers.clone(); - - let handle = thread::Builder::new() - .name("logos-node".into()) - .spawn(move || { - if let Err(panic) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - Self::node_thread(cfg, out_rx, subs_for_thread, inbound_tx, ready_tx); - })) { - let msg = panic - .downcast_ref::<&str>() - .map(|s| s.to_string()) - .or_else(|| panic.downcast_ref::().cloned()) - .unwrap_or_else(|| "unknown panic".into()); - error!("logos-node thread panicked: {msg}"); - } - }) - .map_err(|e| DeliveryError::StartupFailed(e.to_string()))?; - - // On failure, the node thread drops LogosNodeCtx (stop+destroy against - // a half-initialized Nim node). Join it so the process doesn't begin - // teardown mid-destroy — that race SIGSEGVs inside the Nim async loop. - let ready = ready_rx.recv().unwrap_or_else(|_| { - Err(DeliveryError::StartupFailed( - "node thread exited before ready".into(), - )) - }); - if let Err(e) = ready { - let _ = handle.join(); - return Err(e); - } - - Ok(Self { - outbound: out_tx, - subscribers, - inbound_rx: Some(inbound_rx), - }) - } - - fn node_thread( - cfg: Config, - out_rx: mpsc::Receiver, - subscribers: SubscriberList, - inbound_tx: Sender>, - ready_tx: mpsc::Sender>, - ) { - // discv5UdpPort defaults to 9000 in libwaku, so a second instance with - // a distinct --port still collides on UDP. Bind it to tcp_port so a - // single --port knob keeps both ports distinct across instances. - let config_json = serde_json::json!({ - "logLevel": cfg.log_level, - "mode": "Core", - "preset": cfg.preset, - "tcpPort": cfg.tcp_port, - "discv5UdpPort": cfg.tcp_port, - }) - .to_string(); - - let mut node = match LogosNodeCtx::new(&config_json) { - Ok(n) => n, - Err(e) => { - let _ = ready_tx.send(Err(DeliveryError::StartupFailed(e))); - return; - } - }; - - // Register the inbound sender before installing the event callback so - // there is no window where the callback is live but the channel is not - // yet in the subscriber list. - subscribers.lock().unwrap().push(inbound_tx); - - let subs_for_cb = subscribers.clone(); - let event_closure = move |_ret: i32, data: &str| { - if let Some(payload) = Self::parse_message_received(data) { - let mut guard = match subs_for_cb.lock() { - Ok(g) => g, - Err(e) => { - error!("subscriber mutex poisoned: {e}"); - return; - } - }; - guard.retain(|tx| match tx.try_send(payload.clone()) { - Ok(()) => true, - Err(crossbeam_channel::TrySendError::Full(_)) => true, - Err(crossbeam_channel::TrySendError::Disconnected(_)) => false, - }); - } - }; - node.set_event_callback(event_closure); - - if let Err(e) = node.start() { - let _ = ready_tx.send(Err(DeliveryError::StartupFailed(e))); - return; - } - info!("logos-delivery node started (preset={})", cfg.preset); - - // FIXME: This unconditional sleep is a stand-in for proper - // peer-connectivity detection. The right approach is to listen for a - // `peer_connected` (or equivalent status-change) event from the node - // callback and only proceed once at least one peer is reachable, - // falling back to a configurable timeout. logos-delivery would need to - // surface such an event via its callback mechanism for this to work. - thread::sleep(Duration::from_secs(3)); - - let default_topic = content_topic_for("delivery_address"); - if let Err(e) = node.subscribe(&default_topic) { - warn!("subscribe to {default_topic}: {e}"); - } else { - info!("subscribed to {default_topic}"); - } - - let _ = ready_tx.send(Ok(())); - - while let Ok(cmd) = out_rx.recv() { - let result = node - .send(&cmd.message_json) - .map(|_| ()) - .map_err(DeliveryError::PublishFailed); - let _ = cmd.reply.try_send(result); - } - - info!("logos-node outbound loop finished"); - } - - fn parse_message_received(data: &str) -> Option> { - let event: WakuEvent = serde_json::from_str(data).ok()?; - - if event.event_type != "message_received" { - return None; - } - - let msg = event.message?; - - if !msg.content_topic.starts_with("/logos-chat/1/") { - return None; - } - - msg.payload.decode() - } -} - -impl DeliveryService for Service { - type Error = DeliveryError; - - fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), DeliveryError> { - let msg = WakuMessage { - content_topic: content_topic_for(&envelope.delivery_address), - payload: BASE64.encode(&envelope.data), - ephemeral: false, - }; - let message_json = - serde_json::to_string(&msg).map_err(|e| DeliveryError::PublishFailed(e.to_string()))?; - - let (reply_tx, reply_rx) = mpsc::sync_channel(1); - self.outbound - .send(OutboundCmd { - message_json, - reply: reply_tx, - }) - .map_err(|_| DeliveryError::ChannelClosed)?; - - reply_rx.recv().map_err(|_| DeliveryError::ChannelClosed)? - } - - fn subscribe(&mut self, _: &str) -> Result<(), ::Error> { - // This Service does not support filtering - Ok(()) - } -} - -impl Transport for Service { - fn inbound(&mut self) -> Receiver> { - self.inbound_rx - .take() - .expect("Service::inbound called more than once") - } -} diff --git a/bin/chat-cli/src/transport/logos_delivery/sys.rs b/bin/chat-cli/src/transport/logos_delivery/sys.rs deleted file mode 100644 index 2036116..0000000 --- a/bin/chat-cli/src/transport/logos_delivery/sys.rs +++ /dev/null @@ -1,102 +0,0 @@ -//! Raw FFI declarations matching liblogosdelivery.h (trampoline pattern). -//! -//! No `#[link]` attribute — build.rs handles linking to liblogosdelivery. -#![allow(unused)] - -use std::os::raw::{c_char, c_int, c_void}; -use std::slice; - -pub const RET_OK: i32 = 0; - -pub type FFICallBack = unsafe extern "C" fn(c_int, *const c_char, usize, *const c_void); - -unsafe extern "C" { - pub fn logosdelivery_create_node( - config_json: *const c_char, - cb: FFICallBack, - user_data: *const c_void, - ) -> *mut c_void; - - pub fn logosdelivery_start_node( - ctx: *mut c_void, - cb: FFICallBack, - user_data: *const c_void, - ) -> c_int; - - pub fn logosdelivery_stop_node( - ctx: *mut c_void, - cb: FFICallBack, - user_data: *const c_void, - ) -> c_int; - - pub fn logosdelivery_destroy( - ctx: *mut c_void, - cb: FFICallBack, - user_data: *const c_void, - ) -> c_int; - - pub fn logosdelivery_subscribe( - ctx: *mut c_void, - cb: FFICallBack, - user_data: *const c_void, - content_topic: *const c_char, - ) -> c_int; - - pub fn logosdelivery_unsubscribe( - ctx: *mut c_void, - cb: FFICallBack, - user_data: *const c_void, - content_topic: *const c_char, - ) -> c_int; - - /// `message_json`: `{"contentTopic": "...", "payload": "", "ephemeral": false}` - pub fn logosdelivery_send( - ctx: *mut c_void, - cb: FFICallBack, - user_data: *const c_void, - message_json: *const c_char, - ) -> c_int; - - pub fn logosdelivery_set_event_callback( - ctx: *mut c_void, - cb: FFICallBack, - user_data: *const c_void, - ); - - pub fn logosdelivery_get_node_info( - ctx: *mut c_void, - cb: FFICallBack, - user_data: *const c_void, - node_info_id: *const c_char, - ) -> c_int; -} - -// ── Trampoline ─────────────────────────────────────────────────────────────── - -pub unsafe extern "C" fn trampoline( - return_val: c_int, - buffer: *const c_char, - buffer_len: usize, - data: *const c_void, -) where - C: FnMut(i32, &str), -{ - if data.is_null() { - return; - } - let closure = unsafe { &mut *(data as *mut C) }; - if buffer.is_null() || buffer_len == 0 { - closure(return_val, ""); - return; - } - let bytes = unsafe { slice::from_raw_parts(buffer as *const u8, buffer_len) }; - let s = String::from_utf8_lossy(bytes); - closure(return_val, &s); -} - -pub fn get_trampoline(_: &C) -> FFICallBack -where - C: FnMut(i32, &str), -{ - trampoline:: -} diff --git a/bin/chat-cli/src/transport/logos_delivery/wrapper.rs b/bin/chat-cli/src/transport/logos_delivery/wrapper.rs deleted file mode 100644 index 04296a7..0000000 --- a/bin/chat-cli/src/transport/logos_delivery/wrapper.rs +++ /dev/null @@ -1,227 +0,0 @@ -//! Safe synchronous wrapper around the raw liblogosdelivery FFI. -//! -//! # Why Box::into_raw for one-shot callbacks? -//! -//! `sendRequestToFFIThread` (nim-ffi) signals the caller as soon as the FFI -//! thread *receives* the request, before it processes it. The actual result -//! callback fires later, from the Nim async event loop, after the Rust call -//! frame has returned and its stack variables are gone. Passing `&mut closure` -//! as `user_data` therefore produces a dangling pointer by the time the -//! callback fires — a use-after-free that manifests as a SIGSEGV when the -//! operation fails and the callback tries to write an error into captured -//! stack memory. -//! -//! Fix: heap-allocate each one-shot closure with `Box::into_raw`, synchronise -//! via an `mpsc` channel (blocking until the callback fires), then drop the -//! box. The pointer is valid for the entire async lifetime of the request. -//! -//! # Why store the event callback inside LogosNodeCtx? -//! -//! Rust drops locals in reverse declaration order. If the event-callback box -//! were held by the caller (outside the node), it would be freed before the -//! node's Drop runs stop+destroy. During stop/destroy the Nim async event -//! loop can still fire the event callback, which would access freed memory. -//! -//! By storing the box as `_event_cb` inside `LogosNodeCtx`, Rust's field-drop -//! order guarantees it is freed *after* Drop::drop returns (i.e. after -//! stop+destroy complete), so the pointer is always valid when Nim calls it. - -use std::ffi::CString; -use std::os::raw::c_void; -use std::sync::mpsc; - -use super::sys::{self as ffi, RET_OK, get_trampoline}; - -/// Opaque handle to a logos-delivery node context. -pub struct LogosNodeCtx { - ctx: *mut c_void, - /// Keeps the event-callback closure alive for the lifetime of the node. - _event_cb: Option>, -} - -// The logos-delivery ctx pointer is thread-safe (serialized calls inside C/Nim). -unsafe impl Send for LogosNodeCtx {} -unsafe impl Sync for LogosNodeCtx {} - -impl LogosNodeCtx { - pub fn new(config_json: &str) -> Result { - let config_cstr = CString::new(config_json).map_err(|e| e.to_string())?; - - let (tx, rx) = mpsc::sync_channel::>(1); - let closure = move |ret: i32, data: &str| { - let _ = tx.send(if ret == RET_OK { - Ok(()) - } else { - Err(data.to_string()) - }); - }; - let raw = Box::into_raw(Box::new(closure)); - let cb = get_trampoline(unsafe { &*raw }); - - let ctx = unsafe { - ffi::logosdelivery_create_node(config_cstr.as_ptr(), cb, raw as *const c_void) - }; - - // create_node may call the callback synchronously (try_recv) or - // asynchronously (recv). Handle both. - let callback_result: Result<(), String> = if ctx.is_null() { - rx.try_recv() - .unwrap_or(Err("logosdelivery_create_node returned null".into())) - } else { - rx.recv() - .unwrap_or(Err("callback channel disconnected".into())) - }; - drop(unsafe { Box::from_raw(raw) }); - - callback_result.map(|_| Self { - ctx, - _event_cb: None, - }) - } - - pub fn start(&self) -> Result<(), String> { - let (tx, rx) = mpsc::sync_channel::>(1); - let closure = move |ret: i32, data: &str| { - let _ = tx.send(if ret == RET_OK { - Ok(()) - } else { - Err(data.to_string()) - }); - }; - let raw = Box::into_raw(Box::new(closure)); - let cb = get_trampoline(unsafe { &*raw }); - - let ret = unsafe { ffi::logosdelivery_start_node(self.ctx, cb, raw as *const c_void) }; - - if ret != RET_OK { - drop(unsafe { Box::from_raw(raw) }); - return Err(format!("logosdelivery_start_node returned {ret}")); - } - let result = rx - .recv() - .unwrap_or(Err("callback channel disconnected".into())); - drop(unsafe { Box::from_raw(raw) }); - result - } - - pub fn subscribe(&self, content_topic: &str) -> Result<(), String> { - let topic_cstr = CString::new(content_topic).map_err(|e| e.to_string())?; - - let (tx, rx) = mpsc::sync_channel::>(1); - let closure = move |ret: i32, data: &str| { - let _ = tx.send(if ret == RET_OK { - Ok(()) - } else { - Err(data.to_string()) - }); - }; - let raw = Box::into_raw(Box::new(closure)); - let cb = get_trampoline(unsafe { &*raw }); - - let ret = unsafe { - ffi::logosdelivery_subscribe(self.ctx, cb, raw as *const c_void, topic_cstr.as_ptr()) - }; - - if ret != RET_OK { - drop(unsafe { Box::from_raw(raw) }); - return Err(format!("logosdelivery_subscribe returned {ret}")); - } - let result = rx - .recv() - .unwrap_or(Err("callback channel disconnected".into())); - drop(unsafe { Box::from_raw(raw) }); - result - } - - /// Returns the request ID on success. - pub fn send(&self, message_json: &str) -> Result { - let msg_cstr = CString::new(message_json).map_err(|e| e.to_string())?; - - let (tx, rx) = mpsc::sync_channel::>(1); - let closure = move |ret: i32, data: &str| { - let _ = tx.send(if ret == RET_OK { - Ok(data.to_string()) - } else { - Err(data.to_string()) - }); - }; - let raw = Box::into_raw(Box::new(closure)); - let cb = get_trampoline(unsafe { &*raw }); - - let ret = unsafe { - ffi::logosdelivery_send(self.ctx, cb, raw as *const c_void, msg_cstr.as_ptr()) - }; - - if ret != RET_OK { - drop(unsafe { Box::from_raw(raw) }); - return Err(format!("logosdelivery_send returned {ret}")); - } - let result = rx - .recv() - .unwrap_or(Err("callback channel disconnected".into())); - drop(unsafe { Box::from_raw(raw) }); - result - } - - /// Stores the event callback inside the node so it is dropped *after* - /// stop+destroy in Drop, keeping the pointer valid for the node's lifetime. - pub fn set_event_callback(&mut self, closure: C) - where - C: FnMut(i32, &str) + Send + 'static, - { - let mut boxed = Box::new(closure); - let cb = get_trampoline(&*boxed); - let user_data = &mut *boxed as *mut C as *const c_void; - unsafe { - ffi::logosdelivery_set_event_callback(self.ctx, cb, user_data); - } - // Move the box into self; the heap address (user_data) is unaffected. - self._event_cb = Some(boxed); - } - - pub fn stop(&self) -> Result<(), String> { - let (tx, rx) = mpsc::sync_channel::>(1); - let closure = move |ret: i32, data: &str| { - let _ = tx.send(if ret == RET_OK { - Ok(()) - } else { - Err(data.to_string()) - }); - }; - let raw = Box::into_raw(Box::new(closure)); - let cb = get_trampoline(unsafe { &*raw }); - - let ret = unsafe { ffi::logosdelivery_stop_node(self.ctx, cb, raw as *const c_void) }; - - if ret != RET_OK { - drop(unsafe { Box::from_raw(raw) }); - return Err(format!("logosdelivery_stop_node returned {ret}")); - } - let result = rx - .recv() - .unwrap_or(Err("callback channel disconnected".into())); - drop(unsafe { Box::from_raw(raw) }); - result - } -} - -impl Drop for LogosNodeCtx { - fn drop(&mut self) { - // stop+destroy must complete before _event_cb is freed. - // Rust drops fields after Drop::drop returns, so _event_cb outlives - // everything below — the event callback pointer stays valid throughout. - if let Err(e) = self.stop() { - tracing::warn!("logosdelivery_stop_node failed during drop: {e}"); - } - - let (tx, rx) = mpsc::sync_channel::<()>(1); - let closure = move |_: i32, _: &str| { - let _ = tx.send(()); - }; - let raw = Box::into_raw(Box::new(closure)); - let cb = get_trampoline(unsafe { &*raw }); - unsafe { ffi::logosdelivery_destroy(self.ctx, cb, raw as *const c_void) }; - let _ = rx.recv(); - drop(unsafe { Box::from_raw(raw) }); - } -} diff --git a/extensions/components/Cargo.toml b/extensions/components/Cargo.toml index 4de8dca..1346cae 100644 --- a/extensions/components/Cargo.toml +++ b/extensions/components/Cargo.toml @@ -2,6 +2,7 @@ name = "components" version = "0.1.0" edition = "2024" +links = "logosdelivery" [features] embedded_p2p_delivery = [] diff --git a/extensions/components/build.rs b/extensions/components/build.rs index 3b51fa8..71a8a6d 100644 --- a/extensions/components/build.rs +++ b/extensions/components/build.rs @@ -3,22 +3,24 @@ fn main() { println!("cargo::rustc-check-cfg=cfg(logos_delivery)"); let feature_enabled = std::env::var("CARGO_FEATURE_EMBEDDED_P2P_DELIVERY").is_ok(); - let lib_dir = std::env::var("LOGOS_DELIVERY_LIB_DIR"); + if !feature_enabled { + return; + } - let lib_dir = match lib_dir { - Ok(dir) => dir, - Err(_) if !feature_enabled => return, - Err(_) => { - // Feature is on but no library path — enable compilation, skip linking. - println!("cargo:rustc-cfg=logos_delivery"); - return; - } + let lib_dir = std::env::var("LOGOS_DELIVERY_LIB_DIR") + .ok() + .or_else(nix_build_logos_delivery); + + let Some(lib_dir) = lib_dir else { + // Feature is on but no library path — enable compilation, skip linking. + println!("cargo:rustc-cfg=logos_delivery"); + return; }; println!("cargo:rustc-cfg=logos_delivery"); - println!("cargo:rustc-link-search=native={lib_dir}"); println!("cargo:rustc-link-lib=dylib=logosdelivery"); + println!("cargo:LIB_DIR={lib_dir}"); let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap_or_default(); match target_os.as_str() { @@ -26,3 +28,43 @@ fn main() { other => panic!("unsupported OS for logos-delivery transport: {other}"), } } + +fn nix_build_logos_delivery() -> Option { + let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").ok()?; + let flake_root = find_flake_root(&manifest_dir)?; + + println!("cargo:rerun-if-changed={flake_root}/flake.lock"); + + let output = std::process::Command::new("nix") + .args(["build", ".#logos-delivery", "--no-link", "--print-out-paths"]) + .current_dir(&flake_root) + .output() + .ok()?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + println!("cargo:warning=nix build .#logos-delivery failed: {stderr}"); + return None; + } + + let store_path = String::from_utf8(output.stdout).ok()?; + let lib_dir = format!("{}/lib", store_path.trim()); + + if std::path::Path::new(&lib_dir).exists() { + Some(lib_dir) + } else { + None + } +} + +fn find_flake_root(start: &str) -> Option { + let mut path = std::path::PathBuf::from(start); + loop { + if path.join("flake.nix").exists() { + return Some(path.to_string_lossy().into_owned()); + } + if !path.pop() { + return None; + } + } +} diff --git a/extensions/components/src/lib.rs b/extensions/components/src/lib.rs index 2ef540e..06d7509 100644 --- a/extensions/components/src/lib.rs +++ b/extensions/components/src/lib.rs @@ -8,5 +8,5 @@ pub use contact_registry::http::{HttpRegistry, HttpRegistryError}; pub use storage::*; pub use wakeup::*; -#[cfg(logos_delivery)] +#[cfg(feature = "embedded_p2p_delivery")] pub use delivery::{EmbeddedP2pDeliveryService, P2pConfig}; From bbf6a12b3f81cce788b22b30a810859aa06a0d51 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:01:10 -0700 Subject: [PATCH 05/13] Remove requirement for build.rs in chat-cli --- bin/chat-cli/build.rs | 15 ------ bin/chat-cli/src/main.rs | 67 -------------------------- extensions/components/build.rs | 82 +++++++++++++++++++++++++++----- extensions/components/src/lib.rs | 2 +- 4 files changed, 70 insertions(+), 96 deletions(-) delete mode 100644 bin/chat-cli/build.rs diff --git a/bin/chat-cli/build.rs b/bin/chat-cli/build.rs deleted file mode 100644 index 88db6de..0000000 --- a/bin/chat-cli/build.rs +++ /dev/null @@ -1,15 +0,0 @@ -fn main() { - println!("cargo::rustc-check-cfg=cfg(logos_delivery)"); - - let Some(lib_dir) = std::env::var("DEP_LOGOSDELIVERY_LIB_DIR").ok() else { - return; - }; - - println!("cargo:rustc-cfg=logos_delivery"); - - let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap_or_default(); - match target_os.as_str() { - "macos" | "linux" => println!("cargo:rustc-link-arg=-Wl,-rpath,{lib_dir}"), - _ => {} - } -} diff --git a/bin/chat-cli/src/main.rs b/bin/chat-cli/src/main.rs index bd75484..99b9be3 100644 --- a/bin/chat-cli/src/main.rs +++ b/bin/chat-cli/src/main.rs @@ -179,73 +179,6 @@ where result } -fn run_logos_delivery(cli: Cli) -> Result<()> { - { - eprintln!("Starting logos-delivery node (preset={})...", cli.preset); - eprintln!("This may take a few seconds while connecting to the network."); - - let logos_cfg = P2pConfig { - preset: cli.preset.clone(), - tcp_port: cli.port, - ..Default::default() - }; - let delivery = P2pTransport( - EmbeddedP2pDeliveryService::start(logos_cfg) - .context("failed to start logos-delivery")?, - ); - - eprintln!("Node connected. Initializing chat client..."); - - let data_dir = cli - .db - .as_ref() - .and_then(|p| p.parent()) - .map(|p| p.to_path_buf()) - .unwrap_or_else(|| cli.data.clone()); - - let (client, events) = match cli.db { - Some(ref path) => { - let db_str = path - .to_str() - .context("db path contains non-UTF-8 characters")? - .to_string(); - - logos_chat::ChatClientBuilder::new() - .storage_config(logos_chat::StorageConfig::Encrypted { - path: db_str, - key: "chat-cli".to_string(), - }) - .transport(delivery) - .build() - .map_err(|e| anyhow::anyhow!("{e:?}")) - .context("failed to open persistent client")? - } - None => logos_chat::ChatClientBuilder::new() - .transport(delivery) - .build() - .map_err(|e| anyhow::anyhow!("{e:?}")) - .context("failed to open chat client")?, - }; - - let mut app = ChatApp::new(client, events, &cli.name, &data_dir)?; - - if cli.smoketest { - return Ok(()); - } - - let mut terminal = ui::init().context("failed to initialize terminal")?; - let result = run_app(&mut terminal, &mut app); - ui::restore().context("failed to restore terminal")?; - return result; - } - - #[cfg(not(logos_delivery))] - anyhow::bail!( - "logos-delivery transport is not available in this build.\n\ - Build with LOGOS_DELIVERY_LIB_DIR set to enable it." - ) -} - fn run_app(terminal: &mut ui::Tui, app: &mut ChatApp) -> Result<()> where I: IdentityProvider + Send, diff --git a/extensions/components/build.rs b/extensions/components/build.rs index 71a8a6d..08b8b82 100644 --- a/extensions/components/build.rs +++ b/extensions/components/build.rs @@ -1,32 +1,88 @@ +use std::fs; +use std::process::Command; + fn main() { println!("cargo:rerun-if-env-changed=LOGOS_DELIVERY_LIB_DIR"); println!("cargo::rustc-check-cfg=cfg(logos_delivery)"); - let feature_enabled = std::env::var("CARGO_FEATURE_EMBEDDED_P2P_DELIVERY").is_ok(); - if !feature_enabled { + if std::env::var_os("CARGO_FEATURE_EMBEDDED_P2P_DELIVERY").is_none() { return; } - let lib_dir = std::env::var("LOGOS_DELIVERY_LIB_DIR") + // Locate the native library: explicit override first, then build via nix. + let Some(lib_dir) = std::env::var("LOGOS_DELIVERY_LIB_DIR") .ok() - .or_else(nix_build_logos_delivery); - - let Some(lib_dir) = lib_dir else { - // Feature is on but no library path — enable compilation, skip linking. - println!("cargo:rustc-cfg=logos_delivery"); + .or_else(nix_build_logos_delivery) + else { + // Feature is on but the native library is unavailable (e.g. `cargo + // check` on a machine without nix). Skip the cfg so the FFI module is + // not compiled — this keeps `cargo check` working without producing + // unresolved symbols at link time. `EmbeddedP2pDeliveryService` is + // simply absent until the library can be found. return; }; println!("cargo:rustc-cfg=logos_delivery"); - println!("cargo:rustc-link-search=native={lib_dir}"); - println!("cargo:rustc-link-lib=dylib=logosdelivery"); - println!("cargo:LIB_DIR={lib_dir}"); + let out_dir = std::env::var("OUT_DIR").expect("OUT_DIR not set"); let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap_or_default(); + + // The shipped library carries a relocatable install name (@rpath on macOS, + // $ORIGIN soname on Linux), which would force every downstream BINARY to + // inject its own RPATH. Cargo propagates `rustc-link-search` and + // `rustc-link-lib` across crates, but NOT `rustc-link-arg` (the rpath) — so + // that relocatable name is exactly what makes consumers need their own + // build.rs. Instead, stamp a private copy with an ABSOLUTE install name; + // the propagating search + lib directives are then sufficient and consumers + // need zero build-script glue. match target_os.as_str() { - "macos" | "linux" => println!("cargo:rustc-link-arg=-Wl,-rpath,{lib_dir}"), + "macos" => stamp_absolute_macos(&lib_dir, &out_dir), + "linux" => stamp_absolute_linux(&lib_dir, &out_dir), other => panic!("unsupported OS for logos-delivery transport: {other}"), } + + println!("cargo:rustc-link-search=native={out_dir}"); + println!("cargo:rustc-link-lib=dylib=logosdelivery"); +} + +/// Copy `liblogosdelivery.dylib` into `OUT_DIR` and rewrite its install name to +/// the absolute store path. The consumer records that absolute path, so dyld +/// loads the original file directly — whose own `@loader_path` RPATH resolves +/// `librln.dylib` beside it — with no RPATH needed on the consumer. +fn stamp_absolute_macos(lib_dir: &str, out_dir: &str) { + let src = format!("{lib_dir}/liblogosdelivery.dylib"); + let dst = format!("{out_dir}/liblogosdelivery.dylib"); + copy_writable(&src, &dst); + run("install_name_tool", &["-id", &src, &dst]); + println!("cargo:rerun-if-changed={src}"); +} + +/// Linux equivalent: an absolute `DT_SONAME` is recorded verbatim in the +/// consumer's `DT_NEEDED`, so `ld.so` loads it by path with no RPATH. Requires +/// `patchelf` at build time (provided by the nix devshell). +fn stamp_absolute_linux(lib_dir: &str, out_dir: &str) { + let src = format!("{lib_dir}/liblogosdelivery.so"); + let dst = format!("{out_dir}/liblogosdelivery.so"); + copy_writable(&src, &dst); + run("patchelf", &["--set-soname", &src, &dst]); + println!("cargo:rerun-if-changed={src}"); +} + +fn copy_writable(src: &str, dst: &str) { + fs::copy(src, dst).unwrap_or_else(|e| panic!("copy {src} -> {dst}: {e}")); + // Store-sourced files are read-only; make the copy writable so its install + // name / soname can be rewritten. + let mut perms = fs::metadata(dst).unwrap().permissions(); + perms.set_readonly(false); + fs::set_permissions(dst, perms).unwrap(); +} + +fn run(cmd: &str, args: &[&str]) { + let status = Command::new(cmd) + .args(args) + .status() + .unwrap_or_else(|e| panic!("failed to run `{cmd}`: {e}")); + assert!(status.success(), "`{cmd} {args:?}` failed with {status}"); } fn nix_build_logos_delivery() -> Option { @@ -35,7 +91,7 @@ fn nix_build_logos_delivery() -> Option { println!("cargo:rerun-if-changed={flake_root}/flake.lock"); - let output = std::process::Command::new("nix") + let output = Command::new("nix") .args(["build", ".#logos-delivery", "--no-link", "--print-out-paths"]) .current_dir(&flake_root) .output() diff --git a/extensions/components/src/lib.rs b/extensions/components/src/lib.rs index 06d7509..2ef540e 100644 --- a/extensions/components/src/lib.rs +++ b/extensions/components/src/lib.rs @@ -8,5 +8,5 @@ pub use contact_registry::http::{HttpRegistry, HttpRegistryError}; pub use storage::*; pub use wakeup::*; -#[cfg(feature = "embedded_p2p_delivery")] +#[cfg(logos_delivery)] pub use delivery::{EmbeddedP2pDeliveryService, P2pConfig}; From cba31be9e0c75f00fdac9b03362c23b085a83212 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:06:48 -0700 Subject: [PATCH 06/13] fix imports --- extensions/components/src/lib.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/extensions/components/src/lib.rs b/extensions/components/src/lib.rs index 2ef540e..92ad4a4 100644 --- a/extensions/components/src/lib.rs +++ b/extensions/components/src/lib.rs @@ -5,8 +5,6 @@ mod wakeup; pub use contact_registry::ephemeral::EphemeralRegistry; pub use contact_registry::http::{HttpRegistry, HttpRegistryError}; +pub use delivery::*; pub use storage::*; pub use wakeup::*; - -#[cfg(logos_delivery)] -pub use delivery::{EmbeddedP2pDeliveryService, P2pConfig}; From 40172c0cbfa0d9a29aff93513eba731532d13b4c Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:20:48 -0700 Subject: [PATCH 07/13] update linux flake --- flake.nix | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/flake.nix b/flake.nix index bffe3d2..cd3d706 100644 --- a/flake.nix +++ b/flake.nix @@ -50,7 +50,11 @@ pkgs.cmake pkgs.perl pkgs.protobuf - ]; + ] + # components/build.rs rewrites the dylib soname via patchelf on + # Linux so consumers link without their own build.rs. macOS uses + # install_name_tool, which ships with the toolchain. + ++ pkgs.lib.optionals pkgs.stdenv.isLinux [ pkgs.patchelf ]; buildInputs = [ logosDeliveryLib ]; shellHook = '' export LOGOS_DELIVERY_LIB_DIR="${logosDeliveryLib}/lib" From 641717e8fbc269bb5ac5a624129293b8459312e1 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:27:27 -0700 Subject: [PATCH 08/13] Linter fixes --- bin/chat-cli/src/main.rs | 2 +- extensions/components/build.rs | 15 ++++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/bin/chat-cli/src/main.rs b/bin/chat-cli/src/main.rs index 99b9be3..61a2f27 100644 --- a/bin/chat-cli/src/main.rs +++ b/bin/chat-cli/src/main.rs @@ -113,7 +113,7 @@ fn main() -> Result<()> { ); println!("Node connected. Initializing chat client..."); - return run(transport, &cli); + run(transport, &cli) } } } diff --git a/extensions/components/build.rs b/extensions/components/build.rs index 08b8b82..5c026a7 100644 --- a/extensions/components/build.rs +++ b/extensions/components/build.rs @@ -69,12 +69,12 @@ fn stamp_absolute_linux(lib_dir: &str, out_dir: &str) { } fn copy_writable(src: &str, dst: &str) { + use std::os::unix::fs::PermissionsExt; + fs::copy(src, dst).unwrap_or_else(|e| panic!("copy {src} -> {dst}: {e}")); - // Store-sourced files are read-only; make the copy writable so its install + // Store-sourced files are read-only; restore owner write so the install // name / soname can be rewritten. - let mut perms = fs::metadata(dst).unwrap().permissions(); - perms.set_readonly(false); - fs::set_permissions(dst, perms).unwrap(); + fs::set_permissions(dst, fs::Permissions::from_mode(0o644)).unwrap(); } fn run(cmd: &str, args: &[&str]) { @@ -92,7 +92,12 @@ fn nix_build_logos_delivery() -> Option { println!("cargo:rerun-if-changed={flake_root}/flake.lock"); let output = Command::new("nix") - .args(["build", ".#logos-delivery", "--no-link", "--print-out-paths"]) + .args([ + "build", + ".#logos-delivery", + "--no-link", + "--print-out-paths", + ]) .current_dir(&flake_root) .output() .ok()?; From 48259c5daf7e812020e24af07b0af10491080499 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:54:26 -0700 Subject: [PATCH 09/13] fix build in linux --- extensions/components/build.rs | 69 +++++++++++++++++++++++++--------- 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/extensions/components/build.rs b/extensions/components/build.rs index 5c026a7..7ab87a7 100644 --- a/extensions/components/build.rs +++ b/extensions/components/build.rs @@ -1,4 +1,5 @@ use std::fs; +use std::path::{Path, PathBuf}; use std::process::Command; fn main() { @@ -9,11 +10,7 @@ fn main() { return; } - // Locate the native library: explicit override first, then build via nix. - let Some(lib_dir) = std::env::var("LOGOS_DELIVERY_LIB_DIR") - .ok() - .or_else(nix_build_logos_delivery) - else { + let Some(lib_dir) = locate_lib_dir() else { // Feature is on but the native library is unavailable (e.g. `cargo // check` on a machine without nix). Skip the cfg so the FFI module is // not compiled — this keeps `cargo check` working without producing @@ -45,33 +42,71 @@ fn main() { println!("cargo:rustc-link-lib=dylib=logosdelivery"); } +/// Locate the native library directory as an ABSOLUTE, canonical path. Prefers +/// `LOGOS_DELIVERY_LIB_DIR`, then falls back to building it via nix. Returns +/// `None` when neither is available (e.g. `cargo check` without nix). +fn locate_lib_dir() -> Option { + if let Ok(dir) = std::env::var("LOGOS_DELIVERY_LIB_DIR") { + if let Some(resolved) = resolve_lib_dir(&dir) { + return Some(resolved); + } + println!( + "cargo:warning=LOGOS_DELIVERY_LIB_DIR='{dir}' could not be resolved; \ + falling back to `nix build`" + ); + } + resolve_lib_dir(&nix_build_logos_delivery()?) +} + +/// Resolve a lib dir to an absolute, canonical path. Cargo runs build scripts +/// with the cwd set to the crate dir, but a relative value (e.g. CI's +/// `./result/lib`) is anchored at the flake/workspace root where `nix build` +/// drops `result`. Canonicalizing also follows the `result` symlink to the +/// immutable store path, so the stamped install name / soname stays stable. +fn resolve_lib_dir(dir: &str) -> Option { + let path = Path::new(dir); + let anchored = if path.is_absolute() { + path.to_path_buf() + } else { + let manifest = std::env::var("CARGO_MANIFEST_DIR").ok()?; + Path::new(&find_flake_root(&manifest)?).join(path) + }; + anchored.canonicalize().ok() +} + /// Copy `liblogosdelivery.dylib` into `OUT_DIR` and rewrite its install name to /// the absolute store path. The consumer records that absolute path, so dyld /// loads the original file directly — whose own `@loader_path` RPATH resolves /// `librln.dylib` beside it — with no RPATH needed on the consumer. -fn stamp_absolute_macos(lib_dir: &str, out_dir: &str) { - let src = format!("{lib_dir}/liblogosdelivery.dylib"); +fn stamp_absolute_macos(lib_dir: &Path, out_dir: &str) { + let src = lib_dir.join("liblogosdelivery.dylib"); let dst = format!("{out_dir}/liblogosdelivery.dylib"); - copy_writable(&src, &dst); - run("install_name_tool", &["-id", &src, &dst]); - println!("cargo:rerun-if-changed={src}"); + copy_writable(&src, Path::new(&dst)); + run("install_name_tool", &["-id", path_str(&src), &dst]); + println!("cargo:rerun-if-changed={}", src.display()); } /// Linux equivalent: an absolute `DT_SONAME` is recorded verbatim in the /// consumer's `DT_NEEDED`, so `ld.so` loads it by path with no RPATH. Requires /// `patchelf` at build time (provided by the nix devshell). -fn stamp_absolute_linux(lib_dir: &str, out_dir: &str) { - let src = format!("{lib_dir}/liblogosdelivery.so"); +fn stamp_absolute_linux(lib_dir: &Path, out_dir: &str) { + let src = lib_dir.join("liblogosdelivery.so"); let dst = format!("{out_dir}/liblogosdelivery.so"); - copy_writable(&src, &dst); - run("patchelf", &["--set-soname", &src, &dst]); - println!("cargo:rerun-if-changed={src}"); + copy_writable(&src, Path::new(&dst)); + run("patchelf", &["--set-soname", path_str(&src), &dst]); + println!("cargo:rerun-if-changed={}", src.display()); } -fn copy_writable(src: &str, dst: &str) { +fn path_str(p: &Path) -> &str { + p.to_str() + .unwrap_or_else(|| panic!("non-UTF-8 path: {}", p.display())) +} + +fn copy_writable(src: &Path, dst: &Path) { use std::os::unix::fs::PermissionsExt; - fs::copy(src, dst).unwrap_or_else(|e| panic!("copy {src} -> {dst}: {e}")); + fs::copy(src, dst) + .unwrap_or_else(|e| panic!("copy {} -> {}: {e}", src.display(), dst.display())); // Store-sourced files are read-only; restore owner write so the install // name / soname can be rewritten. fs::set_permissions(dst, fs::Permissions::from_mode(0o644)).unwrap(); From 8784a43c397d07f93860e5d9d9fadfa8c2d72508 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Tue, 23 Jun 2026 23:57:50 -0700 Subject: [PATCH 10/13] Update docs --- .github/workflows/ci.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f1c1adf..ec0cbd9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,9 +20,10 @@ jobs: - run: rustup update stable && rustup default stable # hashgraph-like-consensus's build.rs shells out to protoc via prost-build. - run: sudo apt-get update && sudo apt-get install -y protobuf-compiler - # chat-cli's build.rs unconditionally links liblogosdelivery and requires - # LOGOS_DELIVERY_LIB_DIR. The smoketest job builds and exercises it under - # Nix; here we keep the toolchain-only job fast by skipping it. + # chat-cli pulls in components' embedded_p2p_delivery feature, whose + # build.rs links liblogosdelivery (built via Nix or LOGOS_DELIVERY_LIB_DIR). + # The smoketest job builds and exercises it under Nix; here we keep the + # toolchain-only job fast by skipping it. - run: cargo build --verbose --workspace --exclude chat-cli - run: cargo test --verbose --workspace --exclude chat-cli From 356a357c4e275d5b70dee02cfa274dcff377e81f Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Wed, 24 Jun 2026 00:02:31 -0700 Subject: [PATCH 11/13] Add CI rust cache --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ec0cbd9..522e27d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,6 +18,7 @@ jobs: steps: - uses: actions/checkout@v4 - run: rustup update stable && rustup default stable + - uses: Swatinem/rust-cache@v2 # hashgraph-like-consensus's build.rs shells out to protoc via prost-build. - run: sudo apt-get update && sudo apt-get install -y protobuf-compiler # chat-cli pulls in components' embedded_p2p_delivery feature, whose @@ -34,6 +35,7 @@ jobs: - uses: actions/checkout@v4 - run: rustup update stable && rustup default stable - run: rustup component add clippy + - uses: Swatinem/rust-cache@v2 # hashgraph-like-consensus's build.rs shells out to protoc via prost-build. - run: sudo apt-get update && sudo apt-get install -y protobuf-compiler - run: cargo clippy --all-targets --all-features --workspace --exclude chat-cli -- -D warnings From cc5ae779a5c9ca5572e3e3d6420f033ea08ef505 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Wed, 24 Jun 2026 00:11:19 -0700 Subject: [PATCH 12/13] Remove nix override --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 522e27d..092e786 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,7 +75,7 @@ jobs: # nim-zlib instead of reusing a cached output. No retry on purpose: a # single build must now succeed deterministically. Re-run the job a few # times for more samples. - run: nix build .#logos-delivery --override-input nixpkgs github:kaichaosun/nixpkgs/fix-gitfetch --print-build-logs + run: nix build .#logos-delivery --print-build-logs # Build and run chat-cli through the dev shell so it links against the # same Nix glibc as the prebuilt liblogosdelivery.so. A plain `cargo # build` uses the runner's system glibc, which is older than Nix's and From 58496ec1cd1249f383f14b85aa55265639ab97fe Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Wed, 24 Jun 2026 06:31:01 -0700 Subject: [PATCH 13/13] remove chat-cli ci envvar --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 092e786..2e44472 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -81,7 +81,7 @@ jobs: # build` uses the runner's system glibc, which is older than Nix's and # mismatches it at runtime (libc.so.6: version `GLIBC_ABI_DT_X86_64_PLT' # not found, required by Nix glibc's libm.so.6). - - name: Build chat-cli (logos-delivery) - run: nix develop -c bash -c 'LOGOS_DELIVERY_LIB_DIR=./result/lib cargo build --release -p chat-cli' + - name: Build chat-cli + run: nix develop -c bash -c 'cargo build -p chat-cli' - name: Run chat-cli smoketest run: nix develop -c ./target/release/chat-cli --name ci-test --smoketest