From c677cc93344bb8e74a71cf73f6bde2741c4c527f Mon Sep 17 00:00:00 2001 From: osmaczko <33099791+osmaczko@users.noreply.github.com> Date: Thu, 28 May 2026 23:51:15 +0200 Subject: [PATCH] feat: introduce client event system (#106) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore(flake): accept extra system attr; add perl for openssl-sys build forAllSystems calls the lambda with {system, pkgs}; strict destructuring requires `..` to ignore the system attribute. `pkgs.perl` is needed because openssl-sys is pulled vendored via libsqlite3-sys / rusqlite / chat-sqlite, and its `perl Configure` step needs FindBin.pm, which Fedora's system perl doesn't ship. * feat: introduce client event system - Core processing yields a `PayloadOutcome` enum — `Empty`, `Convo`, or `Inbox`. `ConvoOutcome` carries a conversation id and an optional decrypted `Content`; `InboxOutcome` adds a `NewConversation` (id + `ConversationClass`) for a peer-initiated conversation. - Client translates `PayloadOutcome` into app-facing `Vec` (`ConversationStarted`, `MessageReceived`) at the boundary, so the application loop sees discrete events rather than core types. - MLS group welcomes produce a `ConversationStarted` event with no initial content, fixing the silent-group-join case where the inbox layer dropped the observation. - C FFI exposes an `EventList` opaque type with indexed accessors and an `Invalid` sentinel for out-of-bounds / non-applicable reads. - Symmetric `Inbox` / `InboxV2` handlers: both return `Result` and own the persistence + ephemeral-key cleanup for the conversations they create. - Updated and simplified `docs/adr/0001-client-event-system.md`. * chore(flake): bump nixpkgs to nixos-unstable-small Temporary. The two crates.io UA fixes (NixOS/nixpkgs#512735 for fetchCargoVendor's python-requests UA, NixOS/nixpkgs#524985 for importCargoLock's curl UA) haven't propagated to nixos-unstable yet. Switch to nixos-unstable-small and force logos-delivery to follow so the smoketest gets the same fix. Revert once nixos-unstable catches up. Refs: - https://github.com/rust-lang/crates.io/issues/13482 - https://github.com/rust-lang/crates.io/issues/13783 - https://crates.io/data-access --- .github/workflows/ci.yml | 9 +- .gitignore | 2 +- bin/chat-cli/src/app.rs | 79 +++-- core/conversations/src/context.rs | 76 ++--- core/conversations/src/conversation.rs | 26 +- .../src/conversation/group_test.rs | 41 --- .../src/conversation/group_v1.rs | 31 +- .../src/conversation/privatev1.rs | 42 ++- core/conversations/src/inbox/handler.rs | 52 ++- core/conversations/src/inbox_v2.rs | 21 +- core/conversations/src/lib.rs | 9 +- core/conversations/src/outcomes.rs | 81 +++++ core/conversations/src/types.rs | 9 - .../tests/mls_integration.rs | 106 ++++-- .../tests/private_integration.rs | 97 ++++-- .../examples/message-exchange/src/main.c | 105 +++--- crates/client-ffi/src/api.rs | 213 ++++++++---- .../client/examples/message-exchange/main.rs | 33 +- crates/client/src/client.rs | 70 +++- crates/client/src/event.rs | 27 ++ crates/client/src/lib.rs | 6 +- crates/client/tests/saro_and_raya.rs | 65 ++-- docs/adr/0001-client-event-system.md | 303 +++--------------- flake.lock | 48 +-- flake.nix | 12 +- 25 files changed, 851 insertions(+), 712 deletions(-) delete mode 100644 core/conversations/src/conversation/group_test.rs create mode 100644 core/conversations/src/outcomes.rs create mode 100644 crates/client/src/event.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 66c49ba..32163cd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -73,10 +73,15 @@ jobs: working-directory: crates/client-ffi/examples/message-exchange - name: Build logos-delivery run: nix build .#logos-delivery + # Build and run chat-cli through the dev shell so it links against the + # same Nix glibc as the prebuilt liblogosdelivery.so. A plain `cargo + # build` uses the runner's system glibc, which is older than Nix's and + # mismatches it at runtime (libc.so.6: version `GLIBC_ABI_DT_X86_64_PLT' + # not found, required by Nix glibc's libm.so.6). - name: Build chat-cli (logos-delivery) - run: LOGOS_DELIVERY_LIB_DIR=./result/lib cargo build --release -p chat-cli + run: nix develop -c bash -c 'LOGOS_DELIVERY_LIB_DIR=./result/lib cargo build --release -p chat-cli' - name: Run chat-cli smoketest - run: ./target/release/chat-cli --name ci-test --smoketest + run: nix develop -c ./target/release/chat-cli --name ci-test --smoketest nix-build: name: Nix Build diff --git a/.gitignore b/.gitignore index 9ccb3e1..2df56b0 100644 --- a/.gitignore +++ b/.gitignore @@ -39,4 +39,4 @@ result crates/client-ffi/client_ffi.h # Compiled C FFI example binary -examples/c-ffi/c-client +crates/client-ffi/examples/message-exchange/c-client diff --git a/bin/chat-cli/src/app.rs b/bin/chat-cli/src/app.rs index de9e47b..155c3a5 100644 --- a/bin/chat-cli/src/app.rs +++ b/bin/chat-cli/src/app.rs @@ -5,7 +5,7 @@ use std::sync::mpsc; use anyhow::Result; use arboard::Clipboard; -use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService}; +use logos_chat::{ChatClient, DeliveryService, Event}; use serde::{Deserialize, Serialize}; use crate::utils::now; @@ -144,41 +144,57 @@ impl ChatApp { pub fn process_incoming(&mut self) -> Result<()> { while let Ok(payload) = self.inbound.try_recv() { match self.client.receive(&payload) { - Ok(Some(content)) => { - let chat_id = &content.conversation_id; - - if !self.state.chats.contains_key(chat_id) && content.is_new_convo { - let session = ChatSession { - chat_id: chat_id.clone(), - nickname: None, - messages: Vec::new(), - }; - self.state.chats.insert(chat_id.clone(), session); - let label = chat_id[..8.min(chat_id.len())].to_string(); - self.set_active_chat(Some(chat_id.clone())); - self.status = format!("New chat ({label})! Use /nickname to name it."); + Ok(events) => { + for event in events { + self.handle_event(event); } - - if !content.data.is_empty() { - let text = String::from_utf8_lossy(&content.data).to_string(); - if let Some(session) = self.state.chats.get_mut(chat_id) { - session.messages.push(DisplayMessage { - from_self: false, - content: text, - timestamp: now(), - }); - } - } - self.save_state()?; } - Ok(None) => {} - Err(e) => tracing::warn!("receive error: {e:?}"), + Err(e) => { + tracing::warn!("receive error: {e:?}"); + self.status = format!("Could not decrypt incoming message: {e}"); + } } } Ok(()) } + fn handle_event(&mut self, event: Event) { + match event { + Event::ConversationStarted { convo_id, .. } => { + let chat_id = convo_id.to_string(); + if self.state.chats.contains_key(&chat_id) { + return; + } + self.state.chats.insert( + chat_id.clone(), + ChatSession { + chat_id: chat_id.clone(), + nickname: None, + messages: Vec::new(), + }, + ); + let label = &chat_id[..8.min(chat_id.len())]; + self.status = format!("New chat ({label})! Use /nickname to name it."); + self.set_active_chat(Some(chat_id)); + } + Event::MessageReceived { + convo_id, content, .. + } => { + let chat_id = convo_id.to_string(); + let Some(session) = self.state.chats.get_mut(&chat_id) else { + return; + }; + session.messages.push(DisplayMessage { + from_self: false, + content: String::from_utf8_lossy(&content).into_owned(), + timestamp: now(), + }); + } + _ => {} + } + } + pub fn send_message(&mut self, content: &str) -> Result<()> { let chat_id = self .state @@ -186,10 +202,8 @@ impl ChatApp { .clone() .ok_or_else(|| anyhow::anyhow!("No active chat. Use /connect or /switch first."))?; - let convo_id: ConversationIdOwned = chat_id.as_str().into(); - self.client - .send_message(&convo_id, content.as_bytes()) + .send_message(&chat_id, content.as_bytes()) .map_err(|e| anyhow::anyhow!("{e:?}"))?; if let Some(session) = self.state.chats.get_mut(&chat_id) { @@ -253,12 +267,11 @@ impl ChatApp { return Ok(Some("Usage: /connect ".to_string())); } let initial = format!("Hello from {}!", self.user_name); - let convo_id = self + let chat_id = self .client .create_conversation(args.as_bytes(), initial.as_bytes()) .map_err(|e| anyhow::anyhow!("{e:?}"))?; - let chat_id = convo_id.to_string(); let label = chat_id[..8.min(chat_id.len())].to_string(); let mut session = ChatSession { chat_id: chat_id.clone(), diff --git a/core/conversations/src/context.rs b/core/conversations/src/context.rs index 58bbc97..e9143fa 100644 --- a/core/conversations/src/context.rs +++ b/core/conversations/src/context.rs @@ -1,5 +1,4 @@ use std::cell::{Ref, RefMut}; -use std::sync::Arc; use std::{cell::RefCell, rc::Rc}; use crate::account::LogosAccount; @@ -8,17 +7,18 @@ use crate::conversation::{Convo, GroupConvo}; use crate::{DeliveryService, RegistrationService}; use crate::{ - conversation::{Conversation, Id, PrivateV1Convo}, + conversation::{Id, PrivateV1Convo}, errors::ChatError, inbox::Inbox, inbox_v2::InboxV2, + outcomes::{ConvoOutcome, InboxOutcome, PayloadOutcome}, proto::{EncryptedPayload, EnvelopeV1, Message}, - types::{AccountId, AddressedEnvelope, ContentData}, + types::{AccountId, AddressedEnvelope}, }; use crypto::{Identity, PublicKey}; use storage::{ChatStore, ConversationKind}; -pub use crate::conversation::{ConversationId, ConversationIdOwned}; +pub use crate::conversation::ConversationId; pub use crate::inbox::Introduction; // This is the main entry point to the conversations api. @@ -163,7 +163,7 @@ where &mut self, remote_bundle: &Introduction, content: &[u8], - ) -> Result<(ConversationIdOwned, Vec), ChatError> { + ) -> Result<(ConversationId, Vec), ChatError> { let (mut convo, payloads) = self .inbox .invite_to_private_convo(remote_bundle, content, Rc::clone(&self.store)) @@ -198,12 +198,9 @@ where Ok(Box::new(convo)) } - pub fn list_conversations(&self) -> Result, ChatError> { + pub fn list_conversations(&self) -> Result, ChatError> { let records = self.store.borrow().load_conversations()?; - Ok(records - .into_iter() - .map(|r| Arc::from(r.local_convo_id.as_str())) - .collect()) + Ok(records.into_iter().map(|r| r.local_convo_id).collect()) } pub fn take_missing_messages(&self) -> Vec { @@ -212,7 +209,7 @@ where pub fn send_content( &mut self, - convo_id: ConversationId, + convo_id: &str, content: &[u8], ) -> Result, ChatError> { let mut convo = self.load_convo(convo_id)?; @@ -225,62 +222,44 @@ where } // Decode bytes and send to protocol for processing. - pub fn handle_payload(&mut self, payload: &[u8]) -> Result, ChatError> { + pub fn handle_payload(&mut self, payload: &[u8]) -> Result { let env = EnvelopeV1::decode(payload)?; // TODO: Impl Conversation hinting let convo_id = env.conversation_hint; match convo_id { - c if c == self.inbox.id() => self.dispatch_to_inbox(&env.payload), - c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload), + c if c == self.inbox.id() => self.dispatch_to_inbox(&env.payload).map(Into::into), + c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload).map(Into::into), c if self.store.borrow().has_conversation(&c)? => { - self.dispatch_to_convo(&c, &env.payload) + self.dispatch_to_convo(&c, &env.payload).map(Into::into) } - _ => Ok(Some(ContentData { - conversation_id: "".into(), - data: vec![], - is_new_convo: false, - })), + _ => Ok(PayloadOutcome::Empty), } } - // Dispatch encrypted payload to Inbox, and register the created Conversation - fn dispatch_to_inbox( - &mut self, - enc_payload_bytes: &[u8], - ) -> Result, ChatError> { + // Dispatch encrypted payload to Inbox. The Inbox persists the newly + // created conversation and consumes the ephemeral key internally. + fn dispatch_to_inbox(&mut self, enc_payload_bytes: &[u8]) -> Result { // EncryptedPayloads are not used by GroupConvos at this time, else this can be performed in `handle_payload` // TODO: (P1) reconcile envelope parsing between Covno and GroupConvo let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; let public_key_hex = Inbox::::extract_ephemeral_key_hex(&enc_payload)?; - let (convo, content) = - self.inbox - .handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store))?; - - match convo { - Conversation::Private(mut convo) => convo.persist()?, - }; - - self.store - .borrow_mut() - .remove_ephemeral_key(&public_key_hex)?; - Ok(content) + self.inbox + .handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store)) } - // Dispatch encrypted payload to Inbox, and register the created Conversation - fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result, ChatError> { - self.pq_inbox.handle_frame(payload)?; - - Ok(None) + // Dispatch encrypted payload to the post-quantum inbox. + fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result { + self.pq_inbox.handle_frame(payload) } // Dispatch encrypted payload to its corresponding conversation fn dispatch_to_convo( &mut self, - convo_id: ConversationId, + convo_id: &str, enc_payload_bytes: &[u8], - ) -> Result, ChatError> { + ) -> Result { let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; let mut convo = self.load_convo(convo_id)?; convo.handle_frame(enc_payload) @@ -291,15 +270,12 @@ where Ok(intro.into()) } - pub fn get_convo( - &mut self, - convo_id: ConversationId, - ) -> Result>, ChatError> { + pub fn get_convo(&mut self, convo_id: &str) -> Result>, ChatError> { self.load_group_convo(convo_id) } /// Loads a conversation from DB by constructing it from metadata. - fn load_convo(&mut self, convo_id: ConversationId) -> Result, ChatError> { + fn load_convo(&mut self, convo_id: &str) -> Result, ChatError> { let record = self .store .borrow() @@ -327,7 +303,7 @@ where fn load_group_convo( &mut self, - convo_id: ConversationId, + convo_id: &str, ) -> Result>, ChatError> { let record = self .store diff --git a/core/conversations/src/conversation.rs b/core/conversations/src/conversation.rs index 702ca93..8b81fe6 100644 --- a/core/conversations/src/conversation.rs +++ b/core/conversations/src/conversation.rs @@ -3,23 +3,22 @@ mod privatev1; use crate::{ DeliveryService, + outcomes::ConvoOutcome, service_traits::KeyPackageProvider, - types::{AccountId, AddressedEncryptedPayload, ContentData}, + types::{AccountId, AddressedEncryptedPayload}, }; use chat_proto::logoschat::encryption::EncryptedPayload; use std::fmt::Debug; -use std::sync::Arc; -use storage::{ConversationKind, ConversationStore, RatchetStore}; +use storage::ConversationKind; pub use crate::errors::ChatError; pub use group_v1::{GroupV1Convo, IdentityProvider}; pub use privatev1::PrivateV1Convo; -pub type ConversationId<'a> = &'a str; -pub type ConversationIdOwned = Arc; +pub type ConversationId = String; pub trait Id: Debug { - fn id(&self) -> ConversationId<'_>; + fn id(&self) -> &str; } pub trait Convo: Id + Debug { @@ -28,13 +27,10 @@ pub trait Convo: Id + Debug { /// Decrypts and processes an incoming encrypted frame. /// - /// Returns `Ok(Some(ContentData))` if the frame contains user content, - /// `Ok(None)` for protocol frames (e.g., placeholders), or an error if - /// decryption or frame parsing fails. - fn handle_frame( - &mut self, - enc_payload: EncryptedPayload, - ) -> Result, ChatError>; + /// Returns the [`ConvoOutcome`] describing what the frame produced; its + /// `content` is `None` for protocol-only frames (placeholders, MLS + /// commits). Errors only on decryption or frame-parsing failure. + fn handle_frame(&mut self, enc_payload: EncryptedPayload) -> Result; fn remote_id(&self) -> String; @@ -49,7 +45,3 @@ pub trait GroupConvo: Convo { // sends the payload directly. fn send_content(&mut self, content: &[u8]) -> Result<(), ChatError>; } - -pub enum Conversation { - Private(PrivateV1Convo), -} diff --git a/core/conversations/src/conversation/group_test.rs b/core/conversations/src/conversation/group_test.rs deleted file mode 100644 index e77984f..0000000 --- a/core/conversations/src/conversation/group_test.rs +++ /dev/null @@ -1,41 +0,0 @@ -use crate::{ - conversation::{ChatError, ConversationId, Convo, Id}, - proto::EncryptedPayload, - types::{AddressedEncryptedPayload, ContentData}, -}; - -#[derive(Debug)] -pub struct GroupTestConvo {} - -impl GroupTestConvo { - pub fn new() -> Self { - Self {} - } -} - -impl Id for GroupTestConvo { - fn id(&self) -> ConversationId<'_> { - // implementation - "grouptest" - } -} - -impl Convo for GroupTestConvo { - fn send_message( - &mut self, - _content: &[u8], - ) -> Result, ChatError> { - Ok(vec![]) - } - - fn handle_frame( - &mut self, - _encoded_payload: EncryptedPayload, - ) -> Result, ChatError> { - Ok(None) - } - - fn remote_id(&self) -> String { - self.id().to_string() - } -} diff --git a/core/conversations/src/conversation/group_v1.rs b/core/conversations/src/conversation/group_v1.rs index 4bb704d..ca0c498 100644 --- a/core/conversations/src/conversation/group_v1.rs +++ b/core/conversations/src/conversation/group_v1.rs @@ -20,9 +20,10 @@ use crate::causal_history::CausalHistoryStore; use crate::types::AccountId; use crate::{ DeliveryService, - conversation::{ChatError, ConversationId, Convo, GroupConvo, Id}, + conversation::{ChatError, Convo, GroupConvo, Id}, + outcomes::{Content, ConvoOutcome}, service_traits::KeyPackageProvider, - types::{AddressedEncryptedPayload, ContentData}, + types::AddressedEncryptedPayload, }; /// Provides the identity information needed to participate in an MLS group. @@ -264,7 +265,7 @@ where DS: DeliveryService, KP: KeyPackageProvider, { - fn id(&self) -> ConversationId<'_> { + fn id(&self) -> &str { &self.convo_id } } @@ -306,7 +307,7 @@ where fn handle_frame( &mut self, encoded_payload: EncryptedPayload, - ) -> Result, ChatError> { + ) -> Result { let bytes = match encoded_payload.encryption { Some(encrypted_payload::Encryption::Plaintext(pt)) => pt.payload, _ => { @@ -329,7 +330,7 @@ where if protocol_message.epoch() < self.mls_group.epoch() { // TODO: (P1) Add logging for messages arriving from past epoch. - return Ok(None); + return Ok(ConvoOutcome::empty(self.id().to_string())); } let processed = self @@ -337,27 +338,29 @@ where .process_message(provider, protocol_message) .map_err(ChatError::generic)?; - match processed.into_content() { + let content = match processed.into_content() { ProcessedMessageContent::ApplicationMessage(msg) => { let reliable = ReliablePayload::decode(msg.into_bytes().as_slice())?; self.causal.on_receive(&self.convo_id, &reliable); - Ok(Some(ContentData { - conversation_id: hex::encode(self.mls_group.group_id().as_slice()), - data: reliable.content.to_vec(), - is_new_convo: false, - })) + Some(Content { + bytes: reliable.content.to_vec(), + }) } ProcessedMessageContent::StagedCommitMessage(commit) => { self.mls_group .merge_staged_commit(provider, *commit) .map_err(ChatError::generic)?; - Ok(None) + None } _ => { // TODO: (P2) Log unknown message type - Ok(None) + None } - } + }; + Ok(ConvoOutcome { + convo_id: self.id().to_string(), + content, + }) } fn remote_id(&self) -> String { diff --git a/core/conversations/src/conversation/privatev1.rs b/core/conversations/src/conversation/privatev1.rs index f4f39e5..f631df6 100644 --- a/core/conversations/src/conversation/privatev1.rs +++ b/core/conversations/src/conversation/privatev1.rs @@ -8,16 +8,16 @@ use chat_proto::logoschat::{ }; use crypto::{PrivateKey, PublicKey, SymmetricKey32}; use double_ratchets::{Header, InstallationKeyPair, RatchetState, restore_ratchet_state}; -use prost::{Message, bytes::Bytes}; -use std::{cell::RefCell, fmt::Debug, rc::Rc, sync::Arc}; +use prost::{Message as _, bytes::Bytes}; +use std::{cell::RefCell, fmt::Debug, rc::Rc}; use storage::{ConversationKind, ConversationMeta, ConversationStore}; use crate::{ - context::ConversationIdOwned, conversation::{ChatError, ConversationId, Convo, Id}, errors::EncryptionError, + outcomes::{Content, ConvoOutcome}, proto, - types::{AddressedEncryptedPayload, ContentData}, + types::AddressedEncryptedPayload, utils::timestamp_millis, }; use double_ratchets::{to_ratchet_record, to_skipped_key_records}; @@ -181,17 +181,8 @@ impl PrivateV1Convo { Ok(PrivateV1Frame::decode(content_bytes.as_slice()).unwrap()) } - // Handler for application content - fn handle_content(&self, data: Vec) -> Option { - Some(ContentData { - conversation_id: self.id().into(), - data, - is_new_convo: false, - }) - } - /// Persists a conversation's metadata and ratchet state to DB. - pub fn persist(&mut self) -> Result { + pub fn persist(&mut self) -> Result { let convo_info = ConversationMeta { local_convo_id: self.id().to_string(), remote_convo_id: self.remote_id(), @@ -199,7 +190,7 @@ impl PrivateV1Convo { }; self.store.borrow_mut().save_conversation(&convo_info)?; self.save_ratchet_state(&mut *self.store.borrow_mut())?; - Ok(Arc::from(self.id())) + Ok(self.id().to_string()) } pub fn save_ratchet_state(&self, storage: &mut T) -> Result<(), ChatError> { @@ -208,10 +199,16 @@ impl PrivateV1Convo { storage.save_ratchet_state(&self.local_convo_id, &record, &skipped_keys)?; Ok(()) } + + fn handle_content(&self, bytes: Bytes) -> Content { + Content { + bytes: bytes.into(), + } + } } impl Id for PrivateV1Convo { - fn id(&self) -> ConversationId<'_> { + fn id(&self) -> &str { &self.local_convo_id } } @@ -241,7 +238,7 @@ impl Convo for PrivateV1Convo { fn handle_frame( &mut self, encoded_payload: EncryptedPayload, - ) -> Result, ChatError> { + ) -> Result { // Extract expected frame let frame = self .decrypt(encoded_payload) @@ -253,13 +250,14 @@ impl Convo for PrivateV1Convo { self.save_ratchet_state(&mut *self.store.borrow_mut())?; - // Handle FrameTypes - let output = match frame_type { - FrameType::Content(bytes) => self.handle_content(bytes.into()), + let content = match frame_type { + FrameType::Content(bytes) => Some(self.handle_content(bytes)), FrameType::Placeholder(_) => None, }; - - Ok(output) + Ok(ConvoOutcome { + convo_id: self.id().to_string(), + content, + }) } fn remote_id(&self) -> String { diff --git a/core/conversations/src/inbox/handler.rs b/core/conversations/src/inbox/handler.rs index ca02240..a19257b 100644 --- a/core/conversations/src/inbox/handler.rs +++ b/core/conversations/src/inbox/handler.rs @@ -10,11 +10,12 @@ use storage::{ConversationStore, EphemeralKeyStore, RatchetStore}; use crypto::{PrekeyBundle, SymmetricKey32}; use crate::context::Introduction; -use crate::conversation::{ChatError, Conversation, ConversationId, Convo, Id, PrivateV1Convo}; +use crate::conversation::{ChatError, Convo, Id, PrivateV1Convo}; use crate::crypto::{CopyBytes, PrivateKey, PublicKey}; use crate::inbox::handshake::InboxHandshake; +use crate::outcomes::{ConversationClass, InboxOutcome, NewConversation}; use crate::proto; -use crate::types::{AddressedEncryptedPayload, ContentData}; +use crate::types::AddressedEncryptedPayload; use crypto::Identity; /// Compute the deterministic Delivery_address for an installation @@ -119,14 +120,18 @@ impl Inbox { Ok((convo, payloads)) } - /// Handles an incoming inbox frame. The caller must provide the ephemeral private key - /// looked up from storage. Returns the created conversation and optional content data. + /// Handles an incoming inbox frame. The caller must provide the ephemeral + /// private key hex looked up from storage. Persists the created + /// conversation and consumes the ephemeral key. Returns the + /// [`InboxOutcome`] describing what was observed — for a successful + /// invite, a `new_conversation` and the initial `ConvoOutcome` carrying + /// the first message. pub fn handle_frame( &self, enc_payload: EncryptedPayload, public_key_hex: &str, private_store: Rc>, - ) -> Result<(Conversation, Option), ChatError> { + ) -> Result { let ephemeral_key = self .store .borrow() @@ -143,7 +148,7 @@ impl Inbox { let (seed_key, frame) = self.perform_handshake(&ephemeral_key, header, handshake.payload)?; - match frame.frame_type.unwrap() { + let result = match frame.frame_type.unwrap() { proto::inbox_v1_frame::FrameType::InvitePrivateV1(_invite_private_v1) => { let mut convo = PrivateV1Convo::new_responder(private_store, seed_key, &ephemeral_key); @@ -152,18 +157,31 @@ impl Inbox { return Err(ChatError::Protocol("missing initial encpayload".into())); }; - // Set is_new_convo for content data - let content = match convo.handle_frame(enc_payload)? { - Some(v) => ContentData { - is_new_convo: true, - ..v - }, - None => return Err(ChatError::Protocol("expected contentData".into())), - }; + let initial = convo.handle_frame(enc_payload)?; + if initial.content.is_none() { + return Err(ChatError::Protocol( + "expected initial message in invite".into(), + )); + } - Ok((Conversation::Private(convo), Some(content))) + let new_conversation = NewConversation { + convo_id: initial.convo_id.clone(), + class: ConversationClass::Private, + }; + convo.persist()?; + + InboxOutcome { + new_conversation, + initial: Some(initial), + } } - } + }; + + self.store + .borrow_mut() + .remove_ephemeral_key(public_key_hex)?; + + Ok(result) } /// Extracts the ephemeral key hex from an incoming encrypted payload @@ -250,7 +268,7 @@ impl Inbox { } impl Id for Inbox { - fn id(&self) -> ConversationId<'_> { + fn id(&self) -> &str { &self.local_convo_id } } diff --git a/core/conversations/src/inbox_v2.rs b/core/conversations/src/inbox_v2.rs index fd1080e..affcddb 100644 --- a/core/conversations/src/inbox_v2.rs +++ b/core/conversations/src/inbox_v2.rs @@ -7,6 +7,7 @@ use openmls::prelude::*; use openmls_libcrux_crypto::Provider as LibcruxProvider; use prost::{Message, Oneof}; use storage::ChatStore; +use storage::ConversationKind; use storage::ConversationMeta; use crate::AddressedEnvelope; @@ -16,9 +17,11 @@ use crate::RegistrationService; use crate::account::LogosAccount; use crate::causal_history::CausalHistoryStore; use crate::causal_history::MissingMessage; +use crate::conversation::ConversationId; use crate::conversation::GroupConvo; use crate::conversation::group_v1::MlsContext; -use crate::conversation::{GroupV1Convo, IdentityProvider}; +use crate::conversation::{GroupV1Convo, Id, IdentityProvider}; +use crate::outcomes::{ConversationClass, InboxOutcome, NewConversation}; use crate::types::AccountId; use crate::utils::{blake2b_hex, hash_size}; pub struct PqMlsContext { @@ -152,7 +155,7 @@ where ) } - pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<(), ChatError> { + pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result { let inbox_frame = InboxV2Frame::decode(payload_bytes)?; let Some(payload) = inbox_frame.payload else { @@ -172,14 +175,14 @@ where let meta = ConversationMeta { local_convo_id: convo.id().to_string(), remote_convo_id: "0".into(), - kind: storage::ConversationKind::GroupV1, + kind: ConversationKind::GroupV1, }; self.store.borrow_mut().save_conversation(&meta)?; // TODO: (P1) Persist state Ok(()) } - fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result<(), ChatError> { + fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result { let (msg_in, _rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?; let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else { @@ -197,7 +200,15 @@ where self.causal.clone(), welcome, )?; - self.persist_convo(convo) + let convo_id: ConversationId = convo.id().to_string(); + self.persist_convo(convo)?; + Ok(InboxOutcome { + new_conversation: NewConversation { + convo_id, + class: ConversationClass::Group, + }, + initial: None, + }) } fn create_keypackage(&self) -> Result { diff --git a/core/conversations/src/lib.rs b/core/conversations/src/lib.rs index 3439907..7bf1c82 100644 --- a/core/conversations/src/lib.rs +++ b/core/conversations/src/lib.rs @@ -6,6 +6,7 @@ mod crypto; mod errors; mod inbox; mod inbox_v2; +mod outcomes; mod proto; mod service_traits; mod types; @@ -15,9 +16,13 @@ pub use account::LogosAccount; pub use causal_history::MissingMessage; pub use chat_sqlite::ChatStorage; pub use chat_sqlite::StorageConfig; -pub use context::{Context, ConversationId, ConversationIdOwned, Introduction}; +pub use context::{Context, ConversationId, Introduction}; pub use conversation::GroupConvo; pub use errors::ChatError; +pub use outcomes::{ + Content, ConversationClass, ConvoOutcome, InboxOutcome, NewConversation, PayloadOutcome, +}; pub use service_traits::{DeliveryService, IdentityProvider, RegistrationService}; -pub use types::{AccountId, AddressedEnvelope, ContentData}; +pub use storage::ConversationKind; +pub use types::{AccountId, AddressedEnvelope}; pub use utils::hex_trunc; diff --git a/core/conversations/src/outcomes.rs b/core/conversations/src/outcomes.rs new file mode 100644 index 0000000..3209da8 --- /dev/null +++ b/core/conversations/src/outcomes.rs @@ -0,0 +1,81 @@ +//! Observations a single inbound payload produces. +//! +//! - [`ConvoOutcome`] — an optional [`Content`] on a single existing +//! conversation. +//! - [`InboxOutcome`] — a newly observed conversation, optionally with an +//! initial [`ConvoOutcome`]. +//! - [`PayloadOutcome`] — the union of the above, plus `Empty`. + +use storage::ConversationKind; + +use crate::conversation::ConversationId; + +#[derive(Debug, Clone)] +pub struct Content { + pub bytes: Vec, +} + +#[derive(Debug, Clone)] +pub struct ConvoOutcome { + pub convo_id: ConversationId, + pub content: Option, +} + +impl ConvoOutcome { + pub fn empty(convo_id: ConversationId) -> Self { + Self { + convo_id, + content: None, + } + } +} + +#[derive(Debug, Clone)] +pub struct NewConversation { + pub convo_id: ConversationId, + pub class: ConversationClass, +} + +#[derive(Debug, Clone)] +pub struct InboxOutcome { + pub new_conversation: NewConversation, + pub initial: Option, +} + +#[derive(Debug, Clone, Default)] +pub enum PayloadOutcome { + #[default] + Empty, + Convo(ConvoOutcome), + Inbox(InboxOutcome), +} + +impl From for PayloadOutcome { + fn from(c: ConvoOutcome) -> Self { + Self::Convo(c) + } +} + +impl From for PayloadOutcome { + fn from(i: InboxOutcome) -> Self { + Self::Inbox(i) + } +} + +/// Stable across protocol versions of the same conversation shape. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConversationClass { + Private, + Group, +} + +impl ConversationClass { + /// `Unknown(_)` yields `None`. + pub fn from_kind(kind: &ConversationKind) -> Option { + match kind { + ConversationKind::PrivateV1 => Some(Self::Private), + ConversationKind::GroupV1 => Some(Self::Group), + ConversationKind::Unknown(_) => None, + } + } +} diff --git a/core/conversations/src/types.rs b/core/conversations/src/types.rs index 9564234..b6c5bfd 100644 --- a/core/conversations/src/types.rs +++ b/core/conversations/src/types.rs @@ -48,15 +48,6 @@ impl Debug for AddressedEnvelope { } } -// This struct represents the result of processed inbound data. -// It wraps content payload with a conversation_id -#[derive(Debug)] -pub struct ContentData { - pub conversation_id: String, - pub data: Vec, - pub is_new_convo: bool, -} - // Internal type Definitions // Used by Conversations to attach addresses to outbound encrypted payloads diff --git a/core/integration_tests_core/tests/mls_integration.rs b/core/integration_tests_core/tests/mls_integration.rs index 03119e7..7330cf9 100644 --- a/core/integration_tests_core/tests/mls_integration.rs +++ b/core/integration_tests_core/tests/mls_integration.rs @@ -1,38 +1,61 @@ use std::ops::{Deref, DerefMut}; use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; -use libchat::{ContentData, Context, GroupConvo, hex_trunc}; +use libchat::{ + Content, Context, ConversationClass, ConvoOutcome, GroupConvo, NewConversation, PayloadOutcome, + hex_trunc, +}; + +type ResultCallback = Box; // Simple client Functionality for testing struct Client { inner: Context, - on_content: Option>, + on_result: Option, + new_conversations: Vec, + received_messages: Vec<(libchat::ConversationId, Content)>, } impl Client { fn init( ctx: Context, - cb: Option, + cb: Option, ) -> Self { Client { inner: ctx, - on_content: cb.map(|f| Box::new(f) as Box), + on_result: cb.map(|f| Box::new(f) as ResultCallback), + new_conversations: Vec::new(), + received_messages: Vec::new(), } } fn process_messages(&mut self) { - let messages: Vec<_> = { + let payloads: Vec<_> = { let mut ds = self.ds(); std::iter::from_fn(|| ds.poll()).collect() }; - for data in messages { - let res = self.handle_payload(&data).unwrap(); - if let Some(cb) = &self.on_content - && let Some(content_data) = res - { - cb(content_data); + for data in payloads { + let result = self.handle_payload(&data).unwrap(); + if let Some(cb) = &self.on_result { + cb(&result); } + match result { + PayloadOutcome::Empty => {} + PayloadOutcome::Convo(co) => self.absorb_convo_outcome(co), + PayloadOutcome::Inbox(io) => { + self.new_conversations.push(io.new_conversation); + if let Some(initial) = io.initial { + self.absorb_convo_outcome(initial); + } + } + } + } + } + + fn absorb_convo_outcome(&mut self, outcome: ConvoOutcome) { + if let Some(content) = outcome.content { + self.received_messages.push((outcome.convo_id, content)); } } @@ -60,15 +83,32 @@ impl DerefMut for Client { } // Higher order function to handle printing -fn pretty_print(prefix: impl Into) -> Box { +fn pretty_print(prefix: impl Into) -> ResultCallback { let prefix = prefix.into(); - Box::new(move |c: ContentData| { - let cid = hex_trunc(c.conversation_id.as_bytes()); - let content = String::from_utf8(c.data).unwrap(); - println!("{} ({:?}) {}", prefix, cid, content) + Box::new(move |result: &PayloadOutcome| match result { + PayloadOutcome::Empty => {} + PayloadOutcome::Inbox(io) => { + let cid = hex_trunc(io.new_conversation.convo_id.as_bytes()); + println!( + "{prefix} ({cid:?}) [conversation started: {:?}]", + io.new_conversation.class + ); + if let Some(initial) = &io.initial { + print_contents(&prefix, initial); + } + } + PayloadOutcome::Convo(co) => print_contents(&prefix, co), }) } +fn print_contents(prefix: &str, outcome: &ConvoOutcome) { + let cid = hex_trunc(outcome.convo_id.as_bytes()); + if let Some(content) = &outcome.content { + let text = String::from_utf8_lossy(&content.bytes); + println!("{prefix} ({cid:?}) {text}"); + } +} + fn process(clients: &mut Vec) { for client in clients { client.process_messages(); @@ -95,21 +135,33 @@ fn create_group() { let raya_id = clients[RAYA].account_id().clone(); let s_convo = clients[SARO].create_group_convo(&[&raya_id]).unwrap(); - let convo_id = s_convo.id(); + let convo_id = s_convo.id().to_string(); // Raya can read this message because // 1) It was sent after add_members was committed, and // 2) LocalBroadcaster provides historical messages. clients[SARO] - .convo(convo_id) + .convo(&convo_id) .send_content(b"ok who broke the group chat again") .unwrap(); process(&mut clients); + // Raya should observe exactly one new Group conversation from the + // welcome, even though no initial content arrives with it. + let raya_started = clients[RAYA] + .new_conversations + .iter() + .filter(|nc| matches!(nc.class, ConversationClass::Group)) + .count(); + assert_eq!( + raya_started, 1, + "Raya should have observed exactly one new Group conversation for the welcome" + ); + clients[RAYA] - .convo(convo_id) + .convo(&convo_id) .send_content(b"it was literally working five minutes ago") .unwrap(); @@ -121,21 +173,31 @@ fn create_group() { let pax_id = clients[PAX].account_id().clone(); clients[SARO] - .convo(convo_id) + .convo(&convo_id) .add_member(&[&pax_id]) .unwrap(); process(&mut clients); + let pax_started = clients[PAX] + .new_conversations + .iter() + .filter(|nc| matches!(nc.class, ConversationClass::Group)) + .count(); + assert_eq!( + pax_started, 1, + "Pax should have observed exactly one new Group conversation for the welcome" + ); + clients[PAX] - .convo(convo_id) + .convo(&convo_id) .send_content(b"ngl the key rotation is cooked") .unwrap(); process(&mut clients); clients[SARO] - .convo(convo_id) + .convo(&convo_id) .send_content(b"bro we literally just added you to the group ") .unwrap(); diff --git a/core/integration_tests_core/tests/private_integration.rs b/core/integration_tests_core/tests/private_integration.rs index 165ba76..3dd9f3f 100644 --- a/core/integration_tests_core/tests/private_integration.rs +++ b/core/integration_tests_core/tests/private_integration.rs @@ -1,5 +1,5 @@ use chat_sqlite::{ChatStorage, StorageConfig}; -use libchat::{Context, Introduction}; +use libchat::{Context, ConversationClass, Introduction, PayloadOutcome}; use storage::{ConversationStore, IdentityStore}; use tempfile::tempdir; @@ -13,12 +13,14 @@ fn send_and_verify( ) { let payloads = sender.send_content(convo_id, content).unwrap(); let payload = payloads.first().unwrap(); - let received = receiver - .handle_payload(&payload.data) - .unwrap() - .expect("expected content"); - assert_eq!(content, received.data.as_slice()); - assert!(!received.is_new_convo); + let result = receiver.handle_payload(&payload.data).unwrap(); + let PayloadOutcome::Convo(co) = result else { + panic!("steady-state send should yield PayloadOutcome::Convo, got {result:?}"); + }; + let content_out = co + .content + .expect("steady-state send should yield one content"); + assert_eq!(content, content_out.bytes.as_slice()); } #[test] @@ -38,16 +40,23 @@ fn ctx_integration() { let mut content = vec![10]; let (saro_convo_id, payloads) = saro.create_private_convo(&intro, &content).unwrap(); - // Raya receives initial message + // Raya receives the invite + initial message let payload = payloads.first().unwrap(); - let initial_content = raya - .handle_payload(&payload.data) - .unwrap() - .expect("expected initial content"); - - let raya_convo_id = initial_content.conversation_id; - assert_eq!(content, initial_content.data); - assert!(initial_content.is_new_convo); + let initial = raya.handle_payload(&payload.data).unwrap(); + let PayloadOutcome::Inbox(io) = initial else { + panic!("invite must yield PayloadOutcome::Inbox, got {initial:?}"); + }; + assert!(matches!( + io.new_conversation.class, + ConversationClass::Private + )); + let initial_co = io.initial.expect("invite must include initial content"); + assert_eq!(io.new_conversation.convo_id, initial_co.convo_id); + let initial_content = initial_co + .content + .expect("invite must include initial message"); + assert_eq!(content, initial_content.bytes); + let raya_convo_id = io.new_conversation.convo_id.clone(); // Exchange messages back and forth for _ in 0..10 { @@ -107,8 +116,14 @@ fn conversation_metadata_persistence() { let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap(); let payload = payloads.first().unwrap(); - let content = alice.handle_payload(&payload.data).unwrap().unwrap(); - assert!(content.is_new_convo); + let result = alice.handle_payload(&payload.data).unwrap(); + let PayloadOutcome::Inbox(io) = result else { + panic!("invite must yield PayloadOutcome::Inbox, got {result:?}"); + }; + assert!(matches!( + io.new_conversation.class, + ConversationClass::Private + )); let convos = alice.store().load_conversations().unwrap(); assert_eq!(convos.len(), 1); @@ -128,16 +143,27 @@ fn conversation_full_flow() { let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap(); let payload = payloads.first().unwrap(); - let content = alice.handle_payload(&payload.data).unwrap().unwrap(); - let alice_convo_id = content.conversation_id; + let result = alice.handle_payload(&payload.data).unwrap(); + let PayloadOutcome::Inbox(io) = result else { + panic!("invite must yield PayloadOutcome::Inbox, got {result:?}"); + }; + let alice_convo_id = io.new_conversation.convo_id.clone(); let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap(); let payload = payloads.first().unwrap(); - bob.handle_payload(&payload.data).unwrap().unwrap(); + let result = bob.handle_payload(&payload.data).unwrap(); + assert_eq!( + expect_convo(result).content.expect("message content").bytes, + b"reply 1" + ); let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap(); let payload = payloads.first().unwrap(); - alice.handle_payload(&payload.data).unwrap().unwrap(); + let result = alice.handle_payload(&payload.data).unwrap(); + assert_eq!( + expect_convo(result).content.expect("message content").bytes, + b"reply 2" + ); // Verify conversation list let convo_ids = alice.list_conversations().unwrap(); @@ -146,18 +172,25 @@ fn conversation_full_flow() { // Continue exchanging messages let payloads = bob.send_content(&bob_convo_id, b"more messages").unwrap(); let payload = payloads.first().unwrap(); - let content = alice - .handle_payload(&payload.data) - .expect("should decrypt") - .expect("should have content"); - assert_eq!(content.data, b"more messages"); + let result = alice.handle_payload(&payload.data).expect("should decrypt"); + assert_eq!( + expect_convo(result).content.expect("message content").bytes, + b"more messages" + ); // Alice can also send back let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap(); let payload = payloads.first().unwrap(); - let content = bob - .handle_payload(&payload.data) - .unwrap() - .expect("bob should receive"); - assert_eq!(content.data, b"alice reply"); + let result = bob.handle_payload(&payload.data).unwrap(); + assert_eq!( + expect_convo(result).content.expect("message content").bytes, + b"alice reply" + ); +} + +fn expect_convo(result: PayloadOutcome) -> libchat::ConvoOutcome { + match result { + PayloadOutcome::Convo(co) => co, + other => panic!("expected PayloadOutcome::Convo, got {other:?}"), + } } diff --git a/crates/client-ffi/examples/message-exchange/src/main.c b/crates/client-ffi/examples/message-exchange/src/main.c index 2c49861..4929a73 100644 --- a/crates/client-ffi/examples/message-exchange/src/main.c +++ b/crates/client-ffi/examples/message-exchange/src/main.c @@ -84,19 +84,38 @@ static int32_t deliver_cb( /* ------------------------------------------------------------------ * Helper: pop one envelope from the bus and push it into receiver. - * Returns a heap-allocated result; caller frees with - * push_inbound_result_free(). + * Returns a heap-allocated event list; caller frees with + * event_list_free(). * ------------------------------------------------------------------ */ -static PushInboundResult_t *route(ClientHandle_t *receiver) +static EventList_t *route(ClientHandle_t *receiver) { const uint8_t *data; size_t len; int ok = queue_pop(&bus, &data, &len); assert(ok && "expected an envelope in the bus"); - PushInboundResult_t *r = client_receive(receiver, SLICE(data, len)); - assert(push_inbound_result_error_code(r) == 0 && "push_inbound failed"); - return r; + EventList_t *evs = client_receive(receiver, SLICE(data, len)); + assert(event_list_error_code(evs) == 0 && "client_receive failed"); + return evs; +} + +/* ------------------------------------------------------------------ + * Helper: locate the first MessageReceived event in a list and copy + * its content into the caller-supplied buffer. Returns -1 if not found. + * ------------------------------------------------------------------ */ +static int find_message(EventList_t *evs, char *out, size_t out_cap, size_t *out_len) +{ + size_t n = event_list_len(evs); + for (size_t i = 0; i < n; ++i) { + if (event_list_kind_at(evs, i) == EVENT_KIND_MESSAGE_RECEIVED) { + slice_ref_uint8_t s = event_list_content_at(evs, i); + assert(s.len <= out_cap && "content buffer too small"); + memcpy(out, s.ptr, s.len); + *out_len = s.len; + return (int)i; + } + } + return -1; } /* ------------------------------------------------------------------ @@ -125,19 +144,23 @@ int main(void) assert(create_convo_result_error_code(saro_convo) == 0); create_intro_result_free(raya_intro); - /* Route saro -> raya */ - PushInboundResult_t *recv = route(raya); + /* Route saro -> raya: expect [ConversationStarted, MessageReceived] */ + EventList_t *evs = route(raya); + assert(event_list_len(evs) == 2 && "expected 2 events for invite"); + assert(event_list_kind_at(evs, 0) == EVENT_KIND_CONVERSATION_STARTED + && "first event should be ConversationStarted"); + assert(event_list_conversation_class_at(evs, 0) == FFI_CONVERSATION_CLASS_PRIVATE + && "expected Private convo class"); - assert(push_inbound_result_has_content(recv) && "expected content from saro"); - assert(push_inbound_result_is_new_convo(recv) && "expected new-conversation flag"); + char msg[64]; + size_t msg_len; + int idx = find_message(evs, msg, sizeof(msg), &msg_len); + assert(idx >= 0 && "expected MessageReceived from saro"); + assert(msg_len == 10 && memcmp(msg, "hello raya", 10) == 0); + printf("Raya received: \"%.*s\"\n", (int)msg_len, msg); - slice_ref_uint8_t content = push_inbound_result_content(recv); - assert(content.len == 10); - assert(memcmp(content.ptr, "hello raya", 10) == 0); - printf("Raya received: \"%.*s\"\n", (int)content.len, content.ptr); - - /* Copy Raya's convo_id before freeing recv */ - slice_ref_uint8_t cid_ref = push_inbound_result_convo_id(recv); + /* Copy Raya's convo_id from the ConversationStarted event */ + slice_ref_uint8_t cid_ref = event_list_convo_id_at(evs, 0); uint8_t raya_cid[256]; size_t raya_cid_len = cid_ref.len; if (raya_cid_len >= sizeof(raya_cid)) { @@ -145,37 +168,37 @@ int main(void) return 1; } memcpy(raya_cid, cid_ref.ptr, raya_cid_len); - push_inbound_result_free(recv); + event_list_free(evs); /* Raya replies */ ErrorCode_t rc = client_send_message( raya, SLICE(raya_cid, raya_cid_len), STR("hi saro")); assert(rc == ERROR_CODE_NONE); - recv = route(saro); - assert(push_inbound_result_has_content(recv) && "expected content from raya"); - assert(!push_inbound_result_is_new_convo(recv) && "unexpected new-convo flag"); - content = push_inbound_result_content(recv); - assert(content.len == 7); - assert(memcmp(content.ptr, "hi saro", 7) == 0); - printf("Saro received: \"%.*s\"\n", (int)content.len, content.ptr); - push_inbound_result_free(recv); + evs = route(saro); + assert(event_list_len(evs) == 1 && "expected MessageReceived only"); + assert(event_list_kind_at(evs, 0) == EVENT_KIND_MESSAGE_RECEIVED); + idx = find_message(evs, msg, sizeof(msg), &msg_len); + assert(idx >= 0); + assert(msg_len == 7 && memcmp(msg, "hi saro", 7) == 0); + printf("Saro received: \"%.*s\"\n", (int)msg_len, msg); + event_list_free(evs); /* Multiple back-and-forth rounds */ slice_ref_uint8_t saro_cid = create_convo_result_id(saro_convo); for (int i = 0; i < 3; i++) { - char msg[32]; - int mlen = snprintf(msg, sizeof(msg), "msg %d", i); + char text[32]; + int tlen = snprintf(text, sizeof(text), "msg %d", i); - rc = client_send_message(saro, saro_cid, SLICE(msg, (size_t)mlen)); + rc = client_send_message(saro, saro_cid, SLICE(text, (size_t)tlen)); assert(rc == ERROR_CODE_NONE); - recv = route(raya); - assert(push_inbound_result_has_content(recv)); - content = push_inbound_result_content(recv); - assert((int)content.len == mlen); - assert(memcmp(content.ptr, msg, (size_t)mlen) == 0); - push_inbound_result_free(recv); + evs = route(raya); + idx = find_message(evs, msg, sizeof(msg), &msg_len); + assert(idx >= 0); + assert((int)msg_len == tlen); + assert(memcmp(msg, text, (size_t)tlen) == 0); + event_list_free(evs); char reply[32]; int rlen = snprintf(reply, sizeof(reply), "reply %d", i); @@ -184,12 +207,12 @@ int main(void) raya, SLICE(raya_cid, raya_cid_len), SLICE(reply, (size_t)rlen)); assert(rc == ERROR_CODE_NONE); - recv = route(saro); - assert(push_inbound_result_has_content(recv)); - content = push_inbound_result_content(recv); - assert((int)content.len == rlen); - assert(memcmp(content.ptr, reply, (size_t)rlen) == 0); - push_inbound_result_free(recv); + evs = route(saro); + idx = find_message(evs, msg, sizeof(msg), &msg_len); + assert(idx >= 0); + assert((int)msg_len == rlen); + assert(memcmp(msg, reply, (size_t)rlen) == 0); + event_list_free(evs); } /* Cleanup */ diff --git a/crates/client-ffi/src/api.rs b/crates/client-ffi/src/api.rs index d08eab5..fc3ab54 100644 --- a/crates/client-ffi/src/api.rs +++ b/crates/client-ffi/src/api.rs @@ -1,8 +1,7 @@ use safer_ffi::prelude::*; -use std::sync::Arc; use crate::delivery::{CDelivery, DeliverFn}; -use logos_chat::{ChatClient, ClientError}; +use logos_chat::{ChatClient, ClientError, ConversationClass, Event}; // --------------------------------------------------------------------------- // Opaque client handle @@ -21,9 +20,47 @@ pub struct ClientHandle(pub(crate) ChatClient); pub enum ErrorCode { None = 0, BadUtf8 = -1, + /// Failure parsing or processing an introduction bundle. BadIntro = -2, DeliveryFail = -3, UnknownError = -4, + /// Failure decoding, decrypting, or processing an inbound payload. + BadPayload = -5, +} + +// --------------------------------------------------------------------------- +// Event taxonomy (C-side view of Event) +// --------------------------------------------------------------------------- + +#[derive_ReprC] +#[repr(i32)] +#[derive(Clone, Copy)] +pub enum EventKind { + /// Sentinel returned by `event_list_kind_at` for out-of-bounds indices. + /// Never the kind of a real event row. + Invalid = -1, + ConversationStarted = 0, + MessageReceived = 1, +} + +#[derive_ReprC] +#[repr(i32)] +#[derive(Clone, Copy)] +pub enum FfiConversationClass { + /// Sentinel for accessor calls that don't apply to the queried row + /// (out-of-bounds, or a non-`ConversationStarted` event). + Invalid = -1, + Private = 0, + Group = 1, +} + +impl From for FfiConversationClass { + fn from(c: ConversationClass) -> Self { + match c { + ConversationClass::Private => FfiConversationClass::Private, + ConversationClass::Group => FfiConversationClass::Group, + } + } } // --------------------------------------------------------------------------- @@ -44,14 +81,61 @@ pub struct CreateConvoResult { convo_id: Option, } +/// An ordered list of events with a status code. Inspect `error_code` (zero +/// on success) before iterating with `event_list_len` and the indexed +/// accessors. #[derive_ReprC] #[repr(opaque)] -pub struct PushInboundResult { +pub struct EventList { error_code: i32, - has_content: bool, - is_new_convo: bool, - convo_id: Option, - content: Option>, + events: Vec, +} + +enum EventRow { + ConversationStarted { + convo_id: String, + class: FfiConversationClass, + }, + MessageReceived { + convo_id: String, + content: Vec, + }, +} + +impl EventRow { + /// Translate an [`Event`] into the FFI row shape, or `None` for variants + /// without an FFI representation. + fn from_event(event: Event) -> Option { + match event { + Event::ConversationStarted { + convo_id, class, .. + } => Some(EventRow::ConversationStarted { + convo_id: convo_id.to_string(), + class: class.into(), + }), + Event::MessageReceived { + convo_id, content, .. + } => Some(EventRow::MessageReceived { + convo_id: convo_id.to_string(), + content, + }), + _ => None, + } + } + + fn convo_id(&self) -> &str { + match self { + EventRow::ConversationStarted { convo_id, .. } + | EventRow::MessageReceived { convo_id, .. } => convo_id, + } + } + + fn content(&self) -> &[u8] { + match self { + EventRow::MessageReceived { content, .. } => content, + _ => &[], + } + } } // --------------------------------------------------------------------------- @@ -159,7 +243,7 @@ fn client_create_conversation( { Ok(convo_id) => CreateConvoResult { error_code: ErrorCode::None as i32, - convo_id: Some(convo_id.to_string()), + convo_id: Some(convo_id), }, Err(ClientError::Chat(_)) => CreateConvoResult { error_code: ErrorCode::BadIntro as i32, @@ -205,8 +289,7 @@ fn client_send_message( Ok(s) => s, Err(_) => return ErrorCode::BadUtf8, }; - let convo_id_owned: logos_chat::ConversationIdOwned = Arc::from(id_str); - match handle.0.send_message(&convo_id_owned, content.as_slice()) { + match handle.0.send_message(id_str, content.as_slice()) { Ok(()) => ErrorCode::None, Err(ClientError::Delivery(_)) => ErrorCode::DeliveryFail, Err(_) => ErrorCode::UnknownError, @@ -214,72 +297,90 @@ fn client_send_message( } // --------------------------------------------------------------------------- -// Push inbound +// Receive (process inbound, get event list back) // --------------------------------------------------------------------------- -/// Decrypt an inbound payload. `has_content` is false for protocol frames. -/// Free with `push_inbound_result_free`. +/// Decrypt an inbound payload. Returns the events the payload produced; +/// the list may be empty for protocol-only frames. Free with +/// `event_list_free`. #[ffi_export] fn client_receive( handle: &mut ClientHandle, payload: c_slice::Ref<'_, u8>, -) -> repr_c::Box { +) -> repr_c::Box { let result = match handle.0.receive(payload.as_slice()) { - Ok(Some(cd)) => PushInboundResult { + Ok(events) => EventList { error_code: ErrorCode::None as i32, - has_content: true, - is_new_convo: cd.is_new_convo, - convo_id: Some(cd.conversation_id), - content: Some(cd.data), + events: events + .into_iter() + .filter_map(EventRow::from_event) + .collect(), }, - Ok(None) => PushInboundResult { - error_code: ErrorCode::None as i32, - has_content: false, - is_new_convo: false, - convo_id: None, - content: None, + Err(ClientError::Chat(_)) => EventList { + error_code: ErrorCode::BadPayload as i32, + events: Vec::new(), }, - Err(_) => PushInboundResult { - error_code: ErrorCode::UnknownError as i32, - has_content: false, - is_new_convo: false, - convo_id: None, - content: None, + Err(ClientError::Delivery(_)) => EventList { + error_code: ErrorCode::DeliveryFail as i32, + events: Vec::new(), }, }; Box::new(result).into() } #[ffi_export] -fn push_inbound_result_error_code(r: &PushInboundResult) -> i32 { - r.error_code +fn event_list_error_code(list: &EventList) -> i32 { + list.error_code } #[ffi_export] -fn push_inbound_result_has_content(r: &PushInboundResult) -> bool { - r.has_content +fn event_list_len(list: &EventList) -> usize { + list.events.len() +} + +/// Returns `EventKind::Invalid` for out-of-bounds indices. +#[ffi_export] +fn event_list_kind_at(list: &EventList, idx: usize) -> EventKind { + match list.events.get(idx) { + Some(EventRow::ConversationStarted { .. }) => EventKind::ConversationStarted, + Some(EventRow::MessageReceived { .. }) => EventKind::MessageReceived, + None => EventKind::Invalid, + } +} + +/// Returns an empty slice for out-of-bounds indices. +/// The slice is valid only while `list` is alive. +#[ffi_export] +fn event_list_convo_id_at(list: &EventList, idx: usize) -> c_slice::Ref<'_, u8> { + list.events + .get(idx) + .map(|r| r.convo_id().as_bytes()) + .unwrap_or(&[]) + .into() +} + +/// Returns an empty slice for non-`MessageReceived` events or out-of-bounds. +/// The slice is valid only while `list` is alive. +#[ffi_export] +fn event_list_content_at(list: &EventList, idx: usize) -> c_slice::Ref<'_, u8> { + list.events + .get(idx) + .map(EventRow::content) + .unwrap_or(&[]) + .into() +} + +/// Returns `FfiConversationClass::Invalid` for non-`ConversationStarted` +/// events or out-of-bounds. +#[ffi_export] +fn event_list_conversation_class_at(list: &EventList, idx: usize) -> FfiConversationClass { + match list.events.get(idx) { + Some(EventRow::ConversationStarted { class, .. }) => *class, + _ => FfiConversationClass::Invalid, + } } #[ffi_export] -fn push_inbound_result_is_new_convo(r: &PushInboundResult) -> bool { - r.is_new_convo -} - -/// Returns an empty slice when has_content is false. -/// The slice is valid only while `r` is alive. -#[ffi_export] -fn push_inbound_result_convo_id(r: &PushInboundResult) -> c_slice::Ref<'_, u8> { - r.convo_id.as_deref().unwrap_or("").as_bytes().into() -} - -/// Returns an empty slice when has_content is false. -/// The slice is valid only while `r` is alive. -#[ffi_export] -fn push_inbound_result_content(r: &PushInboundResult) -> c_slice::Ref<'_, u8> { - r.content.as_deref().unwrap_or(&[]).into() -} - -#[ffi_export] -fn push_inbound_result_free(r: repr_c::Box) { - drop(r) +fn event_list_free(list: repr_c::Box) { + drop(list) } diff --git a/crates/client/examples/message-exchange/main.rs b/crates/client/examples/message-exchange/main.rs index 2698290..5e46d80 100644 --- a/crates/client/examples/message-exchange/main.rs +++ b/crates/client/examples/message-exchange/main.rs @@ -1,5 +1,4 @@ -use logos_chat::{ChatClient, ConversationIdOwned, InProcessDelivery}; -use std::sync::Arc; +use logos_chat::{ChatClient, ConversationId, Event, InProcessDelivery}; fn main() { let delivery = InProcessDelivery::new(Default::default()); @@ -13,21 +12,29 @@ fn main() { .unwrap(); let raw = cursor.next().unwrap(); - let content = raya.receive(&raw).unwrap().unwrap(); - println!( - "Raya received: {:?}", - std::str::from_utf8(&content.data).unwrap() - ); + let events = raya.receive(&raw).unwrap(); + let raya_convo_id: ConversationId = events + .iter() + .find_map(|e| match e { + Event::ConversationStarted { convo_id, .. } => Some(convo_id.to_string()), + _ => None, + }) + .expect("expected ConversationStarted"); + for event in &events { + if let Event::MessageReceived { content, .. } = event { + println!("Raya received: {:?}", std::str::from_utf8(content).unwrap()); + } + } - let raya_convo_id: ConversationIdOwned = Arc::from(content.conversation_id.as_str()); raya.send_message(&raya_convo_id, b"hi saro").unwrap(); let raw = cursor.next().unwrap(); - let content = saro.receive(&raw).unwrap().unwrap(); - println!( - "Saro received: {:?}", - std::str::from_utf8(&content.data).unwrap() - ); + let events = saro.receive(&raw).unwrap(); + for event in &events { + if let Event::MessageReceived { content, .. } = event { + println!("Saro received: {:?}", std::str::from_utf8(content).unwrap()); + } + } println!("Message exchange complete."); } diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 95a36af..f63247b 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1,11 +1,14 @@ +use std::sync::Arc; + use libchat::{ - AddressedEnvelope, ChatError, ChatStorage, ContentData, Context, ConversationIdOwned, - DeliveryService, Introduction, StorageConfig, + AddressedEnvelope, ChatError, ChatStorage, Context, ConversationId, ConvoOutcome, + DeliveryService, InboxOutcome, Introduction, PayloadOutcome, StorageConfig, }; use components::EphemeralRegistry; use crate::errors::ClientError; +use crate::event::Event; pub struct ChatClient { ctx: Context, @@ -52,7 +55,7 @@ impl ChatClient { &mut self, intro_bundle: &[u8], initial_content: &[u8], - ) -> Result> { + ) -> Result> { let intro = Introduction::try_from(intro_bundle)?; let (convo_id, envelopes) = self.ctx.create_private_convo(&intro, initial_content)?; self.dispatch_all(envelopes)?; @@ -60,27 +63,25 @@ impl ChatClient { } /// List all conversation IDs known to this client. - pub fn list_conversations(&self) -> Result, ClientError> { + pub fn list_conversations(&self) -> Result, ClientError> { self.ctx.list_conversations().map_err(Into::into) } /// Encrypt `content` and dispatch all outbound envelopes. pub fn send_message( &mut self, - convo_id: &ConversationIdOwned, + convo_id: &str, content: &[u8], ) -> Result<(), ClientError> { - let envelopes = self.ctx.send_content(convo_id.as_ref(), content)?; + let envelopes = self.ctx.send_content(convo_id, content)?; self.dispatch_all(envelopes) } - /// Decrypt an inbound payload. Returns `Some(ContentData)` for user - /// content, `None` for protocol frames. - pub fn receive( - &mut self, - payload: &[u8], - ) -> Result, ClientError> { - self.ctx.handle_payload(payload).map_err(Into::into) + /// Decrypt an inbound payload. Returns the events the payload produced, + /// in causal order. May be empty for protocol-only frames. + pub fn receive(&mut self, payload: &[u8]) -> Result, ClientError> { + let result = self.ctx.handle_payload(payload)?; + Ok(events_from_inbound(result)) } fn dispatch_all( @@ -94,3 +95,46 @@ impl ChatClient { Ok(()) } } + +/// Walk an [`PayloadOutcome`] in causal order and emit one `Event` per +/// observation. For an `Inbox` outcome, [`Event::ConversationStarted`] +/// precedes the message event. The convo id is wrapped into `Arc` once +/// per outcome and shared across the events it produces. +fn events_from_inbound(result: PayloadOutcome) -> Vec { + match result { + PayloadOutcome::Empty => Vec::new(), + PayloadOutcome::Convo(co) => convo_events(co), + PayloadOutcome::Inbox(io) => inbox_events(io), + } +} + +fn convo_events(outcome: ConvoOutcome) -> Vec { + let ConvoOutcome { convo_id, content } = outcome; + content + .map(|c| Event::MessageReceived { + convo_id: Arc::from(convo_id), + content: c.bytes, + }) + .into_iter() + .collect() +} + +fn inbox_events(outcome: InboxOutcome) -> Vec { + let InboxOutcome { + new_conversation, + initial, + } = outcome; + let id: Arc = Arc::from(new_conversation.convo_id); + let mut events = Vec::with_capacity(2); + events.push(Event::ConversationStarted { + convo_id: Arc::clone(&id), + class: new_conversation.class, + }); + if let Some(c) = initial.and_then(|co| co.content) { + events.push(Event::MessageReceived { + convo_id: Arc::clone(&id), + content: c.bytes, + }); + } + events +} diff --git a/crates/client/src/event.rs b/crates/client/src/event.rs new file mode 100644 index 0000000..77dde51 --- /dev/null +++ b/crates/client/src/event.rs @@ -0,0 +1,27 @@ +//! Application-facing chat events. +//! +//! Each variant of [`Event`] describes one observable thing the application +//! cares about: a new conversation has appeared, a message was decrypted on +//! an existing one, and so on. The enum is `#[non_exhaustive]` so new +//! variants can be added without breaking exhaustive matches in dependent +//! crates. + +use std::sync::Arc; + +use libchat::ConversationClass; + +/// A discrete chat event. +#[non_exhaustive] +#[derive(Debug, Clone)] +pub enum Event { + /// A new conversation has appeared. + ConversationStarted { + convo_id: Arc, + class: ConversationClass, + }, + /// User content arrived on an existing conversation. + MessageReceived { + convo_id: Arc, + content: Vec, + }, +} diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index a0cac6f..f5d7e4a 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -1,12 +1,14 @@ mod client; mod delivery_in_process; mod errors; +mod event; pub use client::ChatClient; pub use delivery_in_process::{Cursor, InProcessDelivery, MessageBus}; pub use errors::ClientError; +pub use event::Event; -// Re-export types callers need to interact with ChatClient +// Re-export types callers need to interact with ChatClient. pub use libchat::{ - AddressedEnvelope, ContentData, ConversationIdOwned, DeliveryService, StorageConfig, + AddressedEnvelope, ConversationClass, ConversationId, DeliveryService, StorageConfig, }; diff --git a/crates/client/tests/saro_and_raya.rs b/crates/client/tests/saro_and_raya.rs index 4aba9a7..abef30b 100644 --- a/crates/client/tests/saro_and_raya.rs +++ b/crates/client/tests/saro_and_raya.rs @@ -1,14 +1,29 @@ use logos_chat::{ - ChatClient, ContentData, ConversationIdOwned, Cursor, InProcessDelivery, StorageConfig, + ChatClient, ConversationClass, ConversationId, Cursor, Event, InProcessDelivery, StorageConfig, }; -use std::sync::Arc; -fn receive(receiver: &mut ChatClient, cursor: &mut Cursor) -> ContentData { +/// Pulls one envelope, decrypts, and returns the events emitted. +fn receive(receiver: &mut ChatClient, cursor: &mut Cursor) -> Vec { let raw = cursor.next().expect("expected envelope"); - receiver - .receive(&raw) - .expect("receive failed") - .expect("expected content") + receiver.receive(&raw).expect("receive failed") +} + +fn expect_message(event: &Event) -> (&str, &[u8]) { + match event { + Event::MessageReceived { + convo_id, content, .. + } => (convo_id.as_ref(), content.as_slice()), + other => panic!("expected MessageReceived, got {other:?}"), + } +} + +fn expect_conversation_started(event: &Event) -> (&str, ConversationClass) { + match event { + Event::ConversationStarted { + convo_id, class, .. + } => (convo_id.as_ref(), *class), + other => panic!("expected ConversationStarted, got {other:?}"), + } } #[test] @@ -24,27 +39,39 @@ fn saro_raya_message_exchange() { .create_conversation(&raya_bundle, b"hello raya") .unwrap(); - let content = receive(&mut raya, &mut cursor); - assert_eq!(content.data, b"hello raya"); - assert!(content.is_new_convo); - - let raya_convo_id: ConversationIdOwned = Arc::from(content.conversation_id.as_str()); + let events = receive(&mut raya, &mut cursor); + assert_eq!( + events.len(), + 2, + "expected ConversationStarted + MessageReceived" + ); + let (started_id, class) = expect_conversation_started(&events[0]); + assert_eq!(class, ConversationClass::Private); + let (msg_id, content) = expect_message(&events[1]); + assert_eq!(content, b"hello raya"); + assert_eq!(started_id, msg_id); + let raya_convo_id: ConversationId = started_id.to_owned(); raya.send_message(&raya_convo_id, b"hi saro").unwrap(); - let content = receive(&mut saro, &mut cursor); - assert_eq!(content.data, b"hi saro"); - assert!(!content.is_new_convo); + let events = receive(&mut saro, &mut cursor); + assert_eq!(events.len(), 1); + let (_, content) = expect_message(&events[0]); + assert_eq!(content, b"hi saro"); for i in 0u8..5 { let msg = format!("msg {i}"); saro.send_message(&saro_convo_id, msg.as_bytes()).unwrap(); - let content = receive(&mut raya, &mut cursor); - assert_eq!(content.data, msg.as_bytes()); + let events = receive(&mut raya, &mut cursor); + assert_eq!(events.len(), 1); + let (_, content) = expect_message(&events[0]); + assert_eq!(content, msg.as_bytes()); let reply = format!("reply {i}"); raya.send_message(&raya_convo_id, reply.as_bytes()).unwrap(); - let content = receive(&mut saro, &mut cursor); - assert_eq!(content.data, reply.as_bytes()); + let events = receive(&mut saro, &mut cursor); + assert_eq!(events.len(), 1); + let (_, content) = expect_message(&events[0]); + assert_eq!(content, reply.as_bytes()); } assert_eq!(saro.list_conversations().unwrap().len(), 1); diff --git a/docs/adr/0001-client-event-system.md b/docs/adr/0001-client-event-system.md index b3f28a1..636f79e 100644 --- a/docs/adr/0001-client-event-system.md +++ b/docs/adr/0001-client-event-system.md @@ -5,272 +5,57 @@ | Status | Accepted | | Issue | https://github.com/logos-messaging/libchat/issues/97 | | Date | 2026-05-19 | +| Last revised | 2026-05-28 | ## Context and Problem -Applications currently learn about new conversations from an `is_new_convo: bool` flag on `ContentData` (`core/conversations/src/types.rs:16-20`). Two problems: +Applications must observe several kinds of things produced by the chat library: new conversations appearing from peer-initiated handshakes, decrypted messages on existing conversations, and further protocol observations (group membership changes, reliability signals). These observations are not coupled — an MLS group welcome creates a new conversation with no initial content; a single inbound payload can yield multiple observations; some observations (delivery timeouts from background retry work) have no synchronous trigger at all and must reach the application after the call that might have caused them has long since returned. -1. The flag overloads `ContentData`: protocol metadata is smuggled through a content carrier. -2. The flag assumes every new conversation carries an initial content frame. Protocols such as MLS allow a conversation to begin without one; in that case `handle_payload` returns `None` and the application never observes the new conversation. - -Issue #97 calls for a proper event system that can signal new conversations, delivery receipts, and reliability failures — without piggy-backing on content — and that provides a clear path for adding new event types later. - -This ADR specifies the layered design of the event system and how events reach the application. +Issue #97 captures the requirement for an observation surface that does not piggy-back on content, accommodates both sync-triggered and background-triggered observations uniformly, and crosses the FFI boundary cleanly. ## Decision Drivers -- **Simplicity of the core.** Fully synchronous and caller-driven: no background work, no callbacks out. External effects are performed through services injected as method parameters. -- **Extensibility.** A new event type is a localised change (one enum variant, one emit site) that does not break existing consumers. -- **FFI compatibility.** Must remain expressible through the existing `safer-ffi` boundary in `crates/client-ffi`. Event payloads are limited to owned, concrete data (bytes, strings, identifiers) — no closures, generics, or non-`'static` references. +- **Simplicity of the core.** Fully synchronous and caller-driven: no background work, no callbacks out. External effects flow through services injected as method parameters. +- **Asynchronous delivery at the client.** Applications consume events on their own schedule. Observations from sync-triggered processing and observations from background work share a single delivery surface, so the application sees one notification stream and does not care which path produced any given event. +- **FFI compatibility.** Payloads crossing the `safer-ffi` boundary in `crates/client-ffi` are limited to owned, concrete data — no closures, generics, or non-`'static` references — so any delivery mechanism must degrade to a sync drain on that side. ## Architecture -The library is organised in three layers. Calls flow downward; events flow upward. +Three layers. Calls flow downward. Sync results return through method returns; events reach the application asynchronously through a channel. ```mermaid flowchart TB - A["app
UI/UX layer
drives the event loop"] - B["client
owns services
runs background threads"] - C["core
strict sync, caller-driven"] + A["app
drains Receiver<Event>"] + B["client
owns transport poller + services
translates PayloadOutcome → Event values
pushes onto channel"] + C["core
strict sync, caller-driven
returns PayloadOutcome"] A -- "method calls" --> B B -- "method calls" --> C - C -.->|"events (from method returns)"| B - B -.->|"events (sync + background)"| A + C -.->|"PayloadOutcome
(sync method return)"| B + B == "Event (async channel)" ==> A ``` Crates: **app** — `bin/chat-cli`, future `logos-chat-module`; **client** — `crates/client`, `crates/client-ffi`; **core** — `core/conversations` and friends in libchat. -## Design - -### Core layer - -#### Constraints - -- Strict sync, single-threaded. -- No background work, timers, or internal queues. -- External effects (delivery; future registration / identity lookups) are performed through services injected as method parameters. - -#### Approach - -Methods receive the services they need and call them directly. Observations (events) are returned so the caller can surface them upward: - -```rust -impl Context { - pub fn handle_payload( - &mut self, - delivery: &mut D, - payload: &[u8], - ) -> Result, ChatError>; - - pub fn send_content( - &mut self, - delivery: &mut D, - convo: ConversationId, - content: &[u8], - ) -> Result, ChatError>; - - pub fn create_private_convo( - &mut self, - delivery: &mut D, - intro: &Introduction, - content: &[u8], - ) -> Result<(ConversationIdOwned, Vec), ChatError>; -} -``` - -### Client layer - -#### Responsibility split - -The client owns the concrete service implementations (delivery, future registration, identity), polls the transport on a background thread, and processes inbound bytes by calling into the core. The application invokes client methods and consumes events; raw transport bytes (encrypted envelopes off the wire) are handled entirely inside the client. - -#### Constraints - -- Owns the concrete service implementations and injects them into core method calls. -- Events from synchronous calls flow through the method's return type, inherited from the core. -- Polls the transport on a background thread and feeds inbound payloads into the core. -- May spawn additional background threads (e.g. for timer-driven retries). -- Background threads emit events that no caller-invoked method can return — for example `DeliveryFailed { reason: Timeout }`. - -#### Common shape (all options) - -The client invokes core methods with its services; the core publishes envelopes directly through the injected delivery service. Only events flow back as return values. - -```rust -impl ChatClient { - pub fn send_message(&mut self, convo: &ConversationIdOwned, content: &[u8]) - -> Result, ClientError>; // sync events from this send - - // Background events (including those from inbound payload processing) reach the - // application through one of the three mechanisms below. -} -``` - -The three options differ only in how background events reach the application. - -#### Option A — internal poll queue - -The client owns a `Mutex>`. Background threads push to it; the application drains via two new methods. - -```rust -impl ChatClient { - pub fn poll_event(&mut self) -> Option; - pub fn drain_events(&mut self) -> Vec; -} -``` - -Prior art: mio's `Events` (per-`Poll` instance, drained by the caller); rdkafka's `Consumer::poll` (background thread fills a queue, caller polls — same domain). - -**Pros** - -- Single primitive (mutex-protected queue) with no new dependencies. -- FFI mapping is direct: `client_poll_event` returns an opaque `Option`, mirroring the existing `PushInboundResult` shape (`crates/client-ffi/src/api.rs:49-55`). -- Matches the existing chat-cli tick-loop consumer pattern (`bin/chat-cli/src/app.rs:144-180`). - -**Cons** - -- Requires the application to drain after every operation; events accumulate if it forgets. -- Adds shared mutable state (`Mutex`) inside the client; the queue must be bounded with explicit overflow handling. - -#### Option B — channel handed to the caller (selected) - -The client's constructor returns a `Receiver` alongside the client handle. Background threads hold a `Sender` clone; the application reads from the receiver. - -```rust -let (client, events): (ChatClient<_>, Receiver) = - ChatClient::new(name, delivery); -``` - -Prior art: most Rust networking libraries; `std::sync::mpsc`, `crossbeam-channel`, `flume`. - -**Pros** - -- Channels are the canonical multi-producer/single-consumer primitive in the standard library; the shape is idiomatic in pure Rust. -- The application can park in `recv()` from a worker thread, integrate with `select!`, or later swap to `tokio::sync::mpsc` for an async wrapper. -- Mirrors the inbound-bytes channel chat-cli already uses (`bin/chat-cli/src/app.rs:46`). - -**Cons** - -- `Receiver` is not `#[repr(C)]` and cannot cross `safer-ffi` cleanly. The FFI layer must expose a drain function regardless, collapsing Option B into Option A at the boundary. -- Forces a channel-crate choice (`std::sync::mpsc`, `crossbeam-channel`, or `flume`). - -#### Option C — callback registered at construction - -The application registers a closure at construction; background threads invoke it directly when events arise. - -```rust -type EventFn = Box; - -impl ChatClient { - pub fn new(name: &str, delivery: D, on_event: EventFn) -> Self; -} -``` - -Prior art: the existing FFI `DeliverFn` callback at `client_create` (`crates/client-ffi/src/delivery.rs:8-15`); `tracing::Subscriber`; GTK signals. - -**Pros** - -- The codebase already establishes this pattern for outbound delivery; events would extend a familiar contract. -- FFI mapping is direct: register an `EventFn` function pointer at `client_create`. -- No internal queue or `Mutex` to maintain. - -**Cons** - -- The callback fires on the background thread. UI-style consumers (ratatui, GUI toolkits) cannot update state from threads other than the main loop thread and will bridge the callback into a thread-local queue — effectively re-implementing Option A in user code. -- The closure must be `Send + 'static`; capturing application state requires `Arc>` or a channel back to the application. -- Sync events arrive on the caller's thread; background events arrive on the background thread. The handler must be correct in both threading contexts, or the callback must forward to the main thread (collapsing into Option A). - -#### Comparison - -| Criterion | A: poll queue | B: channel | C: callback | -|---|---|---|---| -| Background events delivered via | `poll_event` / `drain_events` | `Receiver` | direct `Fn(&Event)` invocation | -| FFI fit (`safer-ffi`) | Native opaque + accessors | Degrades to Option A at the boundary | Native function pointer (matches `DeliverFn`) | -| New dependencies | None | None (with `std::sync::mpsc`); otherwise `crossbeam-channel` or `flume` | None | -| Internal state required | `Mutex>` | Channel internals | None | -| Thread on which the application observes the event | Application thread (next drain) | Application thread (next drain) | Background thread | -| Bridges naturally to UI thread | Yes | Yes | No (requires re-bridging) | -| Backpressure if the application is slow | Client-side queue buffers; bounded with overflow handling | Channel buffers; bound configurable | No buffer; slow callbacks block the background thread | -| Future `Stream` adapter | Wrap `poll_event` in a `Stream` | Swap to async channel (native) | Bridge callback into a channel, then `Stream` | - -### App layer - -The application drives the event loop. With Option B (selected), each tick drains the `Receiver` handed back at client construction: - -```rust -pub fn tick(&mut self) -> Result<()> { - for event in self.events.try_iter() { - self.handle_event(event); - } - Ok(()) -} -``` - -For reference, Option A would replace `self.events.try_iter()` with `self.client.drain_events()`. Option C moves the drain out of the tick — into the callback — and the callback typically forwards into an application-side channel that is drained on each tick anyway. - -## Event Taxonomy - -The same `Event` enum is shared across all three client options. - -```rust -#[derive(Debug, Clone)] -#[non_exhaustive] -pub enum Event { - #[non_exhaustive] - ConversationStarted { - conversation_id: ConversationIdOwned, - }, - #[non_exhaustive] - MessageReceived { - conversation_id: ConversationIdOwned, - data: Vec, - }, - #[non_exhaustive] - DeliveryReceipt { - conversation_id: ConversationIdOwned, - envelope_id: EnvelopeId, - }, - #[non_exhaustive] - DeliveryFailed { - conversation_id: ConversationIdOwned, - envelope_id: EnvelopeId, - reason: FailureReason, - }, -} - -#[derive(Debug, Clone)] -#[non_exhaustive] -pub enum FailureReason { - Transport, // synchronous transport error on publish - PeerRejected, // peer signalled rejection (future protocol work) - Timeout, // no receipt within the retry window -} -``` - -`#[non_exhaustive]` on the enum permits new variants; on each struct variant it permits new fields. Both are additive minor-release changes. Future variants (`ConversationRekeyed`, `ParticipantJoined`, `PresenceChanged`, transport health, key-rotation reminders, …) follow this rule. - -Mapping of variants to emit sites: - -| Variant | Emitted from | -|---|---| -| `ConversationStarted` (responder side) | `core/conversations/src/inbox/handler.rs:155-162` (replaces `is_new_convo: true`) | -| `MessageReceived` | `core/conversations/src/conversation/privatev1.rs:184-191` (replaces `is_new_convo: false`) | -| `DeliveryReceipt` | `Context::handle_payload` when decoding a `PrivateV1Frame::Receipt` (future protocol work) | -| `DeliveryFailed { Transport }` | Core method that invoked `delivery.publish` and observed a synchronous error | -| `DeliveryFailed { Timeout }` | Client's background retry thread | - -Events are the uniform observation channel: they carry both observations the call itself caused (e.g. a sync `DeliveryFailed { Transport }` from `send_content`) and observations from background work (e.g. `DeliveryFailed { Timeout }` from the retry thread). The only thing kept outside `Vec` is an obvious primary result the caller will use immediately — returned directly for ergonomics. This is why the initiator side does not emit `ConversationStarted`: `create_private_convo` returns the new `ConversationIdOwned` directly as part of its return value. - ## Decisions -1. **Sync at the client layer for now.** The core stays sync; the client also stays sync. Migrating to async later is non-structural — `std::sync::mpsc::Receiver` swaps to `tokio::sync::mpsc::Receiver` and gains an `impl Stream` shape without changing the chosen mechanism (point 2). Option A would migrate to a `Stream` over a notify primitive; Option C to an `async fn` callback. +1. **Core returns `PayloadOutcome`, a dispatcher-level enum.** Each inbound path inside the core yields its own concrete outcome type: `ConvoOutcome` (`Convo::handle_frame`) carries decrypted contents on an existing conversation; `InboxOutcome` (inbox / inbox_v2 handlers) carries a newly observed conversation plus an optional initial `ConvoOutcome`. `PayloadOutcome` is the dispatcher-level union (`Empty`, `Convo(ConvoOutcome)`, `Inbox(InboxOutcome)`) and is the single type `Context::handle_payload` returns; `From` / `From` impls keep the per-path handlers free of `PayloadOutcome` in their signatures. The split encodes at the type level what each producer can populate — a `Convo` cannot manufacture a new conversation, so its signature precludes the possibility. -2. **Consumer pattern: Option B — channel handed to the caller.** Different consumer archetypes could favour different shapes — a polling UI loop suits Option A; a low-latency push-driven consumer (toast notifications, daemons) suits Option C — but Option B is preferred: it is the most Rust-idiomatic of the three, has few drawbacks compared to A or C, and offers the smoothest path to async (point 1). +2. **`Event` is an asynchronous notification.** The client's constructor returns a `Receiver` alongside the client handle. A background poller drives the transport, calls into the core for each inbound payload, translates the resulting `PayloadOutcome` into one event per observation, and pushes them onto the channel. Background work that has no synchronous trigger at all (delivery retry timeouts, future protocol timers) pushes onto the same channel. -## Event flow +3. **Two enums, mapping at the client boundary.** `PayloadOutcome` is the dispatcher-level sum of observations from one payload; `Event` is a discrete app-facing notification. The two enums are allowed to diverge: a protocol-internal observation the app does not need lives only on a core outcome type; a client-only event like `DeliveryFailed { Timeout }` lives only on `Event`. Translation is an explicit per-variant `match` inside the client — not a blanket `From` impl — to preserve that divergence as both sides grow. -A worked example of the decisions above. Two flows cover everything the application observes: a synchronous send initiated by the app, and a background inbound carried by the client's transport poller. +4. **`ConversationClass` is a core boundary type, not a client-only one.** The protocol-versioned `ConversationKind` (`PrivateV1`, `GroupV1`, …) is a storage concern; clients only need the coarse class (`Private`, `Group`). The kind→class mapping happens in core where `NewConversation` is constructed, so adding a new `ConversationKind` is a one-line change in core's mapping site rather than a ripple into every client. The client re-exports `ConversationClass` for consumers, but the canonical definition lives in core alongside the outcome types. + +## Events vs errors + +Events are asynchronous notifications: things the application learns after the call that might have triggered them has returned. They cross thread boundaries through the channel. + +Synchronous failures — publish, parse, store, MLS — stay on `Result<_, ChatError>` on the call that triggered them. They are never events. `DeliveryFailed { reason }` is therefore an event by construction: only background work can raise it, after the original send already returned `Ok`. + +## Sequence + +Two flows cover everything the application observes: a synchronous send initiated by the app, and inbound bytes carried by the client's transport poller. ```mermaid sequenceDiagram @@ -280,36 +65,22 @@ sequenceDiagram participant Core participant Delivery as DeliveryService - Note over App,Delivery: Outbound — synchronous send initiated by the app + Note over App,Delivery: Outbound — synchronous send App->>Client: send_message(convo, content) - Client->>Core: send_content(&mut delivery, ...) + Client->>Core: send_content(...) Core->>Delivery: publish(envelope) Delivery-->>Core: Ok / Err - Core-->>Client: Ok(Vec) - Client-->>App: Ok(Vec) + Core-->>Client: Ok(()) / Err + Client-->>App: Ok(()) / Err - Note over Poller,Delivery: Inbound — background poll loop in the client - Poller->>Poller: poll tick + Note over Poller,Delivery: Inbound — background poller pushes events Poller->>Delivery: poll Delivery-->>Poller: payload bytes - Poller->>Core: handle_payload(&mut delivery, payload) - Core-->>Poller: Ok(vec![MessageReceived, ...]) - Poller-)App: event via Receiver + Poller->>Core: handle_payload(payload) + Core-->>Poller: Ok(PayloadOutcome) + Poller->>Poller: translate fields → Event values + Poller-)App: events via Receiver - Note over App: Next tick — drain the channel - App->>App: for event in events.try_iter() { handle_event(event) } + Note over App: App drains on its own schedule + App->>App: for event in receiver.try_iter() { handle(event) } ``` - -## References - -### Source references - -- `core/conversations/src/types.rs:9-20` — current `ContentData` and `AddressedEnvelope` -- `core/conversations/src/context.rs:138-185` — `Context::handle_payload` (core inbound entry) -- `core/conversations/src/inbox/handler.rs:124-167` — inbox handshake handler (current `is_new_convo` set site) -- `core/conversations/src/conversation/privatev1.rs:184-191, 219-260` — private-conversation handler -- `crates/client/src/client.rs:60-92` — `ChatClient` public surface -- `crates/client/src/delivery.rs` — `DeliveryService` trait -- `crates/client-ffi/src/api.rs:49-55, 220-285` — current FFI inbound result shape -- `crates/client-ffi/src/delivery.rs:8-15` — existing FFI callback pattern (`DeliverFn`) -- `bin/chat-cli/src/app.rs:46, 144-180` — current application consumption pattern diff --git a/flake.lock b/flake.lock index 3f54c9e..6201fba 100644 --- a/flake.lock +++ b/flake.lock @@ -2,16 +2,18 @@ "nodes": { "logos-delivery": { "inputs": { - "nixpkgs": "nixpkgs", + "nixpkgs": [ + "nixpkgs" + ], "rust-overlay": "rust-overlay", "zerokit": "zerokit" }, "locked": { - "lastModified": 1777287099, - "narHash": "sha256-H2gpbDUg6Wy+uIY9wL0t9ICUPN82B/vCnXZ2mo3Wa/E=", + "lastModified": 1779915920, + "narHash": "sha256-rcIgP6MVyUoNEH6xpdLrZtfd4OcvIcMUloX4IhRq5AA=", "owner": "logos-messaging", "repo": "logos-delivery", - "rev": "5034086fefe2f32bf95319cdd39aa62fc622e4bc", + "rev": "74057c66224f43b4aa27b42033d4ed52eed5c7a7", "type": "github" }, "original": { @@ -22,32 +24,16 @@ }, "nixpkgs": { "locked": { - "lastModified": 1770464364, - "narHash": "sha256-z5NJPSBwsLf/OfD8WTmh79tlSU8XgIbwmk6qB1/TFzY=", + "lastModified": 1779955849, + "narHash": "sha256-31mhzm2HpzRr/rupWAFfWBmt9SUjzwr5+giv5Nmb/rA=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "23d72dabcb3b12469f57b37170fcbc1789bd7457", + "rev": "a2c6938835fca96e4a10c8561d461efd2f91d04f", "type": "github" }, "original": { "owner": "NixOS", - "repo": "nixpkgs", - "rev": "23d72dabcb3b12469f57b37170fcbc1789bd7457", - "type": "github" - } - }, - "nixpkgs_2": { - "locked": { - "lastModified": 1775710090, - "narHash": "sha256-ar3rofg+awPB8QXDaFJhJ2jJhu+KqN/PRCXeyuXR76E=", - "owner": "NixOS", - "repo": "nixpkgs", - "rev": "4c1018dae018162ec878d42fec712642d214fdfa", - "type": "github" - }, - "original": { - "owner": "NixOS", - "ref": "nixos-unstable", + "ref": "nixos-unstable-small", "repo": "nixpkgs", "type": "github" } @@ -55,7 +41,7 @@ "root": { "inputs": { "logos-delivery": "logos-delivery", - "nixpkgs": "nixpkgs_2", + "nixpkgs": "nixpkgs", "rust-overlay": "rust-overlay_3" } }, @@ -109,11 +95,11 @@ ] }, "locked": { - "lastModified": 1775877051, - "narHash": "sha256-wpSQm2PD/w4uRo2wb8utk0b5hOBkkg/CZ1xICY+qB7M=", + "lastModified": 1779938491, + "narHash": "sha256-khIekZCrhy3lQom4AZTmgBPV3DOFgAiopLUyUtbVGhY=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "08b4f3633471874c8894632ade1b78d75dbda002", + "rev": "02f536e36eaee387594ce2a02d90ff678d056e0f", "type": "github" }, "original": { @@ -131,17 +117,15 @@ "rust-overlay": "rust-overlay_2" }, "locked": { - "lastModified": 1771279884, - "narHash": "sha256-tzkQPwSl4vPTUo1ixHh6NCENjsBDroMKTjifg2q8QX8=", "owner": "vacp2p", "repo": "zerokit", - "rev": "53b18098e6d5d046e3eb1ac338a8f4f651432477", + "rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63", "type": "github" }, "original": { "owner": "vacp2p", "repo": "zerokit", - "rev": "53b18098e6d5d046e3eb1ac338a8f4f651432477", + "rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63", "type": "github" } } diff --git a/flake.nix b/flake.nix index e588d48..ed50c4a 100644 --- a/flake.nix +++ b/flake.nix @@ -2,12 +2,17 @@ description = "libchat - Logos Chat cryptographic library"; inputs = { - nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; + # nixos-unstable-small has both crates.io UA fixes (NixOS/nixpkgs#512735, + # NixOS/nixpkgs#524985); nixos-unstable hasn't caught up yet as of 2026-05-28. + nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable-small"; rust-overlay = { url = "github:oxalica/rust-overlay"; inputs.nixpkgs.follows = "nixpkgs"; }; - logos-delivery.url = "github:logos-messaging/logos-delivery"; + logos-delivery = { + url = "github:logos-messaging/logos-delivery"; + inputs.nixpkgs.follows = "nixpkgs"; + }; }; outputs = { self, nixpkgs, rust-overlay, logos-delivery }: @@ -79,7 +84,7 @@ } ); - devShells = forAllSystems ({ pkgs }: + devShells = forAllSystems ({ pkgs, ... }: let rustToolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust_toolchain.toml; in @@ -89,6 +94,7 @@ rustToolchain pkgs.pkg-config pkgs.cmake + pkgs.perl ]; }; }