mirror of
https://github.com/logos-messaging/libchat.git
synced 2026-06-28 03:59:27 +00:00
Move logos_delivery to components
This commit is contained in:
parent
a5abefa314
commit
06e29b1dbf
@ -3,6 +3,9 @@ name = "components"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[features]
|
||||
logos-delivery = []
|
||||
|
||||
[dependencies]
|
||||
# Workspace dependencies (sorted)
|
||||
crypto = { workspace = true }
|
||||
@ -15,5 +18,6 @@ crossbeam-channel = { workspace = true }
|
||||
hex = "0.4.3"
|
||||
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
thiserror = "2"
|
||||
tracing = "0.1"
|
||||
|
||||
28
extensions/components/build.rs
Normal file
28
extensions/components/build.rs
Normal file
@ -0,0 +1,28 @@
|
||||
fn main() {
|
||||
println!("cargo:rerun-if-env-changed=LOGOS_DELIVERY_LIB_DIR");
|
||||
println!("cargo::rustc-check-cfg=cfg(logos_delivery)");
|
||||
|
||||
let feature_enabled = std::env::var("CARGO_FEATURE_LOGOS_DELIVERY").is_ok();
|
||||
let lib_dir = std::env::var("LOGOS_DELIVERY_LIB_DIR");
|
||||
|
||||
let lib_dir = match lib_dir {
|
||||
Ok(dir) => dir,
|
||||
Err(_) if !feature_enabled => return,
|
||||
Err(_) => {
|
||||
// Feature is on but no library path — enable compilation, skip linking.
|
||||
println!("cargo:rustc-cfg=logos_delivery");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
println!("cargo:rustc-cfg=logos_delivery");
|
||||
|
||||
println!("cargo:rustc-link-search=native={lib_dir}");
|
||||
println!("cargo:rustc-link-lib=dylib=logosdelivery");
|
||||
|
||||
let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap_or_default();
|
||||
match target_os.as_str() {
|
||||
"macos" | "linux" => println!("cargo:rustc-link-arg=-Wl,-rpath,{lib_dir}"),
|
||||
other => panic!("unsupported OS for logos-delivery transport: {other}"),
|
||||
}
|
||||
}
|
||||
@ -1,3 +1,9 @@
|
||||
mod local_broadcaster;
|
||||
|
||||
pub use local_broadcaster::LocalBroadcaster;
|
||||
|
||||
#[cfg(logos_delivery)]
|
||||
mod embedded_p2p_delivery;
|
||||
|
||||
#[cfg(logos_delivery)]
|
||||
pub use embedded_p2p_delivery::EmbeddedP2pDeliveryService;
|
||||
|
||||
314
extensions/components/src/delivery/embedded_p2p_delivery.rs
Normal file
314
extensions/components/src/delivery/embedded_p2p_delivery.rs
Normal file
@ -0,0 +1,314 @@
|
||||
//! logos-delivery backed [`client::DeliveryService`] implementation.
|
||||
//!
|
||||
//! `LogosDeliveryService` wraps an embedded logos-delivery node running on a
|
||||
//! dedicated `std::thread`. All interaction is via synchronous `std::sync::mpsc`
|
||||
//! channels.
|
||||
//!
|
||||
//! ## Content topic mapping
|
||||
//!
|
||||
//! `AddressedEnvelope::delivery_address` maps to logos-delivery content topic
|
||||
//! `/logos-chat/1/{delivery_address}/proto`.
|
||||
|
||||
pub(crate) mod sys;
|
||||
pub(crate) mod wrapper;
|
||||
|
||||
use std::sync::{Arc, Mutex, mpsc};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64;
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use libchat::{AddressedEnvelope, DeliveryService};
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use wrapper::LogosNodeCtx;
|
||||
|
||||
pub fn content_topic_for(delivery_address: &str) -> String {
|
||||
format!("/logos-chat/1/{delivery_address}/proto")
|
||||
}
|
||||
|
||||
// ── Error ────────────────────────────────────────────────────────────────────
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DeliveryError {
|
||||
#[error("node startup failed: {0}")]
|
||||
StartupFailed(String),
|
||||
#[error("publish failed: {0}")]
|
||||
PublishFailed(String),
|
||||
#[error("send channel closed")]
|
||||
ChannelClosed,
|
||||
}
|
||||
|
||||
// ── Internals ────────────────────────────────────────────────────────────────
|
||||
|
||||
struct OutboundCmd {
|
||||
message_json: String,
|
||||
reply: mpsc::SyncSender<Result<(), DeliveryError>>,
|
||||
}
|
||||
|
||||
type SubscriberList = Arc<Mutex<Vec<Sender<Vec<u8>>>>>;
|
||||
|
||||
// ── Config ───────────────────────────────────────────────────────────────────
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Config {
|
||||
pub preset: String,
|
||||
pub tcp_port: u16,
|
||||
pub log_level: String,
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
preset: "logos.dev".into(),
|
||||
tcp_port: 60000,
|
||||
log_level: "ERROR".into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Wire types ──────────────────────────────────────────────────────────────
|
||||
|
||||
/// Outbound message sent to the logos-delivery node.
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||
struct WakuMessage {
|
||||
#[serde(rename = "contentTopic")]
|
||||
content_topic: String,
|
||||
/// Base64-encoded payload.
|
||||
payload: String,
|
||||
ephemeral: bool,
|
||||
}
|
||||
|
||||
/// Top-level event envelope received from the logos-delivery node callback.
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
struct WakuEvent {
|
||||
#[serde(rename = "eventType")]
|
||||
event_type: String,
|
||||
message: Option<ReceivedMessage>,
|
||||
}
|
||||
|
||||
/// Message payload from a `message_received` event.
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
struct ReceivedMessage {
|
||||
#[serde(rename = "contentTopic")]
|
||||
content_topic: String,
|
||||
/// The node may deliver the payload as either a base64 string or a JSON
|
||||
/// array of byte values.
|
||||
payload: WakuPayload,
|
||||
}
|
||||
|
||||
/// Untagged union that handles both payload representations.
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
#[serde(untagged)]
|
||||
enum WakuPayload {
|
||||
Base64(String),
|
||||
Bytes(Vec<u8>),
|
||||
}
|
||||
|
||||
impl WakuPayload {
|
||||
fn decode(self) -> Option<Vec<u8>> {
|
||||
match self {
|
||||
WakuPayload::Base64(s) => BASE64.decode(s).ok(),
|
||||
WakuPayload::Bytes(b) => Some(b),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── EmbeddedP2pDeliveryService ──────────────────────────────────────────────────
|
||||
|
||||
/// logos-delivery backed delivery service. Cheap to clone — all clones share
|
||||
/// the same background node.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct EmbeddedP2pDeliveryService {
|
||||
outbound: mpsc::SyncSender<OutboundCmd>,
|
||||
#[allow(dead_code)]
|
||||
subscribers: SubscriberList,
|
||||
inbound_rx: Option<Receiver<Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl EmbeddedP2pDeliveryService {
|
||||
/// Start the embedded logos-delivery node. The client drains inbound
|
||||
/// payloads via [`Transport::inbound`].
|
||||
pub fn start(cfg: Config) -> Result<Self, DeliveryError> {
|
||||
let (out_tx, out_rx) = mpsc::sync_channel::<OutboundCmd>(256);
|
||||
let subscribers: SubscriberList = Arc::new(Mutex::new(Vec::new()));
|
||||
let (ready_tx, ready_rx) = mpsc::channel::<Result<(), DeliveryError>>();
|
||||
// Create the inbound channel before spawning so the receiver is
|
||||
// registered inside the thread, before any event callback fires.
|
||||
let (inbound_tx, inbound_rx) = crossbeam_channel::bounded::<Vec<u8>>(1024);
|
||||
|
||||
let subs_for_thread = subscribers.clone();
|
||||
|
||||
let handle = thread::Builder::new()
|
||||
.name("logos-node".into())
|
||||
.spawn(move || {
|
||||
if let Err(panic) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||||
Self::node_thread(cfg, out_rx, subs_for_thread, inbound_tx, ready_tx);
|
||||
})) {
|
||||
let msg = panic
|
||||
.downcast_ref::<&str>()
|
||||
.map(|s| s.to_string())
|
||||
.or_else(|| panic.downcast_ref::<String>().cloned())
|
||||
.unwrap_or_else(|| "unknown panic".into());
|
||||
error!("logos-node thread panicked: {msg}");
|
||||
}
|
||||
})
|
||||
.map_err(|e| DeliveryError::StartupFailed(e.to_string()))?;
|
||||
|
||||
// On failure, the node thread drops LogosNodeCtx (stop+destroy against
|
||||
// a half-initialized Nim node). Join it so the process doesn't begin
|
||||
// teardown mid-destroy — that race SIGSEGVs inside the Nim async loop.
|
||||
let ready = ready_rx.recv().unwrap_or_else(|_| {
|
||||
Err(DeliveryError::StartupFailed(
|
||||
"node thread exited before ready".into(),
|
||||
))
|
||||
});
|
||||
if let Err(e) = ready {
|
||||
let _ = handle.join();
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
outbound: out_tx,
|
||||
subscribers,
|
||||
inbound_rx: Some(inbound_rx),
|
||||
})
|
||||
}
|
||||
|
||||
fn node_thread(
|
||||
cfg: Config,
|
||||
out_rx: mpsc::Receiver<OutboundCmd>,
|
||||
subscribers: SubscriberList,
|
||||
inbound_tx: Sender<Vec<u8>>,
|
||||
ready_tx: mpsc::Sender<Result<(), DeliveryError>>,
|
||||
) {
|
||||
// discv5UdpPort defaults to 9000 in libwaku, so a second instance with
|
||||
// a distinct --port still collides on UDP. Bind it to tcp_port so a
|
||||
// single --port knob keeps both ports distinct across instances.
|
||||
let config_json = serde_json::json!({
|
||||
"logLevel": cfg.log_level,
|
||||
"mode": "Core",
|
||||
"preset": cfg.preset,
|
||||
"tcpPort": cfg.tcp_port,
|
||||
"discv5UdpPort": cfg.tcp_port,
|
||||
})
|
||||
.to_string();
|
||||
|
||||
let mut node = match LogosNodeCtx::new(&config_json) {
|
||||
Ok(n) => n,
|
||||
Err(e) => {
|
||||
let _ = ready_tx.send(Err(DeliveryError::StartupFailed(e)));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Register the inbound sender before installing the event callback so
|
||||
// there is no window where the callback is live but the channel is not
|
||||
// yet in the subscriber list.
|
||||
subscribers.lock().unwrap().push(inbound_tx);
|
||||
|
||||
let subs_for_cb = subscribers.clone();
|
||||
let event_closure = move |_ret: i32, data: &str| {
|
||||
if let Some(payload) = Self::parse_message_received(data) {
|
||||
let mut guard = match subs_for_cb.lock() {
|
||||
Ok(g) => g,
|
||||
Err(e) => {
|
||||
error!("subscriber mutex poisoned: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
guard.retain(|tx| match tx.try_send(payload.clone()) {
|
||||
Ok(()) => true,
|
||||
Err(crossbeam_channel::TrySendError::Full(_)) => true,
|
||||
Err(crossbeam_channel::TrySendError::Disconnected(_)) => false,
|
||||
});
|
||||
}
|
||||
};
|
||||
node.set_event_callback(event_closure);
|
||||
|
||||
if let Err(e) = node.start() {
|
||||
let _ = ready_tx.send(Err(DeliveryError::StartupFailed(e)));
|
||||
return;
|
||||
}
|
||||
info!("logos-delivery node started (preset={})", cfg.preset);
|
||||
|
||||
// FIXME: This unconditional sleep is a stand-in for proper
|
||||
// peer-connectivity detection. The right approach is to listen for a
|
||||
// `peer_connected` (or equivalent status-change) event from the node
|
||||
// callback and only proceed once at least one peer is reachable,
|
||||
// falling back to a configurable timeout. logos-delivery would need to
|
||||
// surface such an event via its callback mechanism for this to work.
|
||||
thread::sleep(Duration::from_secs(3));
|
||||
|
||||
let default_topic = content_topic_for("delivery_address");
|
||||
if let Err(e) = node.subscribe(&default_topic) {
|
||||
warn!("subscribe to {default_topic}: {e}");
|
||||
} else {
|
||||
info!("subscribed to {default_topic}");
|
||||
}
|
||||
|
||||
let _ = ready_tx.send(Ok(()));
|
||||
|
||||
while let Ok(cmd) = out_rx.recv() {
|
||||
let result = node
|
||||
.send(&cmd.message_json)
|
||||
.map(|_| ())
|
||||
.map_err(DeliveryError::PublishFailed);
|
||||
let _ = cmd.reply.try_send(result);
|
||||
}
|
||||
|
||||
info!("logos-node outbound loop finished");
|
||||
}
|
||||
|
||||
fn parse_message_received(data: &str) -> Option<Vec<u8>> {
|
||||
let event: WakuEvent = serde_json::from_str(data).ok()?;
|
||||
|
||||
if event.event_type != "message_received" {
|
||||
return None;
|
||||
}
|
||||
|
||||
let msg = event.message?;
|
||||
|
||||
if !msg.content_topic.starts_with("/logos-chat/1/") {
|
||||
return None;
|
||||
}
|
||||
|
||||
msg.payload.decode()
|
||||
}
|
||||
|
||||
pub fn inbound_queue(&mut self) -> Receiver<Vec<u8>> {
|
||||
self.inbound_rx
|
||||
.take()
|
||||
.expect("inbound_queue called more than once")
|
||||
}
|
||||
}
|
||||
|
||||
impl DeliveryService for EmbeddedP2pDeliveryService {
|
||||
type Error = DeliveryError;
|
||||
|
||||
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), DeliveryError> {
|
||||
let msg = WakuMessage {
|
||||
content_topic: content_topic_for(&envelope.delivery_address),
|
||||
payload: BASE64.encode(&envelope.data),
|
||||
ephemeral: false,
|
||||
};
|
||||
let message_json =
|
||||
serde_json::to_string(&msg).map_err(|e| DeliveryError::PublishFailed(e.to_string()))?;
|
||||
|
||||
let (reply_tx, reply_rx) = mpsc::sync_channel(1);
|
||||
self.outbound
|
||||
.send(OutboundCmd {
|
||||
message_json,
|
||||
reply: reply_tx,
|
||||
})
|
||||
.map_err(|_| DeliveryError::ChannelClosed)?;
|
||||
|
||||
reply_rx.recv().map_err(|_| DeliveryError::ChannelClosed)?
|
||||
}
|
||||
|
||||
fn subscribe(&mut self, _: &str) -> Result<(), <Self as DeliveryService>::Error> {
|
||||
// This Service does not support filtering
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
102
extensions/components/src/delivery/embedded_p2p_delivery/sys.rs
Normal file
102
extensions/components/src/delivery/embedded_p2p_delivery/sys.rs
Normal file
@ -0,0 +1,102 @@
|
||||
//! Raw FFI declarations matching liblogosdelivery.h (trampoline pattern).
|
||||
//!
|
||||
//! No `#[link]` attribute — build.rs handles linking to liblogosdelivery.
|
||||
#![allow(unused)]
|
||||
|
||||
use std::os::raw::{c_char, c_int, c_void};
|
||||
use std::slice;
|
||||
|
||||
pub const RET_OK: i32 = 0;
|
||||
|
||||
pub type FFICallBack = unsafe extern "C" fn(c_int, *const c_char, usize, *const c_void);
|
||||
|
||||
unsafe extern "C" {
|
||||
pub fn logosdelivery_create_node(
|
||||
config_json: *const c_char,
|
||||
cb: FFICallBack,
|
||||
user_data: *const c_void,
|
||||
) -> *mut c_void;
|
||||
|
||||
pub fn logosdelivery_start_node(
|
||||
ctx: *mut c_void,
|
||||
cb: FFICallBack,
|
||||
user_data: *const c_void,
|
||||
) -> c_int;
|
||||
|
||||
pub fn logosdelivery_stop_node(
|
||||
ctx: *mut c_void,
|
||||
cb: FFICallBack,
|
||||
user_data: *const c_void,
|
||||
) -> c_int;
|
||||
|
||||
pub fn logosdelivery_destroy(
|
||||
ctx: *mut c_void,
|
||||
cb: FFICallBack,
|
||||
user_data: *const c_void,
|
||||
) -> c_int;
|
||||
|
||||
pub fn logosdelivery_subscribe(
|
||||
ctx: *mut c_void,
|
||||
cb: FFICallBack,
|
||||
user_data: *const c_void,
|
||||
content_topic: *const c_char,
|
||||
) -> c_int;
|
||||
|
||||
pub fn logosdelivery_unsubscribe(
|
||||
ctx: *mut c_void,
|
||||
cb: FFICallBack,
|
||||
user_data: *const c_void,
|
||||
content_topic: *const c_char,
|
||||
) -> c_int;
|
||||
|
||||
/// `message_json`: `{"contentTopic": "...", "payload": "<base64>", "ephemeral": false}`
|
||||
pub fn logosdelivery_send(
|
||||
ctx: *mut c_void,
|
||||
cb: FFICallBack,
|
||||
user_data: *const c_void,
|
||||
message_json: *const c_char,
|
||||
) -> c_int;
|
||||
|
||||
pub fn logosdelivery_set_event_callback(
|
||||
ctx: *mut c_void,
|
||||
cb: FFICallBack,
|
||||
user_data: *const c_void,
|
||||
);
|
||||
|
||||
pub fn logosdelivery_get_node_info(
|
||||
ctx: *mut c_void,
|
||||
cb: FFICallBack,
|
||||
user_data: *const c_void,
|
||||
node_info_id: *const c_char,
|
||||
) -> c_int;
|
||||
}
|
||||
|
||||
// ── Trampoline ───────────────────────────────────────────────────────────────
|
||||
|
||||
pub unsafe extern "C" fn trampoline<C>(
|
||||
return_val: c_int,
|
||||
buffer: *const c_char,
|
||||
buffer_len: usize,
|
||||
data: *const c_void,
|
||||
) where
|
||||
C: FnMut(i32, &str),
|
||||
{
|
||||
if data.is_null() {
|
||||
return;
|
||||
}
|
||||
let closure = unsafe { &mut *(data as *mut C) };
|
||||
if buffer.is_null() || buffer_len == 0 {
|
||||
closure(return_val, "");
|
||||
return;
|
||||
}
|
||||
let bytes = unsafe { slice::from_raw_parts(buffer as *const u8, buffer_len) };
|
||||
let s = String::from_utf8_lossy(bytes);
|
||||
closure(return_val, &s);
|
||||
}
|
||||
|
||||
pub fn get_trampoline<C>(_: &C) -> FFICallBack
|
||||
where
|
||||
C: FnMut(i32, &str),
|
||||
{
|
||||
trampoline::<C>
|
||||
}
|
||||
@ -0,0 +1,227 @@
|
||||
//! Safe synchronous wrapper around the raw liblogosdelivery FFI.
|
||||
//!
|
||||
//! # Why Box::into_raw for one-shot callbacks?
|
||||
//!
|
||||
//! `sendRequestToFFIThread` (nim-ffi) signals the caller as soon as the FFI
|
||||
//! thread *receives* the request, before it processes it. The actual result
|
||||
//! callback fires later, from the Nim async event loop, after the Rust call
|
||||
//! frame has returned and its stack variables are gone. Passing `&mut closure`
|
||||
//! as `user_data` therefore produces a dangling pointer by the time the
|
||||
//! callback fires — a use-after-free that manifests as a SIGSEGV when the
|
||||
//! operation fails and the callback tries to write an error into captured
|
||||
//! stack memory.
|
||||
//!
|
||||
//! Fix: heap-allocate each one-shot closure with `Box::into_raw`, synchronise
|
||||
//! via an `mpsc` channel (blocking until the callback fires), then drop the
|
||||
//! box. The pointer is valid for the entire async lifetime of the request.
|
||||
//!
|
||||
//! # Why store the event callback inside LogosNodeCtx?
|
||||
//!
|
||||
//! Rust drops locals in reverse declaration order. If the event-callback box
|
||||
//! were held by the caller (outside the node), it would be freed before the
|
||||
//! node's Drop runs stop+destroy. During stop/destroy the Nim async event
|
||||
//! loop can still fire the event callback, which would access freed memory.
|
||||
//!
|
||||
//! By storing the box as `_event_cb` inside `LogosNodeCtx`, Rust's field-drop
|
||||
//! order guarantees it is freed *after* Drop::drop returns (i.e. after
|
||||
//! stop+destroy complete), so the pointer is always valid when Nim calls it.
|
||||
|
||||
use std::ffi::CString;
|
||||
use std::os::raw::c_void;
|
||||
use std::sync::mpsc;
|
||||
|
||||
use super::sys::{self as ffi, RET_OK, get_trampoline};
|
||||
|
||||
/// Opaque handle to a logos-delivery node context.
|
||||
pub struct LogosNodeCtx {
|
||||
ctx: *mut c_void,
|
||||
/// Keeps the event-callback closure alive for the lifetime of the node.
|
||||
_event_cb: Option<Box<dyn std::any::Any + Send>>,
|
||||
}
|
||||
|
||||
// The logos-delivery ctx pointer is thread-safe (serialized calls inside C/Nim).
|
||||
unsafe impl Send for LogosNodeCtx {}
|
||||
unsafe impl Sync for LogosNodeCtx {}
|
||||
|
||||
impl LogosNodeCtx {
|
||||
pub fn new(config_json: &str) -> Result<Self, String> {
|
||||
let config_cstr = CString::new(config_json).map_err(|e| e.to_string())?;
|
||||
|
||||
let (tx, rx) = mpsc::sync_channel::<Result<(), String>>(1);
|
||||
let closure = move |ret: i32, data: &str| {
|
||||
let _ = tx.send(if ret == RET_OK {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(data.to_string())
|
||||
});
|
||||
};
|
||||
let raw = Box::into_raw(Box::new(closure));
|
||||
let cb = get_trampoline(unsafe { &*raw });
|
||||
|
||||
let ctx = unsafe {
|
||||
ffi::logosdelivery_create_node(config_cstr.as_ptr(), cb, raw as *const c_void)
|
||||
};
|
||||
|
||||
// create_node may call the callback synchronously (try_recv) or
|
||||
// asynchronously (recv). Handle both.
|
||||
let callback_result: Result<(), String> = if ctx.is_null() {
|
||||
rx.try_recv()
|
||||
.unwrap_or(Err("logosdelivery_create_node returned null".into()))
|
||||
} else {
|
||||
rx.recv()
|
||||
.unwrap_or(Err("callback channel disconnected".into()))
|
||||
};
|
||||
drop(unsafe { Box::from_raw(raw) });
|
||||
|
||||
callback_result.map(|_| Self {
|
||||
ctx,
|
||||
_event_cb: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn start(&self) -> Result<(), String> {
|
||||
let (tx, rx) = mpsc::sync_channel::<Result<(), String>>(1);
|
||||
let closure = move |ret: i32, data: &str| {
|
||||
let _ = tx.send(if ret == RET_OK {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(data.to_string())
|
||||
});
|
||||
};
|
||||
let raw = Box::into_raw(Box::new(closure));
|
||||
let cb = get_trampoline(unsafe { &*raw });
|
||||
|
||||
let ret = unsafe { ffi::logosdelivery_start_node(self.ctx, cb, raw as *const c_void) };
|
||||
|
||||
if ret != RET_OK {
|
||||
drop(unsafe { Box::from_raw(raw) });
|
||||
return Err(format!("logosdelivery_start_node returned {ret}"));
|
||||
}
|
||||
let result = rx
|
||||
.recv()
|
||||
.unwrap_or(Err("callback channel disconnected".into()));
|
||||
drop(unsafe { Box::from_raw(raw) });
|
||||
result
|
||||
}
|
||||
|
||||
pub fn subscribe(&self, content_topic: &str) -> Result<(), String> {
|
||||
let topic_cstr = CString::new(content_topic).map_err(|e| e.to_string())?;
|
||||
|
||||
let (tx, rx) = mpsc::sync_channel::<Result<(), String>>(1);
|
||||
let closure = move |ret: i32, data: &str| {
|
||||
let _ = tx.send(if ret == RET_OK {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(data.to_string())
|
||||
});
|
||||
};
|
||||
let raw = Box::into_raw(Box::new(closure));
|
||||
let cb = get_trampoline(unsafe { &*raw });
|
||||
|
||||
let ret = unsafe {
|
||||
ffi::logosdelivery_subscribe(self.ctx, cb, raw as *const c_void, topic_cstr.as_ptr())
|
||||
};
|
||||
|
||||
if ret != RET_OK {
|
||||
drop(unsafe { Box::from_raw(raw) });
|
||||
return Err(format!("logosdelivery_subscribe returned {ret}"));
|
||||
}
|
||||
let result = rx
|
||||
.recv()
|
||||
.unwrap_or(Err("callback channel disconnected".into()));
|
||||
drop(unsafe { Box::from_raw(raw) });
|
||||
result
|
||||
}
|
||||
|
||||
/// Returns the request ID on success.
|
||||
pub fn send(&self, message_json: &str) -> Result<String, String> {
|
||||
let msg_cstr = CString::new(message_json).map_err(|e| e.to_string())?;
|
||||
|
||||
let (tx, rx) = mpsc::sync_channel::<Result<String, String>>(1);
|
||||
let closure = move |ret: i32, data: &str| {
|
||||
let _ = tx.send(if ret == RET_OK {
|
||||
Ok(data.to_string())
|
||||
} else {
|
||||
Err(data.to_string())
|
||||
});
|
||||
};
|
||||
let raw = Box::into_raw(Box::new(closure));
|
||||
let cb = get_trampoline(unsafe { &*raw });
|
||||
|
||||
let ret = unsafe {
|
||||
ffi::logosdelivery_send(self.ctx, cb, raw as *const c_void, msg_cstr.as_ptr())
|
||||
};
|
||||
|
||||
if ret != RET_OK {
|
||||
drop(unsafe { Box::from_raw(raw) });
|
||||
return Err(format!("logosdelivery_send returned {ret}"));
|
||||
}
|
||||
let result = rx
|
||||
.recv()
|
||||
.unwrap_or(Err("callback channel disconnected".into()));
|
||||
drop(unsafe { Box::from_raw(raw) });
|
||||
result
|
||||
}
|
||||
|
||||
/// Stores the event callback inside the node so it is dropped *after*
|
||||
/// stop+destroy in Drop, keeping the pointer valid for the node's lifetime.
|
||||
pub fn set_event_callback<C>(&mut self, closure: C)
|
||||
where
|
||||
C: FnMut(i32, &str) + Send + 'static,
|
||||
{
|
||||
let mut boxed = Box::new(closure);
|
||||
let cb = get_trampoline(&*boxed);
|
||||
let user_data = &mut *boxed as *mut C as *const c_void;
|
||||
unsafe {
|
||||
ffi::logosdelivery_set_event_callback(self.ctx, cb, user_data);
|
||||
}
|
||||
// Move the box into self; the heap address (user_data) is unaffected.
|
||||
self._event_cb = Some(boxed);
|
||||
}
|
||||
|
||||
pub fn stop(&self) -> Result<(), String> {
|
||||
let (tx, rx) = mpsc::sync_channel::<Result<(), String>>(1);
|
||||
let closure = move |ret: i32, data: &str| {
|
||||
let _ = tx.send(if ret == RET_OK {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(data.to_string())
|
||||
});
|
||||
};
|
||||
let raw = Box::into_raw(Box::new(closure));
|
||||
let cb = get_trampoline(unsafe { &*raw });
|
||||
|
||||
let ret = unsafe { ffi::logosdelivery_stop_node(self.ctx, cb, raw as *const c_void) };
|
||||
|
||||
if ret != RET_OK {
|
||||
drop(unsafe { Box::from_raw(raw) });
|
||||
return Err(format!("logosdelivery_stop_node returned {ret}"));
|
||||
}
|
||||
let result = rx
|
||||
.recv()
|
||||
.unwrap_or(Err("callback channel disconnected".into()));
|
||||
drop(unsafe { Box::from_raw(raw) });
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for LogosNodeCtx {
|
||||
fn drop(&mut self) {
|
||||
// stop+destroy must complete before _event_cb is freed.
|
||||
// Rust drops fields after Drop::drop returns, so _event_cb outlives
|
||||
// everything below — the event callback pointer stays valid throughout.
|
||||
if let Err(e) = self.stop() {
|
||||
tracing::warn!("logosdelivery_stop_node failed during drop: {e}");
|
||||
}
|
||||
|
||||
let (tx, rx) = mpsc::sync_channel::<()>(1);
|
||||
let closure = move |_: i32, _: &str| {
|
||||
let _ = tx.send(());
|
||||
};
|
||||
let raw = Box::into_raw(Box::new(closure));
|
||||
let cb = get_trampoline(unsafe { &*raw });
|
||||
unsafe { ffi::logosdelivery_destroy(self.ctx, cb, raw as *const c_void) };
|
||||
let _ = rx.recv();
|
||||
drop(unsafe { Box::from_raw(raw) });
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user