feat: key registry based on content topic

This commit is contained in:
kaichaosun 2026-05-26 20:43:43 +08:00
parent 279477cdeb
commit adf715139d
No known key found for this signature in database
GPG Key ID: 223E0F992F4F03BF
7 changed files with 139 additions and 53 deletions

View File

@ -5,7 +5,7 @@ use std::sync::mpsc;
use anyhow::Result;
use arboard::Clipboard;
use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService};
use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService, TopicHandler};
use serde::{Deserialize, Serialize};
use crate::utils::now;
@ -43,7 +43,9 @@ pub struct AppState {
pub struct ChatApp<D: DeliveryService> {
pub client: ChatClient<D>,
inbound: mpsc::Receiver<Vec<u8>>,
inbound: mpsc::Receiver<(String, Vec<u8>)>,
/// Handlers consulted before the chat fall-through; first match wins.
topic_handlers: Vec<Box<dyn TopicHandler>>,
pub state: AppState,
/// Ephemeral command output — not persisted, cleared on chat switch.
command_output: Vec<DisplayMessage>,
@ -56,7 +58,7 @@ pub struct ChatApp<D: DeliveryService> {
impl<D: DeliveryService + 'static> ChatApp<D> {
pub fn new(
client: ChatClient<D>,
inbound: mpsc::Receiver<Vec<u8>>,
inbound: mpsc::Receiver<(String, Vec<u8>)>,
user_name: &str,
data_dir: &Path,
) -> Result<Self> {
@ -77,6 +79,7 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
Ok(Self {
client,
inbound,
topic_handlers: Vec::new(),
state,
command_output: Vec::new(),
input: String::new(),
@ -86,6 +89,14 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
})
}
/// Register a handler consulted before the chat fall-through.
/// Handlers are tried in registration order; the first whose `matches`
/// returns `true` claims the payload.
#[allow(dead_code)]
pub fn register_topic_handler(&mut self, handler: Box<dyn TopicHandler>) {
self.topic_handlers.push(handler);
}
fn load_state(path: &Path) -> AppState {
if path.exists()
&& let Ok(contents) = fs::read_to_string(path)
@ -142,7 +153,16 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
}
pub fn process_incoming(&mut self) -> Result<()> {
while let Ok(payload) = self.inbound.try_recv() {
while let Ok((topic, payload)) = self.inbound.try_recv() {
// Offer to non-chat handlers first; chat is the catch-all.
if let Some(handler) = self
.topic_handlers
.iter_mut()
.find(|h| h.matches(&topic))
{
handler.handle(&topic, &payload);
continue;
}
match self.client.receive(&payload) {
Ok(Some(content)) => {
let chat_id = &content.conversation_id;

View File

@ -93,7 +93,7 @@ fn main() -> Result<()> {
fn run<D: DeliveryService + 'static>(
transport: D,
inbound: mpsc::Receiver<Vec<u8>>,
inbound: mpsc::Receiver<(String, Vec<u8>)>,
cli: &Cli,
) -> Result<()> {
let db_path = cli

View File

@ -24,9 +24,10 @@ impl FileTransport {
///
/// Messages are written to `{transport_dir}/{delivery_address}/{hours_since_epoch}.bin`
/// as length-prefixed frames (`[u32 BE length][payload bytes]`). The background
/// thread reads all files under `transport_dir` and forwards every frame to
/// the returned channel; `client.receive()` discards frames it cannot decrypt.
pub fn new(transport_dir: &Path) -> io::Result<(Self, mpsc::Receiver<Vec<u8>>)> {
/// thread reads all files under `transport_dir` and forwards every frame —
/// tagged with its `delivery_address` subdir — to the returned channel;
/// `client.receive()` discards frames it cannot decrypt.
pub fn new(transport_dir: &Path) -> io::Result<(Self, mpsc::Receiver<(String, Vec<u8>)>)> {
fs::create_dir_all(transport_dir)?;
let (tx, rx) = mpsc::sync_channel(1024);
@ -77,14 +78,14 @@ fn current_hour() -> u64 {
/ 3600
}
fn poll_reader(transport_dir: PathBuf, tx: mpsc::SyncSender<Vec<u8>>) {
fn poll_reader(transport_dir: PathBuf, tx: mpsc::SyncSender<(String, Vec<u8>)>) {
// Maps absolute file path → number of bytes already consumed.
let mut offsets: BTreeMap<PathBuf, u64> = BTreeMap::new();
loop {
let bin_files = collect_bin_files(&transport_dir);
for path in bin_files {
for (addr, path) in bin_files {
let offset = offsets.entry(path.clone()).or_insert(0);
let file = match File::open(&path) {
@ -106,7 +107,7 @@ fn poll_reader(transport_dir: PathBuf, tx: mpsc::SyncSender<Vec<u8>>) {
if reader.read_exact(&mut payload).is_err() {
break; // partial frame — wait for writer to finish
}
let _ = tx.try_send(payload);
let _ = tx.try_send((addr.clone(), payload));
*offset += (4 + len) as u64;
}
}
@ -115,9 +116,10 @@ fn poll_reader(transport_dir: PathBuf, tx: mpsc::SyncSender<Vec<u8>>) {
}
}
/// Walk `transport_dir/*/` and collect all `*.bin` files, sorted by path
/// Walk `transport_dir/*/` and collect all `*.bin` files paired with their
/// `delivery_address` (the subdirectory name), sorted by path
/// (address subdir first, then filename = hour order).
fn collect_bin_files(transport_dir: &Path) -> Vec<PathBuf> {
fn collect_bin_files(transport_dir: &Path) -> Vec<(String, PathBuf)> {
let mut files = Vec::new();
let Ok(addr_entries) = fs::read_dir(transport_dir) else {
return files;
@ -127,13 +129,17 @@ fn collect_bin_files(transport_dir: &Path) -> Vec<PathBuf> {
if !addr_path.is_dir() {
continue;
}
let Some(addr) = addr_path.file_name().and_then(|s| s.to_str()) else {
continue;
};
let addr = addr.to_string();
let Ok(file_entries) = fs::read_dir(&addr_path) else {
continue;
};
for file_entry in file_entries.flatten() {
let p = file_entry.path();
if p.extension().is_some_and(|e| e == "bin") {
files.push(p);
files.push((addr.clone(), p));
}
}
}

View File

@ -7,7 +7,8 @@
//! ## Content topic mapping
//!
//! `AddressedEnvelope::delivery_address` maps to logos-delivery content topic
//! `/logos-chat/1/{delivery_address}/proto`.
//! `/logos-chat/1/{delivery_address}/proto`. Inbound payloads are tagged with
//! the recovered `delivery_address` so callers can dispatch by topic.
pub(crate) mod sys;
pub(crate) mod wrapper;
@ -27,6 +28,16 @@ pub fn content_topic_for(delivery_address: &str) -> String {
format!("/logos-chat/1/{delivery_address}/proto")
}
const CONTENT_TOPIC_PREFIX: &str = "/logos-chat/1/";
const CONTENT_TOPIC_SUFFIX: &str = "/proto";
fn delivery_address_from_topic(content_topic: &str) -> Option<String> {
content_topic
.strip_prefix(CONTENT_TOPIC_PREFIX)?
.strip_suffix(CONTENT_TOPIC_SUFFIX)
.map(str::to_owned)
}
// ── Error ────────────────────────────────────────────────────────────────────
#[derive(Debug, thiserror::Error)]
@ -35,18 +46,29 @@ pub enum DeliveryError {
StartupFailed(String),
#[error("publish failed: {0}")]
PublishFailed(String),
#[error("subscribe failed: {0}")]
SubscribeFailed(String),
#[error("send channel closed")]
ChannelClosed,
}
// ── Internals ────────────────────────────────────────────────────────────────
struct OutboundCmd {
message_json: String,
reply: mpsc::SyncSender<Result<(), DeliveryError>>,
enum NodeCmd {
Publish {
message_json: String,
reply: mpsc::SyncSender<Result<(), DeliveryError>>,
},
Subscribe {
content_topic: String,
reply: mpsc::SyncSender<Result<(), DeliveryError>>,
},
}
type SubscriberList = Arc<Mutex<Vec<mpsc::SyncSender<Vec<u8>>>>>;
/// Inbound payloads carry the recovered delivery_address alongside bytes so
/// callers can route by topic before decoding.
type InboundTx = mpsc::SyncSender<(String, Vec<u8>)>;
type SubscriberList = Arc<Mutex<Vec<InboundTx>>>;
// ── Config ───────────────────────────────────────────────────────────────────
@ -120,21 +142,21 @@ impl WakuPayload {
/// the same background node.
#[derive(Clone, Debug)]
pub struct Service {
outbound: mpsc::SyncSender<OutboundCmd>,
outbound: mpsc::SyncSender<NodeCmd>,
#[allow(dead_code)]
subscribers: SubscriberList,
}
impl Service {
/// Start the embedded logos-delivery node. Returns the service and a
/// receiver for inbound raw payloads.
pub fn start(cfg: Config) -> Result<(Self, mpsc::Receiver<Vec<u8>>), DeliveryError> {
let (out_tx, out_rx) = mpsc::sync_channel::<OutboundCmd>(256);
/// receiver for inbound `(delivery_address, payload)` pairs.
pub fn start(cfg: Config) -> Result<(Self, mpsc::Receiver<(String, Vec<u8>)>), DeliveryError> {
let (out_tx, out_rx) = mpsc::sync_channel::<NodeCmd>(256);
let subscribers: SubscriberList = Arc::new(Mutex::new(Vec::new()));
let (ready_tx, ready_rx) = mpsc::channel::<Result<(), DeliveryError>>();
// Create the inbound channel before spawning so the receiver is
// registered inside the thread, before any event callback fires.
let (inbound_tx, inbound_rx) = mpsc::sync_channel::<Vec<u8>>(1024);
let (inbound_tx, inbound_rx) = mpsc::sync_channel::<(String, Vec<u8>)>(1024);
let subs_for_thread = subscribers.clone();
@ -178,9 +200,9 @@ impl Service {
fn node_thread(
cfg: Config,
out_rx: mpsc::Receiver<OutboundCmd>,
out_rx: mpsc::Receiver<NodeCmd>,
subscribers: SubscriberList,
inbound_tx: mpsc::SyncSender<Vec<u8>>,
inbound_tx: InboundTx,
ready_tx: mpsc::Sender<Result<(), DeliveryError>>,
) {
// discv5UdpPort defaults to 9000 in libwaku, so a second instance with
@ -210,7 +232,7 @@ impl Service {
let subs_for_cb = subscribers.clone();
let event_closure = move |_ret: i32, data: &str| {
if let Some(payload) = Self::parse_message_received(data) {
if let Some(tagged) = Self::parse_message_received(data) {
let mut guard = match subs_for_cb.lock() {
Ok(g) => g,
Err(e) => {
@ -218,7 +240,7 @@ impl Service {
return;
}
};
guard.retain(|tx| match tx.try_send(payload.clone()) {
guard.retain(|tx| match tx.try_send(tagged.clone()) {
Ok(()) => true,
Err(mpsc::TrySendError::Full(_)) => true,
Err(mpsc::TrySendError::Disconnected(_)) => false,
@ -241,27 +263,41 @@ impl Service {
// surface such an event via its callback mechanism for this to work.
thread::sleep(Duration::from_secs(3));
let default_topic = content_topic_for("delivery_address");
if let Err(e) = node.subscribe(&default_topic) {
warn!("subscribe to {default_topic}: {e}");
} else {
info!("subscribed to {default_topic}");
}
let _ = ready_tx.send(Ok(()));
while let Ok(cmd) = out_rx.recv() {
let result = node
.send(&cmd.message_json)
.map(|_| ())
.map_err(DeliveryError::PublishFailed);
let _ = cmd.reply.try_send(result);
match cmd {
NodeCmd::Publish {
message_json,
reply,
} => {
let result = node
.send(&message_json)
.map(|_| ())
.map_err(DeliveryError::PublishFailed);
let _ = reply.try_send(result);
}
NodeCmd::Subscribe {
content_topic,
reply,
} => {
let result = node
.subscribe(&content_topic)
.map_err(DeliveryError::SubscribeFailed);
if let Err(ref e) = result {
warn!("subscribe to {content_topic}: {e}");
} else {
info!("subscribed to {content_topic}");
}
let _ = reply.try_send(result);
}
}
}
info!("logos-node outbound loop finished");
}
fn parse_message_received(data: &str) -> Option<Vec<u8>> {
fn parse_message_received(data: &str) -> Option<(String, Vec<u8>)> {
let event: WakuEvent = serde_json::from_str(data).ok()?;
if event.event_type != "message_received" {
@ -269,12 +305,9 @@ impl Service {
}
let msg = event.message?;
if !msg.content_topic.starts_with("/logos-chat/1/") {
return None;
}
msg.payload.decode()
let addr = delivery_address_from_topic(&msg.content_topic)?;
let payload = msg.payload.decode()?;
Some((addr, payload))
}
}
@ -292,7 +325,7 @@ impl DeliveryService for Service {
let (reply_tx, reply_rx) = mpsc::sync_channel(1);
self.outbound
.send(OutboundCmd {
.send(NodeCmd::Publish {
message_json,
reply: reply_tx,
})
@ -301,8 +334,16 @@ impl DeliveryService for Service {
reply_rx.recv().map_err(|_| DeliveryError::ChannelClosed)?
}
fn subscribe(&mut self, _: &str) -> Result<(), <Self as DeliveryService>::Error> {
// This Service does not support filtering
Ok(())
fn subscribe(&mut self, delivery_address: &str) -> Result<(), DeliveryError> {
let content_topic = content_topic_for(delivery_address);
let (reply_tx, reply_rx) = mpsc::sync_channel(1);
self.outbound
.send(NodeCmd::Subscribe {
content_topic,
reply: reply_tx,
})
.map_err(|_| DeliveryError::ChannelClosed)?;
reply_rx.recv().map_err(|_| DeliveryError::ChannelClosed)?
}
}

View File

@ -1,5 +1,3 @@
#[cfg(feature = "dev")]
mod account;
#[cfg(feature = "dev")]
pub use account::TestLogosAccount;

View File

@ -1,10 +1,12 @@
mod client;
mod delivery_in_process;
mod errors;
mod topic_handler;
pub use client::ChatClient;
pub use delivery_in_process::{Cursor, InProcessDelivery, MessageBus};
pub use errors::ClientError;
pub use topic_handler::TopicHandler;
// Re-export types callers need to interact with ChatClient
pub use libchat::{

View File

@ -0,0 +1,19 @@
/// Routes inbound transport payloads to per-topic handlers.
///
/// Transports (`DeliveryService` implementations) deliver `(delivery_address,
/// payload)` pairs into an mpsc receiver. The receive loop walks a list of
/// handlers and offers each payload to the first one whose `matches` returns
/// `true`. The chat path (`ChatClient::receive`) is the default fall-through
/// when no handler claims a topic.
///
/// Handlers own their own state and side-effects: results are surfaced via
/// channels or fields the handler controls, not through the trait. This keeps
/// the trait narrow enough for new topic-scoped services (key-package registry,
/// presence, broadcasts) to plug in without coupling to chat-specific types.
pub trait TopicHandler {
/// Returns `true` if this handler should process `delivery_address`.
fn matches(&self, delivery_address: &str) -> bool;
/// Process a payload addressed to a topic this handler matched.
fn handle(&mut self, delivery_address: &str, payload: &[u8]);
}