feat: introduce client event system

- Core returns `InboundResult` — a typed struct with an optional
  `NewConversation` and a `FrameOutcome` of decrypted messages.
- Client surfaces app-facing events via `Vec<Event>`, translated from
  `InboundResult` at the boundary.
- MLS group welcomes now produce a `ConversationStarted` event with no
  initial content, fixing the silent-group-join case where the inbox
  layer dropped the observation.
- C FFI exposes an `EventList` opaque type with indexed accessors and
  an `Invalid` sentinel for OOB / non-applicable reads.
- Symmetric `Inbox` / `InboxV2` handlers: both return
  `Result<InboundResult, _>` and own the persistence + ephemeral-key
  cleanup for the conversations they create.
- Updated and simplified `docs/adr/0001-client-event-system.md`.
This commit is contained in:
osmaczko 2026-05-25 18:50:40 +02:00
parent 4ca9130547
commit addcde03eb
22 changed files with 728 additions and 633 deletions

2
.gitignore vendored
View File

@ -39,4 +39,4 @@ result
crates/client-ffi/client_ffi.h
# Compiled C FFI example binary
examples/c-ffi/c-client
crates/client-ffi/examples/message-exchange/c-client

View File

@ -5,7 +5,7 @@ use std::sync::mpsc;
use anyhow::Result;
use arboard::Clipboard;
use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService};
use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService, Event};
use serde::{Deserialize, Serialize};
use crate::utils::now;
@ -144,41 +144,57 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
pub fn process_incoming(&mut self) -> Result<()> {
while let Ok(payload) = self.inbound.try_recv() {
match self.client.receive(&payload) {
Ok(Some(content)) => {
let chat_id = &content.conversation_id;
if !self.state.chats.contains_key(chat_id) && content.is_new_convo {
let session = ChatSession {
chat_id: chat_id.clone(),
nickname: None,
messages: Vec::new(),
};
self.state.chats.insert(chat_id.clone(), session);
let label = chat_id[..8.min(chat_id.len())].to_string();
self.set_active_chat(Some(chat_id.clone()));
self.status = format!("New chat ({label})! Use /nickname to name it.");
Ok(events) => {
for event in events {
self.handle_event(event);
}
if !content.data.is_empty() {
let text = String::from_utf8_lossy(&content.data).to_string();
if let Some(session) = self.state.chats.get_mut(chat_id) {
session.messages.push(DisplayMessage {
from_self: false,
content: text,
timestamp: now(),
});
}
}
self.save_state()?;
}
Ok(None) => {}
Err(e) => tracing::warn!("receive error: {e:?}"),
Err(e) => {
tracing::warn!("receive error: {e:?}");
self.status = format!("Could not decrypt incoming message: {e}");
}
}
}
Ok(())
}
fn handle_event(&mut self, event: Event) {
match event {
Event::ConversationStarted { convo_id, .. } => {
let chat_id = convo_id.to_string();
if self.state.chats.contains_key(&chat_id) {
return;
}
self.state.chats.insert(
chat_id.clone(),
ChatSession {
chat_id: chat_id.clone(),
nickname: None,
messages: Vec::new(),
},
);
let label = &chat_id[..8.min(chat_id.len())];
self.status = format!("New chat ({label})! Use /nickname to name it.");
self.set_active_chat(Some(chat_id));
}
Event::MessageReceived {
convo_id, content, ..
} => {
let chat_id = convo_id.to_string();
let Some(session) = self.state.chats.get_mut(&chat_id) else {
return;
};
session.messages.push(DisplayMessage {
from_self: false,
content: String::from_utf8_lossy(&content).into_owned(),
timestamp: now(),
});
}
_ => {}
}
}
pub fn send_message(&mut self, content: &str) -> Result<()> {
let chat_id = self
.state

View File

@ -8,12 +8,13 @@ use crate::conversation::{Convo, GroupConvo};
use crate::{DeliveryService, RegistrationService};
use crate::{
conversation::{Conversation, Id, PrivateV1Convo},
conversation::{Id, PrivateV1Convo},
errors::ChatError,
inbound::InboundResult,
inbox::Inbox,
inbox_v2::InboxV2,
proto::{EncryptedPayload, EnvelopeV1, Message},
types::{AccountId, AddressedEnvelope, ContentData},
types::{AccountId, AddressedEnvelope},
};
use crypto::{Identity, PublicKey};
use storage::{ChatStore, ConversationKind};
@ -225,7 +226,7 @@ where
}
// Decode bytes and send to protocol for processing.
pub fn handle_payload(&mut self, payload: &[u8]) -> Result<Option<ContentData>, ChatError> {
pub fn handle_payload(&mut self, payload: &[u8]) -> Result<InboundResult, ChatError> {
let env = EnvelopeV1::decode(payload)?;
// TODO: Impl Conversation hinting
@ -237,42 +238,24 @@ where
c if self.store.borrow().has_conversation(&c)? => {
self.dispatch_to_convo(&c, &env.payload)
}
_ => Ok(Some(ContentData {
conversation_id: "".into(),
data: vec![],
is_new_convo: false,
})),
_ => Ok(InboundResult::default()),
}
}
// Dispatch encrypted payload to Inbox, and register the created Conversation
fn dispatch_to_inbox(
&mut self,
enc_payload_bytes: &[u8],
) -> Result<Option<ContentData>, ChatError> {
// Dispatch encrypted payload to Inbox. The Inbox persists the newly
// created conversation and consumes the ephemeral key internally.
fn dispatch_to_inbox(&mut self, enc_payload_bytes: &[u8]) -> Result<InboundResult, ChatError> {
// EncryptedPayloads are not used by GroupConvos at this time, else this can be performed in `handle_payload`
// TODO: (P1) reconcile envelope parsing between Covno and GroupConvo
let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?;
let public_key_hex = Inbox::<CS>::extract_ephemeral_key_hex(&enc_payload)?;
let (convo, content) =
self.inbox
.handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store))?;
match convo {
Conversation::Private(mut convo) => convo.persist()?,
};
self.store
.borrow_mut()
.remove_ephemeral_key(&public_key_hex)?;
Ok(content)
self.inbox
.handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store))
}
// Dispatch encrypted payload to Inbox, and register the created Conversation
fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result<Option<ContentData>, ChatError> {
self.pq_inbox.handle_frame(payload)?;
Ok(None)
// Dispatch encrypted payload to the post-quantum inbox.
fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result<InboundResult, ChatError> {
self.pq_inbox.handle_frame(payload)
}
// Dispatch encrypted payload to its corresponding conversation
@ -280,10 +263,14 @@ where
&mut self,
convo_id: ConversationId,
enc_payload_bytes: &[u8],
) -> Result<Option<ContentData>, ChatError> {
) -> Result<InboundResult, ChatError> {
let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?;
let mut convo = self.load_convo(convo_id)?;
convo.handle_frame(enc_payload)
let frame = convo.handle_frame(enc_payload)?;
Ok(InboundResult {
new_conversation: None,
frame,
})
}
pub fn create_intro_bundle(&mut self) -> Result<Vec<u8>, ChatError> {

View File

@ -3,13 +3,14 @@ mod privatev1;
use crate::{
DeliveryService,
inbound::FrameOutcome,
service_traits::KeyPackageProvider,
types::{AccountId, AddressedEncryptedPayload, ContentData},
types::{AccountId, AddressedEncryptedPayload},
};
use chat_proto::logoschat::encryption::EncryptedPayload;
use std::fmt::Debug;
use std::sync::Arc;
use storage::{ConversationKind, ConversationStore, RatchetStore};
use storage::ConversationKind;
pub use crate::errors::ChatError;
pub use group_v1::{GroupV1Convo, IdentityProvider};
@ -28,13 +29,10 @@ pub trait Convo: Id + Debug {
/// Decrypts and processes an incoming encrypted frame.
///
/// Returns `Ok(Some(ContentData))` if the frame contains user content,
/// `Ok(None)` for protocol frames (e.g., placeholders), or an error if
/// decryption or frame parsing fails.
fn handle_frame(
&mut self,
enc_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError>;
/// Returns the [`FrameOutcome`] describing what the frame produced. May be
/// empty for protocol-only frames (placeholders, commits). Errors only on
/// decryption or frame-parsing failure.
fn handle_frame(&mut self, enc_payload: EncryptedPayload) -> Result<FrameOutcome, ChatError>;
fn remote_id(&self) -> String;
@ -49,7 +47,3 @@ pub trait GroupConvo<DS: DeliveryService, RS: KeyPackageProvider>: Convo {
// sends the payload directly.
fn send_content(&mut self, content: &[u8]) -> Result<(), ChatError>;
}
pub enum Conversation<S: ConversationStore + RatchetStore> {
Private(PrivateV1Convo<S>),
}

View File

@ -1,41 +0,0 @@
use crate::{
conversation::{ChatError, ConversationId, Convo, Id},
proto::EncryptedPayload,
types::{AddressedEncryptedPayload, ContentData},
};
#[derive(Debug)]
pub struct GroupTestConvo {}
impl GroupTestConvo {
pub fn new() -> Self {
Self {}
}
}
impl Id for GroupTestConvo {
fn id(&self) -> ConversationId<'_> {
// implementation
"grouptest"
}
}
impl Convo for GroupTestConvo {
fn send_message(
&mut self,
_content: &[u8],
) -> Result<Vec<AddressedEncryptedPayload>, ChatError> {
Ok(vec![])
}
fn handle_frame(
&mut self,
_encoded_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError> {
Ok(None)
}
fn remote_id(&self) -> String {
self.id().to_string()
}
}

View File

@ -4,6 +4,7 @@
/// - Multiple
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use blake2::{Blake2b, Digest, digest::consts::U6};
use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload};
@ -21,8 +22,9 @@ use crate::types::AccountId;
use crate::{
DeliveryService,
conversation::{ChatError, ConversationId, Convo, GroupConvo, Id},
inbound::{FrameOutcome, Message},
service_traits::KeyPackageProvider,
types::{AddressedEncryptedPayload, ContentData},
types::AddressedEncryptedPayload,
};
/// Provides the identity information needed to participate in an MLS group.
@ -306,7 +308,7 @@ where
fn handle_frame(
&mut self,
encoded_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError> {
) -> Result<FrameOutcome, ChatError> {
let bytes = match encoded_payload.encryption {
Some(encrypted_payload::Encryption::Plaintext(pt)) => pt.payload,
_ => {
@ -329,7 +331,7 @@ where
if protocol_message.epoch() < self.mls_group.epoch() {
// TODO: (P1) Add logging for messages arriving from past epoch.
return Ok(None);
return Ok(FrameOutcome::default());
}
let processed = self
@ -337,27 +339,27 @@ where
.process_message(provider, protocol_message)
.map_err(ChatError::generic)?;
match processed.into_content() {
let messages = match processed.into_content() {
ProcessedMessageContent::ApplicationMessage(msg) => {
let reliable = ReliablePayload::decode(msg.into_bytes().as_slice())?;
self.causal.on_receive(&self.convo_id, &reliable);
Ok(Some(ContentData {
conversation_id: hex::encode(self.mls_group.group_id().as_slice()),
data: reliable.content.to_vec(),
is_new_convo: false,
}))
vec![Message {
convo_id: Arc::from(self.id()),
content: reliable.content.to_vec(),
}]
}
ProcessedMessageContent::StagedCommitMessage(commit) => {
self.mls_group
.merge_staged_commit(provider, *commit)
.map_err(ChatError::generic)?;
Ok(None)
vec![]
}
_ => {
// TODO: (P2) Log unknown message type
Ok(None)
vec![]
}
}
};
Ok(FrameOutcome { messages })
}
fn remote_id(&self) -> String {

View File

@ -8,7 +8,7 @@ use chat_proto::logoschat::{
};
use crypto::{PrivateKey, PublicKey, SymmetricKey32};
use double_ratchets::{Header, InstallationKeyPair, RatchetState, restore_ratchet_state};
use prost::{Message, bytes::Bytes};
use prost::{Message as _, bytes::Bytes};
use std::{cell::RefCell, fmt::Debug, rc::Rc, sync::Arc};
use storage::{ConversationKind, ConversationMeta, ConversationStore};
@ -16,8 +16,9 @@ use crate::{
context::ConversationIdOwned,
conversation::{ChatError, ConversationId, Convo, Id},
errors::EncryptionError,
inbound::{FrameOutcome, Message},
proto,
types::{AddressedEncryptedPayload, ContentData},
types::AddressedEncryptedPayload,
utils::timestamp_millis,
};
use double_ratchets::{to_ratchet_record, to_skipped_key_records};
@ -181,15 +182,6 @@ impl<S: ConversationStore + RatchetStore> PrivateV1Convo<S> {
Ok(PrivateV1Frame::decode(content_bytes.as_slice()).unwrap())
}
// Handler for application content
fn handle_content(&self, data: Vec<u8>) -> Option<ContentData> {
Some(ContentData {
conversation_id: self.id().into(),
data,
is_new_convo: false,
})
}
/// Persists a conversation's metadata and ratchet state to DB.
pub fn persist(&mut self) -> Result<ConversationIdOwned, ChatError> {
let convo_info = ConversationMeta {
@ -208,6 +200,13 @@ impl<S: ConversationStore + RatchetStore> PrivateV1Convo<S> {
storage.save_ratchet_state(&self.local_convo_id, &record, &skipped_keys)?;
Ok(())
}
fn handle_content(&self, bytes: Bytes) -> Vec<Message> {
vec![Message {
convo_id: Arc::from(self.id()),
content: bytes.into(),
}]
}
}
impl<S: ConversationStore + RatchetStore> Id for PrivateV1Convo<S> {
@ -241,7 +240,7 @@ impl<S: ConversationStore + RatchetStore> Convo for PrivateV1Convo<S> {
fn handle_frame(
&mut self,
encoded_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError> {
) -> Result<FrameOutcome, ChatError> {
// Extract expected frame
let frame = self
.decrypt(encoded_payload)
@ -253,13 +252,11 @@ impl<S: ConversationStore + RatchetStore> Convo for PrivateV1Convo<S> {
self.save_ratchet_state(&mut *self.store.borrow_mut())?;
// Handle FrameTypes
let output = match frame_type {
FrameType::Content(bytes) => self.handle_content(bytes.into()),
FrameType::Placeholder(_) => None,
let messages = match frame_type {
FrameType::Content(bytes) => self.handle_content(bytes),
FrameType::Placeholder(_) => vec![],
};
Ok(output)
Ok(FrameOutcome { messages })
}
fn remote_id(&self) -> String {

View File

@ -0,0 +1,57 @@
//! Outcome of processing a single inbound payload.
//!
//! [`InboundResult`] composes two layers:
//! - [`FrameOutcome`] captures what processing one frame within a conversation
//! produces: today, decrypted messages. As protocol features land, new
//! per-conversation observations (e.g. group membership changes) become
//! additive fields on `FrameOutcome`.
//! - [`InboundResult`] wraps a `FrameOutcome` and adds the payload-level
//! observations a single frame cannot produce — today, the appearance of
//! a new conversation from the peer side.
use storage::ConversationKind;
use crate::context::ConversationIdOwned;
/// Observations a conversation produces from processing one frame.
#[derive(Debug, Clone, Default)]
pub struct FrameOutcome {
/// User content decrypted from this frame, in protocol order.
pub messages: Vec<Message>,
}
impl FrameOutcome {
pub fn is_empty(&self) -> bool {
self.messages.is_empty()
}
}
/// Everything one inbound payload produced.
#[derive(Debug, Clone, Default)]
pub struct InboundResult {
/// A new conversation appeared from this payload, if any.
pub new_conversation: Option<NewConversation>,
/// Observations from the frame inside this payload.
pub frame: FrameOutcome,
}
impl InboundResult {
/// True when the payload produced no observable outcome.
pub fn is_empty(&self) -> bool {
self.new_conversation.is_none() && self.frame.is_empty()
}
}
/// A conversation newly observed from the peer side.
#[derive(Debug, Clone)]
pub struct NewConversation {
pub convo_id: ConversationIdOwned,
pub kind: ConversationKind,
}
/// User content decrypted from an inbound payload.
#[derive(Debug, Clone)]
pub struct Message {
pub convo_id: ConversationIdOwned,
pub content: Vec<u8>,
}

View File

@ -5,16 +5,18 @@ use prost::bytes::Bytes;
use rand_core::OsRng;
use std::cell::RefCell;
use std::rc::Rc;
use storage::{ConversationStore, EphemeralKeyStore, RatchetStore};
use std::sync::Arc;
use storage::{ConversationKind, ConversationStore, EphemeralKeyStore, RatchetStore};
use crypto::{PrekeyBundle, SymmetricKey32};
use crate::context::Introduction;
use crate::conversation::{ChatError, Conversation, ConversationId, Convo, Id, PrivateV1Convo};
use crate::conversation::{ChatError, ConversationId, Convo, Id, PrivateV1Convo};
use crate::crypto::{CopyBytes, PrivateKey, PublicKey};
use crate::inbound::{InboundResult, NewConversation};
use crate::inbox::handshake::InboxHandshake;
use crate::proto;
use crate::types::{AddressedEncryptedPayload, ContentData};
use crate::types::AddressedEncryptedPayload;
use crypto::Identity;
/// Compute the deterministic Delivery_address for an installation
@ -119,14 +121,17 @@ impl<S: EphemeralKeyStore> Inbox<S> {
Ok((convo, payloads))
}
/// Handles an incoming inbox frame. The caller must provide the ephemeral private key
/// looked up from storage. Returns the created conversation and optional content data.
/// Handles an incoming inbox frame. The caller must provide the ephemeral
/// private key hex looked up from storage. Persists the created
/// conversation and consumes the ephemeral key. Returns the
/// [`InboundResult`] describing what was observed — for a successful
/// invite, a `new_conversation` and one initial `message`.
pub fn handle_frame<PS: ConversationStore + RatchetStore>(
&self,
enc_payload: EncryptedPayload,
public_key_hex: &str,
private_store: Rc<RefCell<PS>>,
) -> Result<(Conversation<PS>, Option<ContentData>), ChatError> {
) -> Result<InboundResult, ChatError> {
let ephemeral_key = self
.store
.borrow()
@ -143,7 +148,7 @@ impl<S: EphemeralKeyStore> Inbox<S> {
let (seed_key, frame) =
self.perform_handshake(&ephemeral_key, header, handshake.payload)?;
match frame.frame_type.unwrap() {
let result = match frame.frame_type.unwrap() {
proto::inbox_v1_frame::FrameType::InvitePrivateV1(_invite_private_v1) => {
let mut convo =
PrivateV1Convo::new_responder(private_store, seed_key, &ephemeral_key);
@ -152,18 +157,31 @@ impl<S: EphemeralKeyStore> Inbox<S> {
return Err(ChatError::Protocol("missing initial encpayload".into()));
};
// Set is_new_convo for content data
let content = match convo.handle_frame(enc_payload)? {
Some(v) => ContentData {
is_new_convo: true,
..v
},
None => return Err(ChatError::Protocol("expected contentData".into())),
};
let frame = convo.handle_frame(enc_payload)?;
if frame.messages.is_empty() {
return Err(ChatError::Protocol(
"expected initial message in invite".into(),
));
}
Ok((Conversation::Private(convo), Some(content)))
let new_conversation = NewConversation {
convo_id: Arc::from(convo.id()),
kind: ConversationKind::PrivateV1,
};
convo.persist()?;
InboundResult {
new_conversation: Some(new_conversation),
frame,
}
}
}
};
self.store
.borrow_mut()
.remove_ephemeral_key(public_key_hex)?;
Ok(result)
}
/// Extracts the ephemeral key hex from an incoming encrypted payload

View File

@ -1,5 +1,6 @@
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use chat_proto::logoschat::envelope::EnvelopeV1;
use openmls::prelude::tls_codec::Serialize;
@ -7,6 +8,7 @@ use openmls::prelude::*;
use openmls_libcrux_crypto::Provider as LibcruxProvider;
use prost::{Message, Oneof};
use storage::ChatStore;
use storage::ConversationKind;
use storage::ConversationMeta;
use crate::AddressedEnvelope;
@ -18,7 +20,8 @@ use crate::causal_history::CausalHistoryStore;
use crate::causal_history::MissingMessage;
use crate::conversation::GroupConvo;
use crate::conversation::group_v1::MlsContext;
use crate::conversation::{GroupV1Convo, IdentityProvider};
use crate::conversation::{GroupV1Convo, Id, IdentityProvider};
use crate::inbound::{FrameOutcome, InboundResult, NewConversation};
use crate::types::AccountId;
use crate::utils::{blake2b_hex, hash_size};
pub struct PqMlsContext {
@ -152,7 +155,7 @@ where
)
}
pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<(), ChatError> {
pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<InboundResult, ChatError> {
let inbox_frame = InboxV2Frame::decode(payload_bytes)?;
let Some(payload) = inbox_frame.payload else {
@ -172,14 +175,14 @@ where
let meta = ConversationMeta {
local_convo_id: convo.id().to_string(),
remote_convo_id: "0".into(),
kind: storage::ConversationKind::GroupV1,
kind: ConversationKind::GroupV1,
};
self.store.borrow_mut().save_conversation(&meta)?;
// TODO: (P1) Persist state
Ok(())
}
fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result<(), ChatError> {
fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result<InboundResult, ChatError> {
let (msg_in, _rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?;
let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else {
@ -197,7 +200,15 @@ where
self.causal.clone(),
welcome,
)?;
self.persist_convo(convo)
let convo_id = Arc::from(convo.id());
self.persist_convo(convo)?;
Ok(InboundResult {
new_conversation: Some(NewConversation {
convo_id,
kind: ConversationKind::GroupV1,
}),
frame: FrameOutcome::default(),
})
}
fn create_keypackage(&self) -> Result<KeyPackage, ChatError> {

View File

@ -4,6 +4,7 @@ mod context;
mod conversation;
mod crypto;
mod errors;
mod inbound;
mod inbox;
mod inbox_v2;
mod proto;
@ -18,6 +19,8 @@ pub use chat_sqlite::StorageConfig;
pub use context::{Context, ConversationId, ConversationIdOwned, Introduction};
pub use conversation::GroupConvo;
pub use errors::ChatError;
pub use inbound::{FrameOutcome, InboundResult, Message, NewConversation};
pub use service_traits::{DeliveryService, IdentityProvider, RegistrationService};
pub use types::{AccountId, AddressedEnvelope, ContentData};
pub use storage::ConversationKind;
pub use types::{AccountId, AddressedEnvelope};
pub use utils::hex_trunc;

View File

@ -48,15 +48,6 @@ impl Debug for AddressedEnvelope {
}
}
// This struct represents the result of processed inbound data.
// It wraps content payload with a conversation_id
#[derive(Debug)]
pub struct ContentData {
pub conversation_id: String,
pub data: Vec<u8>,
pub is_new_convo: bool,
}
// Internal type Definitions
// Used by Conversations to attach addresses to outbound encrypted payloads

View File

@ -1,38 +1,48 @@
use std::ops::{Deref, DerefMut};
use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
use libchat::{ContentData, Context, GroupConvo, hex_trunc};
use libchat::{
Context, ConversationKind, GroupConvo, InboundResult, Message, NewConversation, hex_trunc,
};
type ResultCallback = Box<dyn Fn(&InboundResult)>;
// Simple client Functionality for testing
struct Client {
inner: Context<LocalBroadcaster, EphemeralRegistry, MemStore>,
on_content: Option<Box<dyn Fn(ContentData)>>,
on_result: Option<ResultCallback>,
new_conversations: Vec<NewConversation>,
received_messages: Vec<Message>,
}
impl Client {
fn init(
ctx: Context<LocalBroadcaster, EphemeralRegistry, MemStore>,
cb: Option<impl Fn(ContentData) + 'static>,
cb: Option<impl Fn(&InboundResult) + 'static>,
) -> Self {
Client {
inner: ctx,
on_content: cb.map(|f| Box::new(f) as Box<dyn Fn(ContentData)>),
on_result: cb.map(|f| Box::new(f) as ResultCallback),
new_conversations: Vec::new(),
received_messages: Vec::new(),
}
}
fn process_messages(&mut self) {
let messages: Vec<_> = {
let payloads: Vec<_> = {
let mut ds = self.ds();
std::iter::from_fn(|| ds.poll()).collect()
};
for data in messages {
let res = self.handle_payload(&data).unwrap();
if let Some(cb) = &self.on_content
&& let Some(content_data) = res
{
cb(content_data);
for data in payloads {
let result = self.handle_payload(&data).unwrap();
if let Some(cb) = &self.on_result {
cb(&result);
}
if let Some(nc) = result.new_conversation {
self.new_conversations.push(nc);
}
self.received_messages.extend(result.frame.messages);
}
}
@ -60,12 +70,21 @@ impl DerefMut for Client {
}
// Higher order function to handle printing
fn pretty_print(prefix: impl Into<String>) -> Box<dyn Fn(ContentData)> {
fn pretty_print(prefix: impl Into<String>) -> ResultCallback {
let prefix = prefix.into();
Box::new(move |c: ContentData| {
let cid = hex_trunc(c.conversation_id.as_bytes());
let content = String::from_utf8(c.data).unwrap();
println!("{} ({:?}) {}", prefix, cid, content)
Box::new(move |result: &InboundResult| {
if let Some(nc) = &result.new_conversation {
let cid = hex_trunc(nc.convo_id.as_bytes());
println!(
"{prefix} ({cid:?}) [conversation started: {:?}]",
nc.kind
);
}
for msg in &result.frame.messages {
let cid = hex_trunc(msg.convo_id.as_bytes());
let text = String::from_utf8_lossy(&msg.content);
println!("{prefix} ({cid:?}) {text}");
}
})
}
@ -95,21 +114,33 @@ fn create_group() {
let raya_id = clients[RAYA].account_id().clone();
let s_convo = clients[SARO].create_group_convo(&[&raya_id]).unwrap();
let convo_id = s_convo.id();
let convo_id = s_convo.id().to_string();
// Raya can read this message because
// 1) It was sent after add_members was committed, and
// 2) LocalBroadcaster provides historical messages.
clients[SARO]
.convo(convo_id)
.convo(&convo_id)
.send_content(b"ok who broke the group chat again")
.unwrap();
process(&mut clients);
// Raya should observe exactly one new GroupV1 conversation from the
// welcome, even though no initial content arrives with it.
let raya_started = clients[RAYA]
.new_conversations
.iter()
.filter(|nc| matches!(nc.kind, ConversationKind::GroupV1))
.count();
assert_eq!(
raya_started, 1,
"Raya should have observed exactly one new GroupV1 conversation for the welcome"
);
clients[RAYA]
.convo(convo_id)
.convo(&convo_id)
.send_content(b"it was literally working five minutes ago")
.unwrap();
@ -121,21 +152,31 @@ fn create_group() {
let pax_id = clients[PAX].account_id().clone();
clients[SARO]
.convo(convo_id)
.convo(&convo_id)
.add_member(&[&pax_id])
.unwrap();
process(&mut clients);
let pax_started = clients[PAX]
.new_conversations
.iter()
.filter(|nc| matches!(nc.kind, ConversationKind::GroupV1))
.count();
assert_eq!(
pax_started, 1,
"Pax should have observed exactly one new GroupV1 conversation for the welcome"
);
clients[PAX]
.convo(convo_id)
.convo(&convo_id)
.send_content(b"ngl the key rotation is cooked")
.unwrap();
process(&mut clients);
clients[SARO]
.convo(convo_id)
.convo(&convo_id)
.send_content(b"bro we literally just added you to the group ")
.unwrap();

View File

@ -1,5 +1,5 @@
use chat_sqlite::{ChatStorage, StorageConfig};
use libchat::{Context, Introduction};
use libchat::{Context, ConversationKind, Introduction};
use storage::{ConversationStore, IdentityStore};
use tempfile::tempdir;
@ -13,12 +13,14 @@ fn send_and_verify(
) {
let payloads = sender.send_content(convo_id, content).unwrap();
let payload = payloads.first().unwrap();
let received = receiver
.handle_payload(&payload.data)
.unwrap()
.expect("expected content");
assert_eq!(content, received.data.as_slice());
assert!(!received.is_new_convo);
let result = receiver.handle_payload(&payload.data).unwrap();
assert!(result.new_conversation.is_none());
assert_eq!(
result.frame.messages.len(),
1,
"steady-state send should yield one message"
);
assert_eq!(content, result.frame.messages[0].content.as_slice());
}
#[test]
@ -38,16 +40,22 @@ fn ctx_integration() {
let mut content = vec![10];
let (saro_convo_id, payloads) = saro.create_private_convo(&intro, &content).unwrap();
// Raya receives initial message
// Raya receives the invite + initial message
let payload = payloads.first().unwrap();
let initial_content = raya
.handle_payload(&payload.data)
.unwrap()
.expect("expected initial content");
let raya_convo_id = initial_content.conversation_id;
assert_eq!(content, initial_content.data);
assert!(initial_content.is_new_convo);
let initial = raya.handle_payload(&payload.data).unwrap();
let new_convo = initial
.new_conversation
.as_ref()
.expect("invite must create a conversation");
assert!(matches!(new_convo.kind, ConversationKind::PrivateV1));
assert_eq!(
initial.frame.messages.len(),
1,
"invite must include initial message"
);
assert_eq!(content, initial.frame.messages[0].content);
assert_eq!(new_convo.convo_id, initial.frame.messages[0].convo_id);
let raya_convo_id = new_convo.convo_id.clone();
// Exchange messages back and forth
for _ in 0..10 {
@ -107,8 +115,12 @@ fn conversation_metadata_persistence() {
let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap();
let payload = payloads.first().unwrap();
let content = alice.handle_payload(&payload.data).unwrap().unwrap();
assert!(content.is_new_convo);
let result = alice.handle_payload(&payload.data).unwrap();
let new_convo = result
.new_conversation
.as_ref()
.expect("invite must create a conversation");
assert!(matches!(new_convo.kind, ConversationKind::PrivateV1));
let convos = alice.store().load_conversations().unwrap();
assert_eq!(convos.len(), 1);
@ -128,16 +140,23 @@ fn conversation_full_flow() {
let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap();
let payload = payloads.first().unwrap();
let content = alice.handle_payload(&payload.data).unwrap().unwrap();
let alice_convo_id = content.conversation_id;
let result = alice.handle_payload(&payload.data).unwrap();
let alice_convo_id = result
.new_conversation
.as_ref()
.expect("invite must create a conversation")
.convo_id
.clone();
let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap();
let payload = payloads.first().unwrap();
bob.handle_payload(&payload.data).unwrap().unwrap();
let result = bob.handle_payload(&payload.data).unwrap();
assert_eq!(result.frame.messages[0].content, b"reply 1");
let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap();
let payload = payloads.first().unwrap();
alice.handle_payload(&payload.data).unwrap().unwrap();
let result = alice.handle_payload(&payload.data).unwrap();
assert_eq!(result.frame.messages[0].content, b"reply 2");
// Verify conversation list
let convo_ids = alice.list_conversations().unwrap();
@ -146,18 +165,12 @@ fn conversation_full_flow() {
// Continue exchanging messages
let payloads = bob.send_content(&bob_convo_id, b"more messages").unwrap();
let payload = payloads.first().unwrap();
let content = alice
.handle_payload(&payload.data)
.expect("should decrypt")
.expect("should have content");
assert_eq!(content.data, b"more messages");
let result = alice.handle_payload(&payload.data).expect("should decrypt");
assert_eq!(result.frame.messages[0].content, b"more messages");
// Alice can also send back
let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap();
let payload = payloads.first().unwrap();
let content = bob
.handle_payload(&payload.data)
.unwrap()
.expect("bob should receive");
assert_eq!(content.data, b"alice reply");
let result = bob.handle_payload(&payload.data).unwrap();
assert_eq!(result.frame.messages[0].content, b"alice reply");
}

View File

@ -84,19 +84,38 @@ static int32_t deliver_cb(
/* ------------------------------------------------------------------
* Helper: pop one envelope from the bus and push it into receiver.
* Returns a heap-allocated result; caller frees with
* push_inbound_result_free().
* Returns a heap-allocated event list; caller frees with
* event_list_free().
* ------------------------------------------------------------------ */
static PushInboundResult_t *route(ClientHandle_t *receiver)
static EventList_t *route(ClientHandle_t *receiver)
{
const uint8_t *data;
size_t len;
int ok = queue_pop(&bus, &data, &len);
assert(ok && "expected an envelope in the bus");
PushInboundResult_t *r = client_receive(receiver, SLICE(data, len));
assert(push_inbound_result_error_code(r) == 0 && "push_inbound failed");
return r;
EventList_t *evs = client_receive(receiver, SLICE(data, len));
assert(event_list_error_code(evs) == 0 && "client_receive failed");
return evs;
}
/* ------------------------------------------------------------------
* Helper: locate the first MessageReceived event in a list and copy
* its content into the caller-supplied buffer. Returns -1 if not found.
* ------------------------------------------------------------------ */
static int find_message(EventList_t *evs, char *out, size_t out_cap, size_t *out_len)
{
size_t n = event_list_len(evs);
for (size_t i = 0; i < n; ++i) {
if (event_list_kind_at(evs, i) == EVENT_KIND_MESSAGE_RECEIVED) {
slice_ref_uint8_t s = event_list_content_at(evs, i);
assert(s.len <= out_cap && "content buffer too small");
memcpy(out, s.ptr, s.len);
*out_len = s.len;
return (int)i;
}
}
return -1;
}
/* ------------------------------------------------------------------
@ -125,19 +144,23 @@ int main(void)
assert(create_convo_result_error_code(saro_convo) == 0);
create_intro_result_free(raya_intro);
/* Route saro -> raya */
PushInboundResult_t *recv = route(raya);
/* Route saro -> raya: expect [ConversationStarted, MessageReceived] */
EventList_t *evs = route(raya);
assert(event_list_len(evs) == 2 && "expected 2 events for invite");
assert(event_list_kind_at(evs, 0) == EVENT_KIND_CONVERSATION_STARTED
&& "first event should be ConversationStarted");
assert(event_list_conversation_class_at(evs, 0) == FFI_CONVERSATION_CLASS_PRIVATE
&& "expected Private convo class");
assert(push_inbound_result_has_content(recv) && "expected content from saro");
assert(push_inbound_result_is_new_convo(recv) && "expected new-conversation flag");
char msg[64];
size_t msg_len;
int idx = find_message(evs, msg, sizeof(msg), &msg_len);
assert(idx >= 0 && "expected MessageReceived from saro");
assert(msg_len == 10 && memcmp(msg, "hello raya", 10) == 0);
printf("Raya received: \"%.*s\"\n", (int)msg_len, msg);
slice_ref_uint8_t content = push_inbound_result_content(recv);
assert(content.len == 10);
assert(memcmp(content.ptr, "hello raya", 10) == 0);
printf("Raya received: \"%.*s\"\n", (int)content.len, content.ptr);
/* Copy Raya's convo_id before freeing recv */
slice_ref_uint8_t cid_ref = push_inbound_result_convo_id(recv);
/* Copy Raya's convo_id from the ConversationStarted event */
slice_ref_uint8_t cid_ref = event_list_convo_id_at(evs, 0);
uint8_t raya_cid[256];
size_t raya_cid_len = cid_ref.len;
if (raya_cid_len >= sizeof(raya_cid)) {
@ -145,37 +168,37 @@ int main(void)
return 1;
}
memcpy(raya_cid, cid_ref.ptr, raya_cid_len);
push_inbound_result_free(recv);
event_list_free(evs);
/* Raya replies */
ErrorCode_t rc = client_send_message(
raya, SLICE(raya_cid, raya_cid_len), STR("hi saro"));
assert(rc == ERROR_CODE_NONE);
recv = route(saro);
assert(push_inbound_result_has_content(recv) && "expected content from raya");
assert(!push_inbound_result_is_new_convo(recv) && "unexpected new-convo flag");
content = push_inbound_result_content(recv);
assert(content.len == 7);
assert(memcmp(content.ptr, "hi saro", 7) == 0);
printf("Saro received: \"%.*s\"\n", (int)content.len, content.ptr);
push_inbound_result_free(recv);
evs = route(saro);
assert(event_list_len(evs) == 1 && "expected MessageReceived only");
assert(event_list_kind_at(evs, 0) == EVENT_KIND_MESSAGE_RECEIVED);
idx = find_message(evs, msg, sizeof(msg), &msg_len);
assert(idx >= 0);
assert(msg_len == 7 && memcmp(msg, "hi saro", 7) == 0);
printf("Saro received: \"%.*s\"\n", (int)msg_len, msg);
event_list_free(evs);
/* Multiple back-and-forth rounds */
slice_ref_uint8_t saro_cid = create_convo_result_id(saro_convo);
for (int i = 0; i < 3; i++) {
char msg[32];
int mlen = snprintf(msg, sizeof(msg), "msg %d", i);
char text[32];
int tlen = snprintf(text, sizeof(text), "msg %d", i);
rc = client_send_message(saro, saro_cid, SLICE(msg, (size_t)mlen));
rc = client_send_message(saro, saro_cid, SLICE(text, (size_t)tlen));
assert(rc == ERROR_CODE_NONE);
recv = route(raya);
assert(push_inbound_result_has_content(recv));
content = push_inbound_result_content(recv);
assert((int)content.len == mlen);
assert(memcmp(content.ptr, msg, (size_t)mlen) == 0);
push_inbound_result_free(recv);
evs = route(raya);
idx = find_message(evs, msg, sizeof(msg), &msg_len);
assert(idx >= 0);
assert((int)msg_len == tlen);
assert(memcmp(msg, text, (size_t)tlen) == 0);
event_list_free(evs);
char reply[32];
int rlen = snprintf(reply, sizeof(reply), "reply %d", i);
@ -184,12 +207,12 @@ int main(void)
raya, SLICE(raya_cid, raya_cid_len), SLICE(reply, (size_t)rlen));
assert(rc == ERROR_CODE_NONE);
recv = route(saro);
assert(push_inbound_result_has_content(recv));
content = push_inbound_result_content(recv);
assert((int)content.len == rlen);
assert(memcmp(content.ptr, reply, (size_t)rlen) == 0);
push_inbound_result_free(recv);
evs = route(saro);
idx = find_message(evs, msg, sizeof(msg), &msg_len);
assert(idx >= 0);
assert((int)msg_len == rlen);
assert(memcmp(msg, reply, (size_t)rlen) == 0);
event_list_free(evs);
}
/* Cleanup */

View File

@ -2,7 +2,7 @@ use safer_ffi::prelude::*;
use std::sync::Arc;
use crate::delivery::{CDelivery, DeliverFn};
use logos_chat::{ChatClient, ClientError};
use logos_chat::{ChatClient, ClientError, ConversationClass, Event};
// ---------------------------------------------------------------------------
// Opaque client handle
@ -21,9 +21,47 @@ pub struct ClientHandle(pub(crate) ChatClient<CDelivery>);
pub enum ErrorCode {
None = 0,
BadUtf8 = -1,
/// Failure parsing or processing an introduction bundle.
BadIntro = -2,
DeliveryFail = -3,
UnknownError = -4,
/// Failure decoding, decrypting, or processing an inbound payload.
BadPayload = -5,
}
// ---------------------------------------------------------------------------
// Event taxonomy (C-side view of Event)
// ---------------------------------------------------------------------------
#[derive_ReprC]
#[repr(i32)]
#[derive(Clone, Copy)]
pub enum EventKind {
/// Sentinel returned by `event_list_kind_at` for out-of-bounds indices.
/// Never the kind of a real event row.
Invalid = -1,
ConversationStarted = 0,
MessageReceived = 1,
}
#[derive_ReprC]
#[repr(i32)]
#[derive(Clone, Copy)]
pub enum FfiConversationClass {
/// Sentinel for accessor calls that don't apply to the queried row
/// (out-of-bounds, or a non-`ConversationStarted` event).
Invalid = -1,
Private = 0,
Group = 1,
}
impl From<ConversationClass> for FfiConversationClass {
fn from(c: ConversationClass) -> Self {
match c {
ConversationClass::Private => FfiConversationClass::Private,
ConversationClass::Group => FfiConversationClass::Group,
}
}
}
// ---------------------------------------------------------------------------
@ -44,14 +82,61 @@ pub struct CreateConvoResult {
convo_id: Option<String>,
}
/// An ordered list of events with a status code. Inspect `error_code` (zero
/// on success) before iterating with `event_list_len` and the indexed
/// accessors.
#[derive_ReprC]
#[repr(opaque)]
pub struct PushInboundResult {
pub struct EventList {
error_code: i32,
has_content: bool,
is_new_convo: bool,
convo_id: Option<String>,
content: Option<Vec<u8>>,
events: Vec<EventRow>,
}
enum EventRow {
ConversationStarted {
convo_id: String,
class: FfiConversationClass,
},
MessageReceived {
convo_id: String,
content: Vec<u8>,
},
}
impl EventRow {
/// Translate an [`Event`] into the FFI row shape, or `None` for variants
/// without an FFI representation.
fn from_event(event: Event) -> Option<Self> {
match event {
Event::ConversationStarted {
convo_id, class, ..
} => Some(EventRow::ConversationStarted {
convo_id: convo_id.to_string(),
class: class.into(),
}),
Event::MessageReceived {
convo_id, content, ..
} => Some(EventRow::MessageReceived {
convo_id: convo_id.to_string(),
content,
}),
_ => None,
}
}
fn convo_id(&self) -> &str {
match self {
EventRow::ConversationStarted { convo_id, .. }
| EventRow::MessageReceived { convo_id, .. } => convo_id,
}
}
fn content(&self) -> &[u8] {
match self {
EventRow::MessageReceived { content, .. } => content,
_ => &[],
}
}
}
// ---------------------------------------------------------------------------
@ -214,72 +299,90 @@ fn client_send_message(
}
// ---------------------------------------------------------------------------
// Push inbound
// Receive (process inbound, get event list back)
// ---------------------------------------------------------------------------
/// Decrypt an inbound payload. `has_content` is false for protocol frames.
/// Free with `push_inbound_result_free`.
/// Decrypt an inbound payload. Returns the events the payload produced;
/// the list may be empty for protocol-only frames. Free with
/// `event_list_free`.
#[ffi_export]
fn client_receive(
handle: &mut ClientHandle,
payload: c_slice::Ref<'_, u8>,
) -> repr_c::Box<PushInboundResult> {
) -> repr_c::Box<EventList> {
let result = match handle.0.receive(payload.as_slice()) {
Ok(Some(cd)) => PushInboundResult {
Ok(events) => EventList {
error_code: ErrorCode::None as i32,
has_content: true,
is_new_convo: cd.is_new_convo,
convo_id: Some(cd.conversation_id),
content: Some(cd.data),
events: events
.into_iter()
.filter_map(EventRow::from_event)
.collect(),
},
Ok(None) => PushInboundResult {
error_code: ErrorCode::None as i32,
has_content: false,
is_new_convo: false,
convo_id: None,
content: None,
Err(ClientError::Chat(_)) => EventList {
error_code: ErrorCode::BadPayload as i32,
events: Vec::new(),
},
Err(_) => PushInboundResult {
error_code: ErrorCode::UnknownError as i32,
has_content: false,
is_new_convo: false,
convo_id: None,
content: None,
Err(ClientError::Delivery(_)) => EventList {
error_code: ErrorCode::DeliveryFail as i32,
events: Vec::new(),
},
};
Box::new(result).into()
}
#[ffi_export]
fn push_inbound_result_error_code(r: &PushInboundResult) -> i32 {
r.error_code
fn event_list_error_code(list: &EventList) -> i32 {
list.error_code
}
#[ffi_export]
fn push_inbound_result_has_content(r: &PushInboundResult) -> bool {
r.has_content
fn event_list_len(list: &EventList) -> usize {
list.events.len()
}
/// Returns `EventKind::Invalid` for out-of-bounds indices.
#[ffi_export]
fn event_list_kind_at(list: &EventList, idx: usize) -> EventKind {
match list.events.get(idx) {
Some(EventRow::ConversationStarted { .. }) => EventKind::ConversationStarted,
Some(EventRow::MessageReceived { .. }) => EventKind::MessageReceived,
None => EventKind::Invalid,
}
}
/// Returns an empty slice for out-of-bounds indices.
/// The slice is valid only while `list` is alive.
#[ffi_export]
fn event_list_convo_id_at(list: &EventList, idx: usize) -> c_slice::Ref<'_, u8> {
list.events
.get(idx)
.map(|r| r.convo_id().as_bytes())
.unwrap_or(&[])
.into()
}
/// Returns an empty slice for non-`MessageReceived` events or out-of-bounds.
/// The slice is valid only while `list` is alive.
#[ffi_export]
fn event_list_content_at(list: &EventList, idx: usize) -> c_slice::Ref<'_, u8> {
list.events
.get(idx)
.map(EventRow::content)
.unwrap_or(&[])
.into()
}
/// Returns `FfiConversationClass::Invalid` for non-`ConversationStarted`
/// events or out-of-bounds.
#[ffi_export]
fn event_list_conversation_class_at(list: &EventList, idx: usize) -> FfiConversationClass {
match list.events.get(idx) {
Some(EventRow::ConversationStarted { class, .. }) => *class,
_ => FfiConversationClass::Invalid,
}
}
#[ffi_export]
fn push_inbound_result_is_new_convo(r: &PushInboundResult) -> bool {
r.is_new_convo
}
/// Returns an empty slice when has_content is false.
/// The slice is valid only while `r` is alive.
#[ffi_export]
fn push_inbound_result_convo_id(r: &PushInboundResult) -> c_slice::Ref<'_, u8> {
r.convo_id.as_deref().unwrap_or("").as_bytes().into()
}
/// Returns an empty slice when has_content is false.
/// The slice is valid only while `r` is alive.
#[ffi_export]
fn push_inbound_result_content(r: &PushInboundResult) -> c_slice::Ref<'_, u8> {
r.content.as_deref().unwrap_or(&[]).into()
}
#[ffi_export]
fn push_inbound_result_free(r: repr_c::Box<PushInboundResult>) {
drop(r)
fn event_list_free(list: repr_c::Box<EventList>) {
drop(list)
}

View File

@ -1,4 +1,4 @@
use logos_chat::{ChatClient, ConversationIdOwned, InProcessDelivery};
use logos_chat::{ChatClient, ConversationIdOwned, Event, InProcessDelivery};
use std::sync::Arc;
fn main() {
@ -13,21 +13,29 @@ fn main() {
.unwrap();
let raw = cursor.next().unwrap();
let content = raya.receive(&raw).unwrap().unwrap();
println!(
"Raya received: {:?}",
std::str::from_utf8(&content.data).unwrap()
);
let events = raya.receive(&raw).unwrap();
let raya_convo_id: ConversationIdOwned = events
.iter()
.find_map(|e| match e {
Event::ConversationStarted { convo_id, .. } => Some(Arc::clone(convo_id)),
_ => None,
})
.expect("expected ConversationStarted");
for event in &events {
if let Event::MessageReceived { content, .. } = event {
println!("Raya received: {:?}", std::str::from_utf8(content).unwrap());
}
}
let raya_convo_id: ConversationIdOwned = Arc::from(content.conversation_id.as_str());
raya.send_message(&raya_convo_id, b"hi saro").unwrap();
let raw = cursor.next().unwrap();
let content = saro.receive(&raw).unwrap().unwrap();
println!(
"Saro received: {:?}",
std::str::from_utf8(&content.data).unwrap()
);
let events = saro.receive(&raw).unwrap();
for event in &events {
if let Event::MessageReceived { content, .. } = event {
println!("Saro received: {:?}", std::str::from_utf8(content).unwrap());
}
}
println!("Message exchange complete.");
}

View File

@ -1,11 +1,12 @@
use libchat::{
AddressedEnvelope, ChatError, ChatStorage, ContentData, Context, ConversationIdOwned,
DeliveryService, Introduction, StorageConfig,
AddressedEnvelope, ChatError, ChatStorage, Context, ConversationIdOwned, ConversationKind,
DeliveryService, InboundResult, Introduction, StorageConfig,
};
use components::EphemeralRegistry;
use crate::errors::ClientError;
use crate::event::{ConversationClass, Event};
pub struct ChatClient<D: DeliveryService> {
ctx: Context<D, EphemeralRegistry, ChatStorage>,
@ -74,13 +75,11 @@ impl<D: DeliveryService + 'static> ChatClient<D> {
self.dispatch_all(envelopes)
}
/// Decrypt an inbound payload. Returns `Some(ContentData)` for user
/// content, `None` for protocol frames.
pub fn receive(
&mut self,
payload: &[u8],
) -> Result<Option<ContentData>, ClientError<D::Error>> {
self.ctx.handle_payload(payload).map_err(Into::into)
/// Decrypt an inbound payload. Returns the events the payload produced,
/// in causal order. May be empty for protocol-only frames.
pub fn receive(&mut self, payload: &[u8]) -> Result<Vec<Event>, ClientError<D::Error>> {
let result = self.ctx.handle_payload(payload)?;
Ok(events_from_inbound(result))
}
fn dispatch_all(
@ -94,3 +93,41 @@ impl<D: DeliveryService + 'static> ChatClient<D> {
Ok(())
}
}
/// Walk an [`InboundResult`] in causal order and emit one `Event` per
/// observation. The structural ordering of `InboundResult` (new conversation
/// before frame contents) determines the order of events here.
fn events_from_inbound(result: InboundResult) -> Vec<Event> {
let mut events = Vec::with_capacity(
usize::from(result.new_conversation.is_some()) + result.frame.messages.len(),
);
if let Some(nc) = result.new_conversation
&& let Some(class) = class_from_kind(&nc.kind)
{
events.push(Event::ConversationStarted {
convo_id: nc.convo_id,
class,
});
}
for msg in result.frame.messages {
events.push(Event::MessageReceived {
convo_id: msg.convo_id,
content: msg.content,
});
}
events
}
/// Map a core [`ConversationKind`] to the coarse app-facing
/// [`ConversationClass`]. The exhaustive match means a new
/// `ConversationKind` variant becomes a compile error here, forcing a
/// deliberate mapping decision rather than silently misclassifying it.
/// `Unknown(_)` yields `None`: the client does not surface conversations
/// whose protocol kind cannot be safely classified for the application.
fn class_from_kind(kind: &ConversationKind) -> Option<ConversationClass> {
match kind {
ConversationKind::PrivateV1 => Some(ConversationClass::Private),
ConversationKind::GroupV1 => Some(ConversationClass::Group),
ConversationKind::Unknown(_) => None,
}
}

View File

@ -0,0 +1,37 @@
//! Application-facing chat events.
//!
//! Each variant of [`Event`] describes one observable thing the application
//! cares about: a new conversation has appeared, a message was decrypted on
//! an existing one, and so on. The enum is `#[non_exhaustive]` so new
//! variants can be added without breaking exhaustive matches in dependent
//! crates.
use libchat::ConversationIdOwned;
/// A discrete chat event.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum Event {
/// A new conversation has appeared.
ConversationStarted {
convo_id: ConversationIdOwned,
class: ConversationClass,
},
/// User content arrived on an existing conversation.
MessageReceived {
convo_id: ConversationIdOwned,
content: Vec<u8>,
},
}
/// Coarse classification of a conversation, intended as a UI/UX hint.
///
/// Decoupled from the core's protocol-versioned kinds: future versions of
/// an existing class (e.g. a `PrivateV2`) map to the same variant here.
/// New variants are reserved for fundamentally different conversation
/// shapes and are intentionally breaking when added.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConversationClass {
Private,
Group,
}

View File

@ -1,12 +1,12 @@
mod client;
mod delivery_in_process;
mod errors;
mod event;
pub use client::ChatClient;
pub use delivery_in_process::{Cursor, InProcessDelivery, MessageBus};
pub use errors::ClientError;
pub use event::{ConversationClass, Event};
// Re-export types callers need to interact with ChatClient
pub use libchat::{
AddressedEnvelope, ContentData, ConversationIdOwned, DeliveryService, StorageConfig,
};
// Re-export types callers need to interact with ChatClient.
pub use libchat::{AddressedEnvelope, ConversationIdOwned, DeliveryService, StorageConfig};

View File

@ -1,14 +1,31 @@
use logos_chat::{
ChatClient, ContentData, ConversationIdOwned, Cursor, InProcessDelivery, StorageConfig,
ChatClient, ConversationClass, ConversationIdOwned, Cursor, Event, InProcessDelivery,
StorageConfig,
};
use std::sync::Arc;
fn receive(receiver: &mut ChatClient<InProcessDelivery>, cursor: &mut Cursor) -> ContentData {
/// Pulls one envelope, decrypts, and returns the events emitted.
fn receive(receiver: &mut ChatClient<InProcessDelivery>, cursor: &mut Cursor) -> Vec<Event> {
let raw = cursor.next().expect("expected envelope");
receiver
.receive(&raw)
.expect("receive failed")
.expect("expected content")
receiver.receive(&raw).expect("receive failed")
}
fn expect_message(event: &Event) -> (&ConversationIdOwned, &[u8]) {
match event {
Event::MessageReceived {
convo_id, content, ..
} => (convo_id, content.as_slice()),
other => panic!("expected MessageReceived, got {other:?}"),
}
}
fn expect_conversation_started(event: &Event) -> (&ConversationIdOwned, ConversationClass) {
match event {
Event::ConversationStarted {
convo_id, class, ..
} => (convo_id, *class),
other => panic!("expected ConversationStarted, got {other:?}"),
}
}
#[test]
@ -24,27 +41,39 @@ fn saro_raya_message_exchange() {
.create_conversation(&raya_bundle, b"hello raya")
.unwrap();
let content = receive(&mut raya, &mut cursor);
assert_eq!(content.data, b"hello raya");
assert!(content.is_new_convo);
let raya_convo_id: ConversationIdOwned = Arc::from(content.conversation_id.as_str());
let events = receive(&mut raya, &mut cursor);
assert_eq!(
events.len(),
2,
"expected ConversationStarted + MessageReceived"
);
let (started_id, class) = expect_conversation_started(&events[0]);
assert_eq!(class, ConversationClass::Private);
let (msg_id, content) = expect_message(&events[1]);
assert_eq!(content, b"hello raya");
assert_eq!(started_id, msg_id);
let raya_convo_id: ConversationIdOwned = Arc::clone(started_id);
raya.send_message(&raya_convo_id, b"hi saro").unwrap();
let content = receive(&mut saro, &mut cursor);
assert_eq!(content.data, b"hi saro");
assert!(!content.is_new_convo);
let events = receive(&mut saro, &mut cursor);
assert_eq!(events.len(), 1);
let (_, content) = expect_message(&events[0]);
assert_eq!(content, b"hi saro");
for i in 0u8..5 {
let msg = format!("msg {i}");
saro.send_message(&saro_convo_id, msg.as_bytes()).unwrap();
let content = receive(&mut raya, &mut cursor);
assert_eq!(content.data, msg.as_bytes());
let events = receive(&mut raya, &mut cursor);
assert_eq!(events.len(), 1);
let (_, content) = expect_message(&events[0]);
assert_eq!(content, msg.as_bytes());
let reply = format!("reply {i}");
raya.send_message(&raya_convo_id, reply.as_bytes()).unwrap();
let content = receive(&mut saro, &mut cursor);
assert_eq!(content.data, reply.as_bytes());
let events = receive(&mut saro, &mut cursor);
assert_eq!(events.len(), 1);
let (_, content) = expect_message(&events[0]);
assert_eq!(content, reply.as_bytes());
}
assert_eq!(saro.list_conversations().unwrap().len(), 1);

View File

@ -5,272 +5,55 @@
| Status | Accepted |
| Issue | https://github.com/logos-messaging/libchat/issues/97 |
| Date | 2026-05-19 |
| Last revised | 2026-05-25 |
## Context and Problem
Applications currently learn about new conversations from an `is_new_convo: bool` flag on `ContentData` (`core/conversations/src/types.rs:16-20`). Two problems:
Applications must observe several kinds of things produced by the chat library: new conversations appearing from peer-initiated handshakes, decrypted messages on existing conversations, and further protocol observations (group membership changes, reliability signals). These observations are not coupled — an MLS group welcome creates a new conversation with no initial content; a single inbound payload can yield multiple observations; some observations (delivery timeouts from background retry work) have no synchronous trigger at all and must reach the application after the call that might have caused them has long since returned.
1. The flag overloads `ContentData`: protocol metadata is smuggled through a content carrier.
2. The flag assumes every new conversation carries an initial content frame. Protocols such as MLS allow a conversation to begin without one; in that case `handle_payload` returns `None` and the application never observes the new conversation.
Issue #97 calls for a proper event system that can signal new conversations, delivery receipts, and reliability failures — without piggy-backing on content — and that provides a clear path for adding new event types later.
This ADR specifies the layered design of the event system and how events reach the application.
Issue #97 captures the requirement for an observation surface that does not piggy-back on content, accommodates both sync-triggered and background-triggered observations uniformly, and crosses the FFI boundary cleanly.
## Decision Drivers
- **Simplicity of the core.** Fully synchronous and caller-driven: no background work, no callbacks out. External effects are performed through services injected as method parameters.
- **Extensibility.** A new event type is a localised change (one enum variant, one emit site) that does not break existing consumers.
- **FFI compatibility.** Must remain expressible through the existing `safer-ffi` boundary in `crates/client-ffi`. Event payloads are limited to owned, concrete data (bytes, strings, identifiers) — no closures, generics, or non-`'static` references.
- **Simplicity of the core.** Fully synchronous and caller-driven: no background work, no callbacks out. External effects flow through services injected as method parameters.
- **Asynchronous delivery at the client.** Applications consume events on their own schedule. Observations from sync-triggered processing and observations from background work share a single delivery surface, so the application sees one notification stream and does not care which path produced any given event.
- **FFI compatibility.** Payloads crossing the `safer-ffi` boundary in `crates/client-ffi` are limited to owned, concrete data — no closures, generics, or non-`'static` references — so any delivery mechanism must degrade to a sync drain on that side.
## Architecture
The library is organised in three layers. Calls flow downward; events flow upward.
Three layers. Calls flow downward. Sync results return through method returns; events reach the application asynchronously through a channel.
```mermaid
flowchart TB
A["<b>app</b><br/>UI/UX layer<br/>drives the event loop"]
B["<b>client</b><br/>owns services<br/>runs background threads"]
C["<b>core</b><br/>strict sync, caller-driven"]
A["<b>app</b><br/>drains Receiver&lt;Event&gt;"]
B["<b>client</b><br/>owns transport poller + services<br/>translates InboundResult → Event values<br/>pushes onto channel"]
C["<b>core</b><br/>strict sync, caller-driven<br/>returns InboundResult"]
A -- "method calls" --> B
B -- "method calls" --> C
C -.->|"events (from method returns)"| B
B -.->|"events (sync + background)"| A
C -.->|"InboundResult<br/>(sync method return)"| B
B == "Event (async channel)" ==> A
```
Crates: **app**`bin/chat-cli`, future `logos-chat-module`; **client**`crates/client`, `crates/client-ffi`; **core**`core/conversations` and friends in libchat.
## Design
### Core layer
#### Constraints
- Strict sync, single-threaded.
- No background work, timers, or internal queues.
- External effects (delivery; future registration / identity lookups) are performed through services injected as method parameters.
#### Approach
Methods receive the services they need and call them directly. Observations (events) are returned so the caller can surface them upward:
```rust
impl<S: ChatStore> Context<S> {
pub fn handle_payload<D: DeliveryService>(
&mut self,
delivery: &mut D,
payload: &[u8],
) -> Result<Vec<Event>, ChatError>;
pub fn send_content<D: DeliveryService>(
&mut self,
delivery: &mut D,
convo: ConversationId,
content: &[u8],
) -> Result<Vec<Event>, ChatError>;
pub fn create_private_convo<D: DeliveryService>(
&mut self,
delivery: &mut D,
intro: &Introduction,
content: &[u8],
) -> Result<(ConversationIdOwned, Vec<Event>), ChatError>;
}
```
### Client layer
#### Responsibility split
The client owns the concrete service implementations (delivery, future registration, identity), polls the transport on a background thread, and processes inbound bytes by calling into the core. The application invokes client methods and consumes events; raw transport bytes (encrypted envelopes off the wire) are handled entirely inside the client.
#### Constraints
- Owns the concrete service implementations and injects them into core method calls.
- Events from synchronous calls flow through the method's return type, inherited from the core.
- Polls the transport on a background thread and feeds inbound payloads into the core.
- May spawn additional background threads (e.g. for timer-driven retries).
- Background threads emit events that no caller-invoked method can return — for example `DeliveryFailed { reason: Timeout }`.
#### Common shape (all options)
The client invokes core methods with its services; the core publishes envelopes directly through the injected delivery service. Only events flow back as return values.
```rust
impl<D: DeliveryService> ChatClient<D> {
pub fn send_message(&mut self, convo: &ConversationIdOwned, content: &[u8])
-> Result<Vec<Event>, ClientError<D::Error>>; // sync events from this send
// Background events (including those from inbound payload processing) reach the
// application through one of the three mechanisms below.
}
```
The three options differ only in how background events reach the application.
#### Option A — internal poll queue
The client owns a `Mutex<VecDeque<Event>>`. Background threads push to it; the application drains via two new methods.
```rust
impl<D: DeliveryService> ChatClient<D> {
pub fn poll_event(&mut self) -> Option<Event>;
pub fn drain_events(&mut self) -> Vec<Event>;
}
```
Prior art: mio's `Events` (per-`Poll` instance, drained by the caller); rdkafka's `Consumer::poll` (background thread fills a queue, caller polls — same domain).
**Pros**
- Single primitive (mutex-protected queue) with no new dependencies.
- FFI mapping is direct: `client_poll_event` returns an opaque `Option<Event>`, mirroring the existing `PushInboundResult` shape (`crates/client-ffi/src/api.rs:49-55`).
- Matches the existing chat-cli tick-loop consumer pattern (`bin/chat-cli/src/app.rs:144-180`).
**Cons**
- Requires the application to drain after every operation; events accumulate if it forgets.
- Adds shared mutable state (`Mutex<VecDeque>`) inside the client; the queue must be bounded with explicit overflow handling.
#### Option B — channel handed to the caller (selected)
The client's constructor returns a `Receiver<Event>` alongside the client handle. Background threads hold a `Sender<Event>` clone; the application reads from the receiver.
```rust
let (client, events): (ChatClient<_>, Receiver<Event>) =
ChatClient::new(name, delivery);
```
Prior art: most Rust networking libraries; `std::sync::mpsc`, `crossbeam-channel`, `flume`.
**Pros**
- Channels are the canonical multi-producer/single-consumer primitive in the standard library; the shape is idiomatic in pure Rust.
- The application can park in `recv()` from a worker thread, integrate with `select!`, or later swap to `tokio::sync::mpsc` for an async wrapper.
- Mirrors the inbound-bytes channel chat-cli already uses (`bin/chat-cli/src/app.rs:46`).
**Cons**
- `Receiver<T>` is not `#[repr(C)]` and cannot cross `safer-ffi` cleanly. The FFI layer must expose a drain function regardless, collapsing Option B into Option A at the boundary.
- Forces a channel-crate choice (`std::sync::mpsc`, `crossbeam-channel`, or `flume`).
#### Option C — callback registered at construction
The application registers a closure at construction; background threads invoke it directly when events arise.
```rust
type EventFn = Box<dyn Fn(&Event) + Send + 'static>;
impl ChatClient<D> {
pub fn new(name: &str, delivery: D, on_event: EventFn) -> Self;
}
```
Prior art: the existing FFI `DeliverFn` callback at `client_create` (`crates/client-ffi/src/delivery.rs:8-15`); `tracing::Subscriber`; GTK signals.
**Pros**
- The codebase already establishes this pattern for outbound delivery; events would extend a familiar contract.
- FFI mapping is direct: register an `EventFn` function pointer at `client_create`.
- No internal queue or `Mutex` to maintain.
**Cons**
- The callback fires on the background thread. UI-style consumers (ratatui, GUI toolkits) cannot update state from threads other than the main loop thread and will bridge the callback into a thread-local queue — effectively re-implementing Option A in user code.
- The closure must be `Send + 'static`; capturing application state requires `Arc<Mutex<…>>` or a channel back to the application.
- Sync events arrive on the caller's thread; background events arrive on the background thread. The handler must be correct in both threading contexts, or the callback must forward to the main thread (collapsing into Option A).
#### Comparison
| Criterion | A: poll queue | B: channel | C: callback |
|---|---|---|---|
| Background events delivered via | `poll_event` / `drain_events` | `Receiver<Event>` | direct `Fn(&Event)` invocation |
| FFI fit (`safer-ffi`) | Native opaque + accessors | Degrades to Option A at the boundary | Native function pointer (matches `DeliverFn`) |
| New dependencies | None | None (with `std::sync::mpsc`); otherwise `crossbeam-channel` or `flume` | None |
| Internal state required | `Mutex<VecDeque<Event>>` | Channel internals | None |
| Thread on which the application observes the event | Application thread (next drain) | Application thread (next drain) | Background thread |
| Bridges naturally to UI thread | Yes | Yes | No (requires re-bridging) |
| Backpressure if the application is slow | Client-side queue buffers; bounded with overflow handling | Channel buffers; bound configurable | No buffer; slow callbacks block the background thread |
| Future `Stream` adapter | Wrap `poll_event` in a `Stream` | Swap to async channel (native) | Bridge callback into a channel, then `Stream` |
### App layer
The application drives the event loop. With Option B (selected), each tick drains the `Receiver<Event>` handed back at client construction:
```rust
pub fn tick(&mut self) -> Result<()> {
for event in self.events.try_iter() {
self.handle_event(event);
}
Ok(())
}
```
For reference, Option A would replace `self.events.try_iter()` with `self.client.drain_events()`. Option C moves the drain out of the tick — into the callback — and the callback typically forwards into an application-side channel that is drained on each tick anyway.
## Event Taxonomy
The same `Event` enum is shared across all three client options.
```rust
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum Event {
#[non_exhaustive]
ConversationStarted {
conversation_id: ConversationIdOwned,
},
#[non_exhaustive]
MessageReceived {
conversation_id: ConversationIdOwned,
data: Vec<u8>,
},
#[non_exhaustive]
DeliveryReceipt {
conversation_id: ConversationIdOwned,
envelope_id: EnvelopeId,
},
#[non_exhaustive]
DeliveryFailed {
conversation_id: ConversationIdOwned,
envelope_id: EnvelopeId,
reason: FailureReason,
},
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum FailureReason {
Transport, // synchronous transport error on publish
PeerRejected, // peer signalled rejection (future protocol work)
Timeout, // no receipt within the retry window
}
```
`#[non_exhaustive]` on the enum permits new variants; on each struct variant it permits new fields. Both are additive minor-release changes. Future variants (`ConversationRekeyed`, `ParticipantJoined`, `PresenceChanged`, transport health, key-rotation reminders, …) follow this rule.
Mapping of variants to emit sites:
| Variant | Emitted from |
|---|---|
| `ConversationStarted` (responder side) | `core/conversations/src/inbox/handler.rs:155-162` (replaces `is_new_convo: true`) |
| `MessageReceived` | `core/conversations/src/conversation/privatev1.rs:184-191` (replaces `is_new_convo: false`) |
| `DeliveryReceipt` | `Context::handle_payload` when decoding a `PrivateV1Frame::Receipt` (future protocol work) |
| `DeliveryFailed { Transport }` | Core method that invoked `delivery.publish` and observed a synchronous error |
| `DeliveryFailed { Timeout }` | Client's background retry thread |
Events are the uniform observation channel: they carry both observations the call itself caused (e.g. a sync `DeliveryFailed { Transport }` from `send_content`) and observations from background work (e.g. `DeliveryFailed { Timeout }` from the retry thread). The only thing kept outside `Vec<Event>` is an obvious primary result the caller will use immediately — returned directly for ergonomics. This is why the initiator side does not emit `ConversationStarted`: `create_private_convo` returns the new `ConversationIdOwned` directly as part of its return value.
## Decisions
1. **Sync at the client layer for now.** The core stays sync; the client also stays sync. Migrating to async later is non-structural — `std::sync::mpsc::Receiver<Event>` swaps to `tokio::sync::mpsc::Receiver<Event>` and gains an `impl Stream` shape without changing the chosen mechanism (point 2). Option A would migrate to a `Stream` over a notify primitive; Option C to an `async fn` callback.
1. **Core returns `InboundResult`, a structural result type.** One field per kind of observation a payload can produce: an optional new conversation, plus a `FrameOutcome` carrying everything a per-conversation frame processor yields. The structural shape encodes causality (a new conversation is logically prior to anything that happens inside it), so a wrong ordering of observations cannot be represented in the type. `FrameOutcome` exists as a separate type because `Convo::handle_frame` cannot create a conversation; embedding it inside `InboundResult` keeps each return type producing only what its source can populate.
2. **Consumer pattern: Option B — channel handed to the caller.** Different consumer archetypes could favour different shapes — a polling UI loop suits Option A; a low-latency push-driven consumer (toast notifications, daemons) suits Option C — but Option B is preferred: it is the most Rust-idiomatic of the three, has few drawbacks compared to A or C, and offers the smoothest path to async (point 1).
2. **`Event` is an asynchronous notification.** The client's constructor returns a `Receiver<Event>` alongside the client handle. A background poller drives the transport, calls into the core for each inbound payload, translates the resulting `InboundResult` into one event per observation, and pushes them onto the channel. Background work that has no synchronous trigger at all (delivery retry timeouts, future protocol timers) pushes onto the same channel.
## Event flow
3. **Two enums, mapping at the client boundary.** `InboundResult` is the structural sum of observations from one payload; `Event` is a discrete app-facing notification. The two enums are allowed to diverge: a protocol-internal observation the app does not need lives only on `FrameOutcome`; a client-only event like `DeliveryFailed { Timeout }` lives only on `Event`. Translation is an explicit per-variant `match` inside the client — not a blanket `From` impl — to preserve that divergence as both sides grow.
A worked example of the decisions above. Two flows cover everything the application observes: a synchronous send initiated by the app, and a background inbound carried by the client's transport poller.
## Events vs errors
Events are asynchronous notifications: things the application learns after the call that might have triggered them has returned. They cross thread boundaries through the channel.
Synchronous failures — publish, parse, store, MLS — stay on `Result<_, ChatError>` on the call that triggered them. They are never events. `DeliveryFailed { reason }` is therefore an event by construction: only background work can raise it, after the original send already returned `Ok`.
## Sequence
Two flows cover everything the application observes: a synchronous send initiated by the app, and inbound bytes carried by the client's transport poller.
```mermaid
sequenceDiagram
@ -280,36 +63,22 @@ sequenceDiagram
participant Core
participant Delivery as DeliveryService
Note over App,Delivery: Outbound — synchronous send initiated by the app
Note over App,Delivery: Outbound — synchronous send
App->>Client: send_message(convo, content)
Client->>Core: send_content(&mut delivery, ...)
Client->>Core: send_content(...)
Core->>Delivery: publish(envelope)
Delivery-->>Core: Ok / Err
Core-->>Client: Ok(Vec<Event>)
Client-->>App: Ok(Vec<Event>)
Core-->>Client: Ok(()) / Err
Client-->>App: Ok(()) / Err
Note over Poller,Delivery: Inbound — background poll loop in the client
Poller->>Poller: poll tick
Note over Poller,Delivery: Inbound — background poller pushes events
Poller->>Delivery: poll
Delivery-->>Poller: payload bytes
Poller->>Core: handle_payload(&mut delivery, payload)
Core-->>Poller: Ok(vec![MessageReceived, ...])
Poller-)App: event via Receiver<Event>
Poller->>Core: handle_payload(payload)
Core-->>Poller: Ok(InboundResult)
Poller->>Poller: translate fields → Event values
Poller-)App: events via Receiver<Event>
Note over App: Next tick — drain the channel
App->>App: for event in events.try_iter() { handle_event(event) }
Note over App: App drains on its own schedule
App->>App: for event in receiver.try_iter() { handle(event) }
```
## References
### Source references
- `core/conversations/src/types.rs:9-20` — current `ContentData` and `AddressedEnvelope`
- `core/conversations/src/context.rs:138-185``Context::handle_payload` (core inbound entry)
- `core/conversations/src/inbox/handler.rs:124-167` — inbox handshake handler (current `is_new_convo` set site)
- `core/conversations/src/conversation/privatev1.rs:184-191, 219-260` — private-conversation handler
- `crates/client/src/client.rs:60-92``ChatClient` public surface
- `crates/client/src/delivery.rs``DeliveryService` trait
- `crates/client-ffi/src/api.rs:49-55, 220-285` — current FFI inbound result shape
- `crates/client-ffi/src/delivery.rs:8-15` — existing FFI callback pattern (`DeliverFn`)
- `bin/chat-cli/src/app.rs:46, 144-180` — current application consumption pattern