From a5abefa31405531f87ddb58dda45946d0177ad9e Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Tue, 23 Jun 2026 12:02:01 -0700 Subject: [PATCH] ChatClient migration (#145) * Simplify client * Fixups * Update Cli-Client to use builder * undeprecate legacy convos * Allow Storage config in builder * bug fixes * Clippy fix * fixes --- Cargo.lock | 1 + bin/chat-cli/src/app.rs | 18 +- bin/chat-cli/src/main.rs | 55 +++-- bin/chat-cli/src/ui.rs | 71 +++--- .../src/conversation/group_v1.rs | 91 ++++---- core/conversations/src/lib.rs | 2 +- crates/client/Cargo.toml | 1 + .../client/examples/message-exchange/main.rs | 21 +- crates/client/src/builder.rs | 215 ++++++++++++++++++ crates/client/src/client.rs | 137 ++++------- crates/client/src/lib.rs | 6 +- crates/client/tests/saro_and_raya.rs | 136 ++++++----- 12 files changed, 491 insertions(+), 263 deletions(-) create mode 100644 crates/client/src/builder.rs diff --git a/Cargo.lock b/Cargo.lock index ba9d9df..a28de64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3605,6 +3605,7 @@ dependencies = [ "logos-account", "parking_lot", "shared-traits", + "storage", "tempfile", "thiserror", "tracing", diff --git a/bin/chat-cli/src/app.rs b/bin/chat-cli/src/app.rs index 239ee7a..534515a 100644 --- a/bin/chat-cli/src/app.rs +++ b/bin/chat-cli/src/app.rs @@ -5,7 +5,7 @@ use std::path::{Path, PathBuf}; use anyhow::Result; use arboard::Clipboard; use crossbeam_channel::Receiver; -use logos_chat::{ChatClient, EphemeralRegistry, Event, RegistrationService, Transport}; +use logos_chat::{ChatClient, ChatStore, Event, IdentityProvider, RegistrationService, Transport}; use serde::{Deserialize, Serialize}; use crate::utils::now; @@ -41,8 +41,14 @@ pub struct AppState { pub active_chat: Option, } -pub struct ChatApp { - pub client: ChatClient, +pub struct ChatApp +where + I: IdentityProvider + Send + 'static, + T: Transport, + R: RegistrationService + Send + 'static, + S: ChatStore + Send + 'static, +{ + pub client: ChatClient, events: Receiver, pub state: AppState, /// Ephemeral command output — not persisted, cleared on chat switch. @@ -53,13 +59,15 @@ pub struct ChatApp { state_path: PathBuf, } -impl ChatApp +impl ChatApp where + I: IdentityProvider + Send, T: Transport, R: RegistrationService + Send + 'static, + S: ChatStore + Send, { pub fn new( - client: ChatClient, + client: ChatClient, events: Receiver, user_name: &str, data_dir: &Path, diff --git a/bin/chat-cli/src/main.rs b/bin/chat-cli/src/main.rs index 842faf4..9a35b45 100644 --- a/bin/chat-cli/src/main.rs +++ b/bin/chat-cli/src/main.rs @@ -8,7 +8,10 @@ use std::path::{Path, PathBuf}; use anyhow::{Context, Result}; use clap::{Parser, ValueEnum}; use crossbeam_channel::Receiver; -use logos_chat::{ChatClient, Event, HttpRegistry, RegistrationService, StorageConfig, Transport}; +use logos_chat::{ + ChatClient, ChatClientBuilder, ChatStore, Event, HttpRegistry, IdentityProvider, + RegistrationService, StorageConfig, Transport, +}; use app::ChatApp; @@ -113,14 +116,20 @@ fn run(transport: T, cli: &Cli) -> Result<()> { match cli.registry_url.as_deref() { Some(url) => { let registry = HttpRegistry::new(url); - let (client, events) = - ChatClient::open_with_registry(cli.name.clone(), storage, transport, registry) - .map_err(|e| anyhow::anyhow!("{e:?}")) - .context("failed to open chat client with HTTP registry")?; + let (client, events) = ChatClientBuilder::new() + .transport(transport) + .storage_config(storage) + .registration(registry) + .build() + .map_err(|e| anyhow::anyhow!("{e:?}")) + .context("failed to open chat client with HTTP registry")?; launch_tui(client, events, cli) } None => { - let (client, events) = ChatClient::open(cli.name.clone(), storage, transport) + let (client, events) = ChatClientBuilder::new() + .transport(transport) + .storage_config(storage) + .build() .map_err(|e| anyhow::anyhow!("{e:?}")) .context("failed to open chat client")?; launch_tui(client, events, cli) @@ -128,10 +137,16 @@ fn run(transport: T, cli: &Cli) -> Result<()> { } } -fn launch_tui(client: ChatClient, events: Receiver, cli: &Cli) -> Result<()> +fn launch_tui( + client: ChatClient, + events: Receiver, + cli: &Cli, +) -> Result<()> where + I: IdentityProvider + Send, T: Transport, R: RegistrationService + Send + 'static, + S: ChatStore + Send, { let mut app = ChatApp::new(client, events, &cli.name, &cli.data)?; @@ -176,18 +191,22 @@ fn run_logos_delivery(cli: Cli) -> Result<()> { .to_str() .context("db path contains non-UTF-8 characters")? .to_string(); - logos_chat::ChatClient::open( - cli.name.clone(), - logos_chat::StorageConfig::Encrypted { + + logos_chat::ChatClientBuilder::new() + .storage_config(logos_chat::StorageConfig::Encrypted { path: db_str, key: "chat-cli".to_string(), - }, - delivery, - ) - .map_err(|e| anyhow::anyhow!("{e:?}")) - .context("failed to open persistent client")? + }) + .transport(delivery) + .build() + .map_err(|e| anyhow::anyhow!("{e:?}")) + .context("failed to open persistent client")? } - None => logos_chat::ChatClient::new(cli.name.clone(), delivery), + None => logos_chat::ChatClientBuilder::new() + .transport(delivery) + .build() + .map_err(|e| anyhow::anyhow!("{e:?}")) + .context("failed to open chat client")?, }; let mut app = ChatApp::new(client, events, &cli.name, &data_dir)?; @@ -209,10 +228,12 @@ fn run_logos_delivery(cli: Cli) -> Result<()> { ) } -fn run_app(terminal: &mut ui::Tui, app: &mut ChatApp) -> Result<()> +fn run_app(terminal: &mut ui::Tui, app: &mut ChatApp) -> Result<()> where + I: IdentityProvider + Send, T: Transport, R: RegistrationService + Send + 'static, + S: ChatStore + Send, { loop { app.process_incoming()?; diff --git a/bin/chat-cli/src/ui.rs b/bin/chat-cli/src/ui.rs index e1b9813..7dd2241 100644 --- a/bin/chat-cli/src/ui.rs +++ b/bin/chat-cli/src/ui.rs @@ -16,7 +16,7 @@ use ratatui::{ widgets::{Block, Borders, List, ListItem, Paragraph, Wrap}, }; -use logos_chat::{RegistrationService, Transport}; +use logos_chat::{ChatStore, IdentityProvider, RegistrationService, Transport}; use crate::app::ChatApp; @@ -38,10 +38,13 @@ pub fn restore() -> io::Result<()> { } /// Draw the UI. -pub fn draw( - frame: &mut Frame, - app: &ChatApp, -) { +pub fn draw(frame: &mut Frame, app: &ChatApp) +where + I: IdentityProvider + Send + 'static, + D: Transport + Send + 'static, + R: RegistrationService + Send + 'static, + S: ChatStore + Send + 'static, +{ let chunks = Layout::default() .direction(Direction::Vertical) .constraints([ @@ -58,11 +61,13 @@ pub fn draw( draw_status(frame, app, chunks[3]); } -fn draw_header( - frame: &mut Frame, - app: &ChatApp, - area: Rect, -) { +fn draw_header(frame: &mut Frame, app: &ChatApp, area: Rect) +where + I: IdentityProvider + Send + 'static, + D: Transport + Send + 'static, + R: RegistrationService + Send + 'static, + S: ChatStore + Send + 'static, +{ let title = match app.current_session() { Some(session) => { let id = &session.chat_id[..8.min(session.chat_id.len())]; @@ -85,11 +90,13 @@ fn draw_header( frame.render_widget(header, area); } -fn draw_messages( - frame: &mut Frame, - app: &ChatApp, - area: Rect, -) { +fn draw_messages(frame: &mut Frame, app: &ChatApp, area: Rect) +where + I: IdentityProvider + Send + 'static, + D: Transport + Send + 'static, + R: RegistrationService + Send + 'static, + S: ChatStore + Send + 'static, +{ let remote_name = app .current_session() .map(|s| s.display_name()) @@ -175,11 +182,13 @@ fn draw_messages( frame.render_stateful_widget(messages_widget, area, &mut list_state); } -fn draw_input( - frame: &mut Frame, - app: &ChatApp, - area: Rect, -) { +fn draw_input(frame: &mut Frame, app: &ChatApp, area: Rect) +where + I: IdentityProvider + Send + 'static, + D: Transport + Send + 'static, + R: RegistrationService + Send + 'static, + S: ChatStore + Send + 'static, +{ // Inner width: area minus borders (2). let inner_width = area.width.saturating_sub(2) as usize; let input_len = app.input.len(); @@ -206,11 +215,13 @@ fn draw_input( frame.set_cursor_position((cursor_x, area.y + 1)); } -fn draw_status( - frame: &mut Frame, - app: &ChatApp, - area: Rect, -) { +fn draw_status(frame: &mut Frame, app: &ChatApp, area: Rect) +where + I: IdentityProvider + Send + 'static, + D: Transport + Send + 'static, + R: RegistrationService + Send + 'static, + S: ChatStore + Send + 'static, +{ let status = Paragraph::new(app.status.as_str()) .style(Style::default().fg(Color::Gray)) .block(Block::default().title(" Status ").borders(Borders::ALL)) @@ -220,9 +231,13 @@ fn draw_status( } /// Handle keyboard events. -pub fn handle_events( - app: &mut ChatApp, -) -> io::Result { +pub fn handle_events(app: &mut ChatApp) -> io::Result +where + I: IdentityProvider + Send + 'static, + D: Transport + Send + 'static, + R: RegistrationService + Send + 'static, + S: ChatStore + Send + 'static, +{ // Poll for events with a short timeout to allow checking incoming messages if event::poll(std::time::Duration::from_millis(100))? && let Event::Key(key) = event::read()? diff --git a/core/conversations/src/conversation/group_v1.rs b/core/conversations/src/conversation/group_v1.rs index dbb87f7..b562a4a 100644 --- a/core/conversations/src/conversation/group_v1.rs +++ b/core/conversations/src/conversation/group_v1.rs @@ -9,12 +9,15 @@ use openmls::prelude::tls_codec::Deserialize; use openmls::prelude::*; use prost::Message as _; use shared_traits::IdentIdRef; +use std::collections::VecDeque; +use tracing::debug; use crate::account_directory::{AccountDirectory, resolve_device_ids}; use crate::conversation::ConversationIdRef; use crate::inbox_v2::MlsProvider; use crate::service_context::{ExternalServices, ServiceContext}; +use crate::utils::{blake2b_hex, hash_size}; use crate::{ DeliveryService, IdentityProvider, conversation::{ChatError, Convo, GroupConvo, Identified}, @@ -23,9 +26,13 @@ use crate::{ types::AddressedEncryptedPayload, }; +const OUTBOUND_HASH_CACHE_SIZE: usize = 25; + pub struct GroupV1Convo { mls_group: MlsGroup, convo_id: String, + // Cache outbound message Id's to filter out re-entrant messages + outbound_msgs: VecDeque, } impl std::fmt::Debug for GroupV1Convo { @@ -54,6 +61,7 @@ impl GroupV1Convo { Ok(Self { mls_group, convo_id, + outbound_msgs: VecDeque::new(), }) } @@ -76,6 +84,7 @@ impl GroupV1Convo { Ok(Self { mls_group, convo_id, + outbound_msgs: VecDeque::new(), }) } @@ -93,6 +102,7 @@ impl GroupV1Convo { Ok(GroupV1Convo { mls_group, convo_id, + outbound_msgs: VecDeque::new(), }) } @@ -100,8 +110,6 @@ impl GroupV1Convo { fn subscribe(ds: &mut impl DeliveryService, convo_id: &str) -> Result<(), ChatError> { ds.subscribe(&Self::delivery_address_from_id(convo_id)) .map_err(ChatError::generic)?; - ds.subscribe(&Self::ctrl_delivery_address_from_id(convo_id)) - .map_err(ChatError::generic)?; Ok(()) } @@ -129,18 +137,6 @@ impl GroupV1Convo { Self::delivery_address_from_id(&self.convo_id) } - fn ctrl_delivery_address_from_id(convo_id: &str) -> String { - let hash = Blake2b::::new() - .chain_update("ctrl_delivery_addr|") - .chain_update(convo_id) - .finalize(); - hex::encode(hash) - } - - fn ctrl_delivery_address(&self) -> String { - Self::ctrl_delivery_address_from_id(&self.convo_id) - } - /// Resolve an account to a KeyPackage for *every* device it authorizes. /// /// First resolves the account to its device ids through the account @@ -178,8 +174,8 @@ impl GroupV1Convo { fn send_message( &mut self, content: &[u8], - cx: &ServiceContext, - ) -> Result, ChatError> { + cx: &mut ServiceContext, + ) -> Result<(), ChatError> { let sender_id = cx.mls_identity.id().as_str(); let reliable = cx.causal.on_send(&self.convo_id, sender_id, content); let wire = reliable.encode_to_vec(); @@ -189,16 +185,38 @@ impl GroupV1Convo { .create_message(&cx.mls_provider, &cx.mls_identity, &wire) .unwrap(); - let a = AddressedEncryptedPayload { + let msg_bytes = mls_message_out.to_bytes().unwrap(); + self.send_payload(cx, msg_bytes) + } + + // Publish outboubound payloads to the DeliveryService + fn send_payload( + &mut self, + cx: &mut ServiceContext, + msg_bytes: Vec, + ) -> Result<(), ChatError> { + // Hash and Cache to detect inbound messages + let msg_hash = blake2b_hex::(&[&msg_bytes]); + self.outbound_msgs.push_back(msg_hash); + if self.outbound_msgs.len() > OUTBOUND_HASH_CACHE_SIZE { + let _ = self.outbound_msgs.remove(0); + } + + // Wrap in Payload frames + let aep = AddressedEncryptedPayload { delivery_address: self.delivery_address(), data: EncryptedPayload { encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext { - payload: mls_message_out.to_bytes().unwrap().into(), + payload: msg_bytes.into(), })), }, }; + let env = aep.into_envelope(self.convo_id.clone()); - Ok(vec![a]) + // Send via DS + cx.ds + .publish(env) + .map_err(|e| ChatError::Delivery(e.to_string())) } } @@ -214,13 +232,7 @@ impl Convo for GroupV1Convo { cx: &mut ServiceContext, content: &[u8], ) -> Result<(), ChatError> { - let payloads = self.send_message(content, cx)?; - for payload in payloads { - cx.ds - .publish(payload.into_envelope(self.id().into())) - .map_err(|e| ChatError::Delivery(e.to_string()))?; - } - Ok(()) + self.send_message(content, cx) } fn handle_frame( @@ -238,7 +250,14 @@ impl Convo for GroupV1Convo { } }; - let mls_message = + // Bail early if we sent this message + let msg_hash = blake2b_hex::(&[bytes.as_ref()]); + if self.outbound_msgs.contains(&msg_hash) { + debug!("Dropping message, sent from self"); + return Ok(ConvoOutcome::empty(self.convo_id.to_string())); + } + + let mls_message: MlsMessageIn = MlsMessageIn::tls_deserialize_exact_bytes(&bytes).map_err(ChatError::generic)?; let protocol_message: ProtocolMessage = mls_message @@ -332,22 +351,6 @@ impl GroupConvo for GroupV1Convo { .invite_user(&mut cx.ds, account_id, &welcome)?; } - let encrypted_payload = EncryptedPayload { - encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext { - payload: commit.to_bytes()?.into(), - })), - }; - - let addr_enc_payload = AddressedEncryptedPayload { - delivery_address: self.ctrl_delivery_address(), - data: encrypted_payload, - }; - // Prepare commit message - // TODO: (P1) Make GroupConvos agnostic to framing so its less error prone and more - let env = addr_enc_payload.into_envelope(self.convo_id.clone()); - - cx.ds - .publish(env) - .map_err(|e| ChatError::Generic(format!("Publish: {e}"))) + self.send_payload(cx, commit.to_bytes()?) } } diff --git a/core/conversations/src/lib.rs b/core/conversations/src/lib.rs index edc093e..f247933 100644 --- a/core/conversations/src/lib.rs +++ b/core/conversations/src/lib.rs @@ -29,6 +29,6 @@ pub use outcomes::{ pub use service_context::ExternalServices; pub use service_traits::{DeliveryService, RegistrationService, WakeupService}; pub use shared_traits::{IdentId, IdentIdRef, IdentityProvider}; -pub use storage::ConversationKind; +pub use storage::{ChatStore, ConversationKind}; pub use types::AddressedEnvelope; pub use utils::{hex_trunc, trunc}; diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index ec34895..02f6fc2 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -13,6 +13,7 @@ components = { workspace = true } crossbeam-channel = { workspace = true } crypto = { workspace = true } libchat = { workspace = true } +storage = { workspace = true } logos-account = { workspace = true, features = ["dev"]} shared-traits = { workspace = true } diff --git a/crates/client/examples/message-exchange/main.rs b/crates/client/examples/message-exchange/main.rs index 05c69ee..ea755d5 100644 --- a/crates/client/examples/message-exchange/main.rs +++ b/crates/client/examples/message-exchange/main.rs @@ -1,19 +1,28 @@ -use logos_chat::{ChatClient, Event, InProcessDelivery, MessageBus}; +use components::EphemeralRegistry; +use logos_chat::{ChatClientBuilder, Event, InProcessDelivery, MessageBus}; use std::time::Duration; fn main() { let bus = MessageBus::default(); - let saro_delivery = InProcessDelivery::new(bus.clone()); - let raya_delivery = InProcessDelivery::new(bus); + let reg = EphemeralRegistry::new(); - let (mut saro, saro_events) = ChatClient::new("saro", saro_delivery); - let (mut raya, raya_events) = ChatClient::new("raya", raya_delivery); + let (mut saro, saro_events) = ChatClientBuilder::new() + .transport(InProcessDelivery::new(bus.clone())) + .registration(reg.clone()) + .build() + .unwrap(); + + let (mut raya, raya_events) = ChatClientBuilder::new() + .transport(InProcessDelivery::new(bus)) + .registration(reg) + .build() + .unwrap(); let raya_bundle = raya.create_intro_bundle().unwrap(); + #[allow(deprecated)] saro.create_conversation(&raya_bundle, b"hello raya") .unwrap(); - // Raya's worker delivers the new conversation, then its initial message. let raya_convo_id = match raya_events.recv_timeout(Duration::from_secs(5)).unwrap() { Event::ConversationStarted { convo_id, .. } => convo_id, other => panic!("expected ConversationStarted, got {other:?}"), diff --git a/crates/client/src/builder.rs b/crates/client/src/builder.rs new file mode 100644 index 0000000..e093dfe --- /dev/null +++ b/crates/client/src/builder.rs @@ -0,0 +1,215 @@ +use components::EphemeralRegistry; +use crossbeam_channel::Receiver; +use libchat::{ChatError, ChatStorage, IdentityProvider, RegistrationService, StorageConfig}; +use storage::ChatStore; + +use crate::Transport; +use crate::client::ChatClient; +use crate::delegate::DelegateSigner; +use crate::errors::ClientError; +use crate::event::Event; + +/// Marker for a builder field that has not been configured; the corresponding +/// component will be filled in with a sensible default when `build()` is called. +pub struct Unset; + +pub struct ChatClientBuilder { + ident: I, + transport: T, + registration: R, + storage: S, +} + +impl Default for ChatClientBuilder { + fn default() -> Self { + Self { + ident: Unset, + transport: Unset, + registration: Unset, + storage: Unset, + } + } +} + +impl ChatClientBuilder { + pub fn new() -> Self { + Self::default() + } +} + +impl ChatClientBuilder { + pub fn ident(self, ident: NI) -> ChatClientBuilder { + ChatClientBuilder { + ident, + transport: self.transport, + registration: self.registration, + storage: self.storage, + } + } + + pub fn transport(self, transport: NT) -> ChatClientBuilder { + ChatClientBuilder { + ident: self.ident, + transport, + registration: self.registration, + storage: self.storage, + } + } + + pub fn registration(self, registration: NR) -> ChatClientBuilder { + ChatClientBuilder { + ident: self.ident, + transport: self.transport, + registration, + storage: self.storage, + } + } + + pub fn storage(self, storage: NS) -> ChatClientBuilder { + ChatClientBuilder { + ident: self.ident, + transport: self.transport, + registration: self.registration, + storage, + } + } + + pub fn storage_config(self, config: StorageConfig) -> ChatClientBuilder { + let storage = ChatStorage::new(config) + .map_err(ChatError::from) + .expect("Storage config file should be valid"); + + ChatClientBuilder { + ident: self.ident, + transport: self.transport, + registration: self.registration, + storage, + } + } +} + +type Built = Result<(ChatClient, Receiver), ClientError>; + +// All four explicitly provided. +impl ChatClientBuilder +where + I: IdentityProvider + Send + 'static, + T: Transport + Send + 'static, + R: RegistrationService + Send + 'static, + S: ChatStore + Send + 'static, +{ + pub fn build(self) -> Built { + ChatClient::new(self.ident, self.transport, self.registration, self.storage) + } +} + +// Transport only; I, R, S all default. +impl ChatClientBuilder { + pub fn build(self) -> Built { + ChatClient::new( + DelegateSigner::random(), + self.transport, + EphemeralRegistry::new(), + ChatStorage::in_memory(), + ) + } +} + +// I and T; R and S default. +impl ChatClientBuilder +where + I: IdentityProvider + Send + 'static, + T: Transport + Send + 'static, +{ + pub fn build(self) -> Built { + ChatClient::new( + self.ident, + self.transport, + EphemeralRegistry::new(), + ChatStorage::in_memory(), + ) + } +} + +// T and R; I and S default. +impl ChatClientBuilder +where + T: Transport + Send + 'static, + R: RegistrationService + Send + 'static, +{ + pub fn build(self) -> Built { + ChatClient::new( + DelegateSigner::random(), + self.transport, + self.registration, + ChatStorage::in_memory(), + ) + } +} + +// T and S; I and R default. +impl ChatClientBuilder +where + T: Transport + Send + 'static, + S: ChatStore + Send + 'static, +{ + pub fn build(self) -> Built { + ChatClient::new( + DelegateSigner::random(), + self.transport, + EphemeralRegistry::new(), + self.storage, + ) + } +} + +// I, T, and R; S defaults. +impl ChatClientBuilder +where + I: IdentityProvider + Send + 'static, + T: Transport + Send + 'static, + R: RegistrationService + Send + 'static, +{ + pub fn build(self) -> Built { + ChatClient::new( + self.ident, + self.transport, + self.registration, + ChatStorage::in_memory(), + ) + } +} + +// T, R, and S; I defaults. +impl ChatClientBuilder +where + T: Transport + Send + 'static, + R: RegistrationService + Send + 'static, + S: ChatStore + Send + 'static, +{ + pub fn build(self) -> Built { + ChatClient::new( + DelegateSigner::random(), + self.transport, + self.registration, + self.storage, + ) + } +} + +// I, T, and S; R defaults. +impl ChatClientBuilder +where + I: IdentityProvider + Send + 'static, + T: Transport + Send + 'static, + S: ChatStore + Send + 'static, +{ + pub fn build(self) -> Built { + ChatClient::new( + self.ident, + self.transport, + EphemeralRegistry::new(), + self.storage, + ) + } +} diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index c6dbcf6..fee12a9 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1,21 +1,21 @@ use std::sync::Arc; use std::thread::{self, JoinHandle}; -use components::{EphemeralRegistry, ThreadedWakeupService, WakeupEvent}; +use components::{ThreadedWakeupService, WakeupEvent}; use crossbeam_channel::{Receiver, Sender, select}; use crypto::Ed25519VerifyingKey; use libchat::{ - AccountDirectory, ChatError, ChatStorage, ConversationId, ConvoOutcome, Core, DeliveryService, - IdentId, IdentIdRef, InboxOutcome, Introduction, PayloadOutcome, RegistrationService, - StorageConfig, + AccountDirectory, ConversationId, ConvoOutcome, Core, DeliveryService, IdentId, IdentIdRef, + IdentityProvider, InboxOutcome, Introduction, PayloadOutcome, RegistrationService, }; use parking_lot::Mutex; +use storage::ChatStore; -use crate::delegate::{DelegateCredential, DelegateSigner}; +use crate::delegate::DelegateCredential; use crate::errors::ClientError; use crate::event::Event; -type ClientCore = Core<(DelegateSigner, T, R, ThreadedWakeupService, ChatStorage)>; +type ClientCore = Core<(I, T, R, ThreadedWakeupService, S)>; type AccountAddressRef<'a> = &'a str; type LocalSignerId = IdentId; @@ -40,120 +40,50 @@ pub trait Transport: DeliveryService + Send + 'static { /// caller's thread: they briefly lock the core, invoke it, and return — no /// message-passing round-trip. The `Arc`/`Mutex`/threads live entirely here; /// the core never mentions threads. -pub struct ChatClient { +pub struct ChatClient +where + I: IdentityProvider + Send + 'static, + T: Transport + Send + 'static, + R: RegistrationService + Send + 'static, + S: ChatStore + Send + 'static, +{ /// `parking_lot::Mutex` for its eventual fairness: an inbound burst can't /// starve caller operations of the lock. - core: Arc>>, + core: Arc>>, /// Dropped on `Drop` to wake the worker's `select!` and shut it down. shutdown: Option>, worker: Option>, + address: String, } -// ── Default-registry constructors ──────────────────────────────────────────── - -impl ChatClient { - /// Create an in-memory, ephemeral client. Identity is lost on drop. - pub fn new(_: impl Into, mut transport: T) -> (Self, Receiver) { - let inbound = transport.inbound(); - let delegate = DelegateSigner::random(); - - let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded(); - let wakeup_service = ThreadedWakeupService::new(wakeup_tx); - let core = Core::new_with_name( - delegate, - transport, - EphemeralRegistry::new(), - wakeup_service, - ChatStorage::in_memory(), - ) - .unwrap(); - Self::spawn(core, inbound, wakeup_rx) - } - - /// Open or create a persistent client backed by `StorageConfig`. - /// - /// If an identity already exists in storage it is loaded; otherwise a new - /// one is created and saved. - pub fn open( - _: impl Into, - config: StorageConfig, - mut transport: T, - ) -> Result<(Self, Receiver), ClientError> { - let store = ChatStorage::new(config).map_err(ChatError::from)?; - let inbound = transport.inbound(); - let delegate = DelegateSigner::random(); - let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded(); - let wakeup_service = ThreadedWakeupService::new(wakeup_tx); - let core = Core::new_from_store( - delegate, - transport, - EphemeralRegistry::new(), - wakeup_service, - store, - )?; - Ok(Self::spawn(core, inbound, wakeup_rx)) - } -} - -// ── Caller-supplied registry + shared methods ──────────────────────────────── - -impl ChatClient +// -- GenericChatClient +impl ChatClient where + I: IdentityProvider + Send + 'static, T: Transport + Send + 'static, R: RegistrationService + Send + 'static, + S: ChatStore + Send + 'static, { - /// Open or create a persistent client with a caller-supplied registration - /// service. Use this to swap in a network-backed registry (e.g. the - /// testnet KeyPackage Registry) in place of the default in-memory store. - /// - /// Submits this account's KeyPackage to the registry as the last step of - /// construction. The default in-memory `open` path skips this call, but - /// when a real registry is wired in we want each session to publish so - /// other clients can fetch it. - pub fn open_with_registry( - _: impl Into, - config: StorageConfig, - mut transport: T, - registry: R, - ) -> Result<(Self, Receiver), ClientError> - where - T: Transport, - { - let store = ChatStorage::new(config).map_err(ChatError::from)?; - let inbound = transport.inbound(); - let delegate = DelegateSigner::random(); - let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded(); - let wakeup_service = ThreadedWakeupService::new(wakeup_tx); - let mut core = Core::new_from_store(delegate, transport, registry, wakeup_service, store)?; - core.register_keypackage()?; - Ok(Self::spawn(core, inbound, wakeup_rx)) - } - - /// Create a client with ephemeral storage with the provided Transport and RegistrationService. - pub fn new_ephemeral( - delegate: DelegateSigner, + pub fn new( + ident: I, mut transport: T, reg: R, + storage: S, ) -> Result<(Self, Receiver), ClientError> { let inbound = transport.inbound(); let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded(); let wakeup_service = ThreadedWakeupService::new(wakeup_tx); - let core = Core::new_with_name( - delegate, - transport, - reg, - wakeup_service, - ChatStorage::in_memory(), - )?; + let core = Core::new_with_name(ident, transport, reg, wakeup_service, storage)?; Ok(Self::spawn(core, inbound, wakeup_rx)) } fn spawn( - core: ClientCore, + core: ClientCore, inbound: Receiver>, wakeup_events: Receiver, ) -> (Self, Receiver) { + let address = core.ident_id().to_string(); let core = Arc::new(Mutex::new(core)); let (event_tx, event_rx) = crossbeam_channel::unbounded(); let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded::<()>(0); @@ -168,11 +98,16 @@ where core, shutdown: Some(shutdown_tx), worker: Some(worker), + address, }, event_rx, ) } + pub fn addr(&self) -> AccountAddressRef<'_> { + &self.address + } + /// Returns the installation name (identity label) of this client. pub fn installation_name(&self) -> String { self.core.lock().installation_name().to_string() @@ -237,7 +172,13 @@ where } } -impl Drop for ChatClient { +impl Drop for ChatClient +where + I: IdentityProvider + Send + 'static, + T: Transport + Send + 'static, + R: RegistrationService + Send + 'static, + S: ChatStore + Send + 'static, +{ fn drop(&mut self) { // Dropping the sender disconnects the worker's shutdown channel, waking // its `select!` so it can exit; then we join it. @@ -251,8 +192,8 @@ impl Drop for ChatClient { /// Background loop: block until an inbound payload or shutdown arrives, drive /// the core on each payload, and forward events. No polling — `select!` parks /// the thread until one of the channels is ready. -fn worker_loop( - core: Arc>>, +fn worker_loop( + core: Arc>>, inbound: Receiver>, wakeup_events: Receiver, shutdown: Receiver<()>, diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 67cec22..a349036 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -1,9 +1,11 @@ +mod builder; mod client; mod delegate; mod delivery_in_process; mod errors; mod event; +pub use builder::{ChatClientBuilder, Unset}; pub use client::{ChatClient, Transport}; pub use delegate::DelegateSigner; pub use delivery_in_process::{InProcessDelivery, MessageBus}; @@ -12,8 +14,8 @@ pub use event::Event; // Re-export types callers need to interact with ChatClient. pub use libchat::{ - AddressedEnvelope, ConversationClass, ConversationId, DeliveryService, RegistrationService, - StorageConfig, + AddressedEnvelope, ChatStore, ConversationClass, ConversationId, DeliveryService, + IdentityProvider, RegistrationService, StorageConfig, }; // Re-export bundled registry implementations so callers can pick one without diff --git a/crates/client/tests/saro_and_raya.rs b/crates/client/tests/saro_and_raya.rs index 0b80a00..d3a3637 100644 --- a/crates/client/tests/saro_and_raya.rs +++ b/crates/client/tests/saro_and_raya.rs @@ -6,8 +6,8 @@ use crypto::Ed25519VerifyingKey; use libchat::{AccountDirectory, IdentityProvider, SignedDeviceBundle, encode_bundle_payload}; use logos_account::TestLogosAccount; use logos_chat::{ - AddressedEnvelope, ChatClient, DelegateSigner, DeliveryService, Event, InProcessDelivery, - MessageBus, StorageConfig, Transport, + AddressedEnvelope, ChatClient, ChatClientBuilder, DelegateSigner, DeliveryService, Event, + InProcessDelivery, MessageBus, Transport, }; /// Publish a signed device bundle endorsing `device` as a device of `account`, @@ -27,6 +27,24 @@ fn publish_device_bundle( reg.publish(&bundle).unwrap(); } +#[allow(clippy::type_complexity)] +fn create_test_client( + message_bus: MessageBus, + reg: EphemeralRegistry, +) -> Result< + ( + ChatClient, + Receiver, + ), + logos_chat::ClientError, +> { + let d = InProcessDelivery::new(message_bus); + ChatClientBuilder::new() + .transport(d) + .registration(reg) + .build() +} + /// Block until the next event arrives and matches; panic on timeout/mismatch. fn expect_event(events: &Receiver, label: &str, mut f: F) -> T where @@ -41,8 +59,35 @@ where #[test] fn direct_v1_integration() { let bus = MessageBus::default(); - let saro_delivery = InProcessDelivery::new(bus.clone()); - let raya_delivery = InProcessDelivery::new(bus); + let reg_service = EphemeralRegistry::new(); + + let (mut saro, _saro_events) = + create_test_client(bus.clone(), reg_service.clone()).expect("client create"); + let (raya, raya_events) = + create_test_client(bus.clone(), reg_service.clone()).expect("client create"); + + let convo_id = saro.create_direct_conversation(raya.addr()).unwrap(); + + // The invite payload yields ConversationStarted then MessageReceived. + expect_event(&raya_events, "ConversationStarted", |e| match e { + Event::ConversationStarted { convo_id, .. } => Ok(convo_id), + other => Err(other), + }); + + saro.send_message(&convo_id, b"Hey from saro") + .expect("payload mismatch"); + expect_event(&raya_events, "MessageReceived", |e| match e { + Event::MessageReceived { content, .. } => { + assert_eq!(content.as_slice(), b"Hey from saro"); + Ok(()) + } + other => Err(other), + }); +} + +#[test] +fn direct_v1_standalone_integration() { + let bus = MessageBus::default(); let mut reg_service = EphemeralRegistry::new(); @@ -58,16 +103,14 @@ fn direct_v1_integration() { let mut raya_delegate = DelegateSigner::random(); raya_delegate.associate(hex::encode(raya_account.public_key().as_ref())); publish_device_bundle(&mut reg_service, &raya_account, raya_delegate.public_key()); - let raya_delegate_id = raya_delegate.id().clone(); let (mut saro, _saro_events) = - ChatClient::new_ephemeral(saro_delegate, saro_delivery, reg_service.clone()).unwrap(); - let (_raya, raya_events) = - ChatClient::new_ephemeral(raya_delegate, raya_delivery, reg_service.clone()).unwrap(); + create_test_client(bus.clone(), reg_service.clone()).expect("client create"); + let (raya, raya_events) = + create_test_client(bus.clone(), reg_service.clone()).expect("client create"); - let convo_id = saro - .create_direct_conversation(raya_delegate_id.as_str()) - .unwrap(); + let raya_addr = raya.addr(); + let convo_id = saro.create_direct_conversation(raya_addr).unwrap(); // The invite payload yields ConversationStarted then MessageReceived. expect_event(&raya_events, "ConversationStarted", |e| match e { @@ -89,22 +132,26 @@ fn direct_v1_integration() { #[test] fn saro_raya_message_exchange() { let bus = MessageBus::default(); - let saro_delivery = InProcessDelivery::new(bus.clone()); - let raya_delivery = InProcessDelivery::new(bus); + let reg_service = EphemeralRegistry::new(); - let (mut saro, saro_events) = ChatClient::new("saro", saro_delivery); - let (mut raya, raya_events) = ChatClient::new("raya", raya_delivery); + let (mut saro, saro_events) = + create_test_client(bus.clone(), reg_service.clone()).expect("client create"); + let (mut raya, raya_events) = + create_test_client(bus.clone(), reg_service.clone()).expect("client create"); - let raya_bundle = raya.create_intro_bundle().unwrap(); let saro_convo_id = saro - .create_conversation(&raya_bundle, b"hello raya") - .unwrap(); + .create_direct_conversation(raya.addr()) + .expect("convo create"); - // The invite payload yields ConversationStarted then MessageReceived. + // Wait for raya to process the Welcome and subscribe to the convo delivery + // address before saro sends — MessageBus only fans out to current subscribers, + // so a message sent before raya subscribes would be silently dropped. let raya_convo_id = expect_event(&raya_events, "ConversationStarted", |e| match e { Event::ConversationStarted { convo_id, .. } => Ok(convo_id), other => Err(other), }); + + saro.send_message(&saro_convo_id, b"hello raya").unwrap(); expect_event(&raya_events, "MessageReceived", |e| match e { Event::MessageReceived { convo_id, content } => { assert_eq!(convo_id, raya_convo_id); @@ -201,8 +248,9 @@ impl Transport for FailingDelivery { #[test] fn dropping_client_shuts_down_worker() { - let delivery = InProcessDelivery::new(MessageBus::default()); - let (client, events) = ChatClient::new("saro", delivery); + let (client, events) = + create_test_client(MessageBus::default(), EphemeralRegistry::new()).expect("client create"); + drop(client); // Drop joins the worker; once joined its Sender is gone, so recv // reports the channel as disconnected. @@ -213,32 +261,17 @@ fn dropping_client_shuts_down_worker() { )); } -#[test] -fn publish_failure_surfaces_as_error() { - // A real raya just to mint a valid intro bundle. - let raya_delivery = InProcessDelivery::new(MessageBus::default()); - let (mut raya, _raya_events) = ChatClient::new("raya", raya_delivery); - let bundle = raya.create_intro_bundle().unwrap(); - - // FailingDelivery never receives; keep the inbound sender alive so the - // worker doesn't exit early on a disconnected channel. - let delivery = FailingDelivery::new(); - let _keep_inbound = delivery.inbound_sender(); - let (mut saro, _saro_events) = ChatClient::new("saro", delivery); - let result = saro.create_conversation(&bundle, b"hello"); - assert!( - result.is_err(), - "publish failure should surface as an error on the synchronous call" - ); -} - #[test] fn malformed_inbound_surfaces_as_error_event() { // Feed the worker's inbound channel bytes that can't be decoded and assert // it emits an InboundError instead of silently dropping the failure. let delivery = FailingDelivery::new(); let inbound_tx = delivery.inbound_sender(); - let (_saro, events) = ChatClient::new("saro", delivery); + + let (_client, events) = ChatClientBuilder::new() + .transport(delivery) + .build() + .expect("client create"); inbound_tx.send(b"not a valid payload".to_vec()).unwrap(); @@ -250,24 +283,3 @@ fn malformed_inbound_surfaces_as_error_event() { other => Err(other), }); } - -#[test] -fn open_persistent_client() { - let dir = tempfile::tempdir().unwrap(); - let db_path = dir.path().join("test.db").to_string_lossy().to_string(); - let config = StorageConfig::File(db_path); - - let delivery1 = InProcessDelivery::new(MessageBus::default()); - let (client1, _events1) = ChatClient::open("saro", config.clone(), delivery1).unwrap(); - let name1 = client1.installation_name(); - drop(client1); - - let delivery2 = InProcessDelivery::new(MessageBus::default()); - let (client2, _events2) = ChatClient::open("saro", config, delivery2).unwrap(); - let name2 = client2.installation_name(); - - assert_eq!( - name1, name2, - "installation name should persist across restarts" - ); -}