diff --git a/bin/chat-cli/src/app.rs b/bin/chat-cli/src/app.rs index de9e47b..23b39a9 100644 --- a/bin/chat-cli/src/app.rs +++ b/bin/chat-cli/src/app.rs @@ -5,7 +5,7 @@ use std::sync::mpsc; use anyhow::Result; use arboard::Clipboard; -use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService}; +use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService, TopicHandler}; use serde::{Deserialize, Serialize}; use crate::utils::now; @@ -43,7 +43,9 @@ pub struct AppState { pub struct ChatApp { pub client: ChatClient, - inbound: mpsc::Receiver>, + inbound: mpsc::Receiver<(String, Vec)>, + /// Handlers consulted before the chat fall-through; first match wins. + topic_handlers: Vec>, pub state: AppState, /// Ephemeral command output — not persisted, cleared on chat switch. command_output: Vec, @@ -56,7 +58,7 @@ pub struct ChatApp { impl ChatApp { pub fn new( client: ChatClient, - inbound: mpsc::Receiver>, + inbound: mpsc::Receiver<(String, Vec)>, user_name: &str, data_dir: &Path, ) -> Result { @@ -77,6 +79,7 @@ impl ChatApp { Ok(Self { client, inbound, + topic_handlers: Vec::new(), state, command_output: Vec::new(), input: String::new(), @@ -86,6 +89,14 @@ impl ChatApp { }) } + /// Register a handler consulted before the chat fall-through. + /// Handlers are tried in registration order; the first whose `matches` + /// returns `true` claims the payload. + #[allow(dead_code)] + pub fn register_topic_handler(&mut self, handler: Box) { + self.topic_handlers.push(handler); + } + fn load_state(path: &Path) -> AppState { if path.exists() && let Ok(contents) = fs::read_to_string(path) @@ -142,7 +153,16 @@ impl ChatApp { } pub fn process_incoming(&mut self) -> Result<()> { - while let Ok(payload) = self.inbound.try_recv() { + while let Ok((topic, payload)) = self.inbound.try_recv() { + // Offer to non-chat handlers first; chat is the catch-all. + if let Some(handler) = self + .topic_handlers + .iter_mut() + .find(|h| h.matches(&topic)) + { + handler.handle(&topic, &payload); + continue; + } match self.client.receive(&payload) { Ok(Some(content)) => { let chat_id = &content.conversation_id; diff --git a/bin/chat-cli/src/main.rs b/bin/chat-cli/src/main.rs index 366b100..15c9209 100644 --- a/bin/chat-cli/src/main.rs +++ b/bin/chat-cli/src/main.rs @@ -93,7 +93,7 @@ fn main() -> Result<()> { fn run( transport: D, - inbound: mpsc::Receiver>, + inbound: mpsc::Receiver<(String, Vec)>, cli: &Cli, ) -> Result<()> { let db_path = cli diff --git a/bin/chat-cli/src/transport/file.rs b/bin/chat-cli/src/transport/file.rs index 097af47..5f35cc5 100644 --- a/bin/chat-cli/src/transport/file.rs +++ b/bin/chat-cli/src/transport/file.rs @@ -24,9 +24,10 @@ impl FileTransport { /// /// Messages are written to `{transport_dir}/{delivery_address}/{hours_since_epoch}.bin` /// as length-prefixed frames (`[u32 BE length][payload bytes]`). The background - /// thread reads all files under `transport_dir` and forwards every frame to - /// the returned channel; `client.receive()` discards frames it cannot decrypt. - pub fn new(transport_dir: &Path) -> io::Result<(Self, mpsc::Receiver>)> { + /// thread reads all files under `transport_dir` and forwards every frame — + /// tagged with its `delivery_address` subdir — to the returned channel; + /// `client.receive()` discards frames it cannot decrypt. + pub fn new(transport_dir: &Path) -> io::Result<(Self, mpsc::Receiver<(String, Vec)>)> { fs::create_dir_all(transport_dir)?; let (tx, rx) = mpsc::sync_channel(1024); @@ -77,14 +78,14 @@ fn current_hour() -> u64 { / 3600 } -fn poll_reader(transport_dir: PathBuf, tx: mpsc::SyncSender>) { +fn poll_reader(transport_dir: PathBuf, tx: mpsc::SyncSender<(String, Vec)>) { // Maps absolute file path → number of bytes already consumed. let mut offsets: BTreeMap = BTreeMap::new(); loop { let bin_files = collect_bin_files(&transport_dir); - for path in bin_files { + for (addr, path) in bin_files { let offset = offsets.entry(path.clone()).or_insert(0); let file = match File::open(&path) { @@ -106,7 +107,7 @@ fn poll_reader(transport_dir: PathBuf, tx: mpsc::SyncSender>) { if reader.read_exact(&mut payload).is_err() { break; // partial frame — wait for writer to finish } - let _ = tx.try_send(payload); + let _ = tx.try_send((addr.clone(), payload)); *offset += (4 + len) as u64; } } @@ -115,9 +116,10 @@ fn poll_reader(transport_dir: PathBuf, tx: mpsc::SyncSender>) { } } -/// Walk `transport_dir/*/` and collect all `*.bin` files, sorted by path +/// Walk `transport_dir/*/` and collect all `*.bin` files paired with their +/// `delivery_address` (the subdirectory name), sorted by path /// (address subdir first, then filename = hour order). -fn collect_bin_files(transport_dir: &Path) -> Vec { +fn collect_bin_files(transport_dir: &Path) -> Vec<(String, PathBuf)> { let mut files = Vec::new(); let Ok(addr_entries) = fs::read_dir(transport_dir) else { return files; @@ -127,13 +129,17 @@ fn collect_bin_files(transport_dir: &Path) -> Vec { if !addr_path.is_dir() { continue; } + let Some(addr) = addr_path.file_name().and_then(|s| s.to_str()) else { + continue; + }; + let addr = addr.to_string(); let Ok(file_entries) = fs::read_dir(&addr_path) else { continue; }; for file_entry in file_entries.flatten() { let p = file_entry.path(); if p.extension().is_some_and(|e| e == "bin") { - files.push(p); + files.push((addr.clone(), p)); } } } diff --git a/bin/chat-cli/src/transport/logos_delivery.rs b/bin/chat-cli/src/transport/logos_delivery.rs index 1b6dce4..1cd7b2b 100644 --- a/bin/chat-cli/src/transport/logos_delivery.rs +++ b/bin/chat-cli/src/transport/logos_delivery.rs @@ -7,7 +7,8 @@ //! ## Content topic mapping //! //! `AddressedEnvelope::delivery_address` maps to logos-delivery content topic -//! `/logos-chat/1/{delivery_address}/proto`. +//! `/logos-chat/1/{delivery_address}/proto`. Inbound payloads are tagged with +//! the recovered `delivery_address` so callers can dispatch by topic. pub(crate) mod sys; pub(crate) mod wrapper; @@ -27,6 +28,16 @@ pub fn content_topic_for(delivery_address: &str) -> String { format!("/logos-chat/1/{delivery_address}/proto") } +const CONTENT_TOPIC_PREFIX: &str = "/logos-chat/1/"; +const CONTENT_TOPIC_SUFFIX: &str = "/proto"; + +fn delivery_address_from_topic(content_topic: &str) -> Option { + content_topic + .strip_prefix(CONTENT_TOPIC_PREFIX)? + .strip_suffix(CONTENT_TOPIC_SUFFIX) + .map(str::to_owned) +} + // ── Error ──────────────────────────────────────────────────────────────────── #[derive(Debug, thiserror::Error)] @@ -35,18 +46,29 @@ pub enum DeliveryError { StartupFailed(String), #[error("publish failed: {0}")] PublishFailed(String), + #[error("subscribe failed: {0}")] + SubscribeFailed(String), #[error("send channel closed")] ChannelClosed, } // ── Internals ──────────────────────────────────────────────────────────────── -struct OutboundCmd { - message_json: String, - reply: mpsc::SyncSender>, +enum NodeCmd { + Publish { + message_json: String, + reply: mpsc::SyncSender>, + }, + Subscribe { + content_topic: String, + reply: mpsc::SyncSender>, + }, } -type SubscriberList = Arc>>>>; +/// Inbound payloads carry the recovered delivery_address alongside bytes so +/// callers can route by topic before decoding. +type InboundTx = mpsc::SyncSender<(String, Vec)>; +type SubscriberList = Arc>>; // ── Config ─────────────────────────────────────────────────────────────────── @@ -120,21 +142,21 @@ impl WakuPayload { /// the same background node. #[derive(Clone, Debug)] pub struct Service { - outbound: mpsc::SyncSender, + outbound: mpsc::SyncSender, #[allow(dead_code)] subscribers: SubscriberList, } impl Service { /// Start the embedded logos-delivery node. Returns the service and a - /// receiver for inbound raw payloads. - pub fn start(cfg: Config) -> Result<(Self, mpsc::Receiver>), DeliveryError> { - let (out_tx, out_rx) = mpsc::sync_channel::(256); + /// receiver for inbound `(delivery_address, payload)` pairs. + pub fn start(cfg: Config) -> Result<(Self, mpsc::Receiver<(String, Vec)>), DeliveryError> { + let (out_tx, out_rx) = mpsc::sync_channel::(256); let subscribers: SubscriberList = Arc::new(Mutex::new(Vec::new())); let (ready_tx, ready_rx) = mpsc::channel::>(); // Create the inbound channel before spawning so the receiver is // registered inside the thread, before any event callback fires. - let (inbound_tx, inbound_rx) = mpsc::sync_channel::>(1024); + let (inbound_tx, inbound_rx) = mpsc::sync_channel::<(String, Vec)>(1024); let subs_for_thread = subscribers.clone(); @@ -178,9 +200,9 @@ impl Service { fn node_thread( cfg: Config, - out_rx: mpsc::Receiver, + out_rx: mpsc::Receiver, subscribers: SubscriberList, - inbound_tx: mpsc::SyncSender>, + inbound_tx: InboundTx, ready_tx: mpsc::Sender>, ) { // discv5UdpPort defaults to 9000 in libwaku, so a second instance with @@ -210,7 +232,7 @@ impl Service { let subs_for_cb = subscribers.clone(); let event_closure = move |_ret: i32, data: &str| { - if let Some(payload) = Self::parse_message_received(data) { + if let Some(tagged) = Self::parse_message_received(data) { let mut guard = match subs_for_cb.lock() { Ok(g) => g, Err(e) => { @@ -218,7 +240,7 @@ impl Service { return; } }; - guard.retain(|tx| match tx.try_send(payload.clone()) { + guard.retain(|tx| match tx.try_send(tagged.clone()) { Ok(()) => true, Err(mpsc::TrySendError::Full(_)) => true, Err(mpsc::TrySendError::Disconnected(_)) => false, @@ -241,27 +263,41 @@ impl Service { // surface such an event via its callback mechanism for this to work. thread::sleep(Duration::from_secs(3)); - let default_topic = content_topic_for("delivery_address"); - if let Err(e) = node.subscribe(&default_topic) { - warn!("subscribe to {default_topic}: {e}"); - } else { - info!("subscribed to {default_topic}"); - } - let _ = ready_tx.send(Ok(())); while let Ok(cmd) = out_rx.recv() { - let result = node - .send(&cmd.message_json) - .map(|_| ()) - .map_err(DeliveryError::PublishFailed); - let _ = cmd.reply.try_send(result); + match cmd { + NodeCmd::Publish { + message_json, + reply, + } => { + let result = node + .send(&message_json) + .map(|_| ()) + .map_err(DeliveryError::PublishFailed); + let _ = reply.try_send(result); + } + NodeCmd::Subscribe { + content_topic, + reply, + } => { + let result = node + .subscribe(&content_topic) + .map_err(DeliveryError::SubscribeFailed); + if let Err(ref e) = result { + warn!("subscribe to {content_topic}: {e}"); + } else { + info!("subscribed to {content_topic}"); + } + let _ = reply.try_send(result); + } + } } info!("logos-node outbound loop finished"); } - fn parse_message_received(data: &str) -> Option> { + fn parse_message_received(data: &str) -> Option<(String, Vec)> { let event: WakuEvent = serde_json::from_str(data).ok()?; if event.event_type != "message_received" { @@ -269,12 +305,9 @@ impl Service { } let msg = event.message?; - - if !msg.content_topic.starts_with("/logos-chat/1/") { - return None; - } - - msg.payload.decode() + let addr = delivery_address_from_topic(&msg.content_topic)?; + let payload = msg.payload.decode()?; + Some((addr, payload)) } } @@ -292,7 +325,7 @@ impl DeliveryService for Service { let (reply_tx, reply_rx) = mpsc::sync_channel(1); self.outbound - .send(OutboundCmd { + .send(NodeCmd::Publish { message_json, reply: reply_tx, }) @@ -301,8 +334,16 @@ impl DeliveryService for Service { reply_rx.recv().map_err(|_| DeliveryError::ChannelClosed)? } - fn subscribe(&mut self, _: &str) -> Result<(), ::Error> { - // This Service does not support filtering - Ok(()) + fn subscribe(&mut self, delivery_address: &str) -> Result<(), DeliveryError> { + let content_topic = content_topic_for(delivery_address); + let (reply_tx, reply_rx) = mpsc::sync_channel(1); + self.outbound + .send(NodeCmd::Subscribe { + content_topic, + reply: reply_tx, + }) + .map_err(|_| DeliveryError::ChannelClosed)?; + + reply_rx.recv().map_err(|_| DeliveryError::ChannelClosed)? } } diff --git a/core/account/src/lib.rs b/core/account/src/lib.rs index c33c296..39a3889 100644 --- a/core/account/src/lib.rs +++ b/core/account/src/lib.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "dev")] mod account; -#[cfg(feature = "dev")] pub use account::TestLogosAccount; diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index a0cac6f..54d18d8 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -1,10 +1,12 @@ mod client; mod delivery_in_process; mod errors; +mod topic_handler; pub use client::ChatClient; pub use delivery_in_process::{Cursor, InProcessDelivery, MessageBus}; pub use errors::ClientError; +pub use topic_handler::TopicHandler; // Re-export types callers need to interact with ChatClient pub use libchat::{ diff --git a/crates/client/src/topic_handler.rs b/crates/client/src/topic_handler.rs new file mode 100644 index 0000000..b58a3a3 --- /dev/null +++ b/crates/client/src/topic_handler.rs @@ -0,0 +1,19 @@ +/// Routes inbound transport payloads to per-topic handlers. +/// +/// Transports (`DeliveryService` implementations) deliver `(delivery_address, +/// payload)` pairs into an mpsc receiver. The receive loop walks a list of +/// handlers and offers each payload to the first one whose `matches` returns +/// `true`. The chat path (`ChatClient::receive`) is the default fall-through +/// when no handler claims a topic. +/// +/// Handlers own their own state and side-effects: results are surfaced via +/// channels or fields the handler controls, not through the trait. This keeps +/// the trait narrow enough for new topic-scoped services (key-package registry, +/// presence, broadcasts) to plug in without coupling to chat-specific types. +pub trait TopicHandler { + /// Returns `true` if this handler should process `delivery_address`. + fn matches(&self, delivery_address: &str) -> bool; + + /// Process a payload addressed to a topic this handler matched. + fn handle(&mut self, delivery_address: &str, payload: &[u8]); +}