feat: introduce client event system (#106)

* chore(flake): accept extra system attr; add perl for openssl-sys build

forAllSystems calls the lambda with {system, pkgs}; strict
destructuring requires `..` to ignore the system attribute.

`pkgs.perl` is needed because openssl-sys is pulled vendored via
libsqlite3-sys / rusqlite / chat-sqlite, and its `perl Configure`
step needs FindBin.pm, which Fedora's system perl doesn't ship.

* feat: introduce client event system

- Core processing yields a `PayloadOutcome` enum — `Empty`, `Convo`, or
  `Inbox`. `ConvoOutcome` carries a conversation id and an optional
  decrypted `Content`; `InboxOutcome` adds a `NewConversation`
  (id + `ConversationClass`) for a peer-initiated conversation.
- Client translates `PayloadOutcome` into app-facing `Vec<Event>`
  (`ConversationStarted`, `MessageReceived`) at the boundary, so the
  application loop sees discrete events rather than core types.
- MLS group welcomes produce a `ConversationStarted` event with no
  initial content, fixing the silent-group-join case where the inbox
  layer dropped the observation.
- C FFI exposes an `EventList` opaque type with indexed accessors and
  an `Invalid` sentinel for out-of-bounds / non-applicable reads.
- Symmetric `Inbox` / `InboxV2` handlers: both return
  `Result<InboxOutcome, _>` and own the persistence + ephemeral-key
  cleanup for the conversations they create.
- Updated and simplified `docs/adr/0001-client-event-system.md`.

* chore(flake): bump nixpkgs to nixos-unstable-small

Temporary. The two crates.io UA fixes (NixOS/nixpkgs#512735 for
fetchCargoVendor's python-requests UA, NixOS/nixpkgs#524985 for
importCargoLock's curl UA) haven't propagated to nixos-unstable yet.
Switch to nixos-unstable-small and force logos-delivery to follow so
the smoketest gets the same fix. Revert once nixos-unstable catches up.

Refs:
- https://github.com/rust-lang/crates.io/issues/13482
- https://github.com/rust-lang/crates.io/issues/13783
- https://crates.io/data-access
This commit is contained in:
osmaczko 2026-05-28 23:51:15 +02:00 committed by GitHub
parent 279477cdeb
commit c677cc9334
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 851 additions and 712 deletions

View File

@ -73,10 +73,15 @@ jobs:
working-directory: crates/client-ffi/examples/message-exchange
- name: Build logos-delivery
run: nix build .#logos-delivery
# Build and run chat-cli through the dev shell so it links against the
# same Nix glibc as the prebuilt liblogosdelivery.so. A plain `cargo
# build` uses the runner's system glibc, which is older than Nix's and
# mismatches it at runtime (libc.so.6: version `GLIBC_ABI_DT_X86_64_PLT'
# not found, required by Nix glibc's libm.so.6).
- name: Build chat-cli (logos-delivery)
run: LOGOS_DELIVERY_LIB_DIR=./result/lib cargo build --release -p chat-cli
run: nix develop -c bash -c 'LOGOS_DELIVERY_LIB_DIR=./result/lib cargo build --release -p chat-cli'
- name: Run chat-cli smoketest
run: ./target/release/chat-cli --name ci-test --smoketest
run: nix develop -c ./target/release/chat-cli --name ci-test --smoketest
nix-build:
name: Nix Build

2
.gitignore vendored
View File

@ -39,4 +39,4 @@ result
crates/client-ffi/client_ffi.h
# Compiled C FFI example binary
examples/c-ffi/c-client
crates/client-ffi/examples/message-exchange/c-client

View File

@ -5,7 +5,7 @@ use std::sync::mpsc;
use anyhow::Result;
use arboard::Clipboard;
use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService};
use logos_chat::{ChatClient, DeliveryService, Event};
use serde::{Deserialize, Serialize};
use crate::utils::now;
@ -144,41 +144,57 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
pub fn process_incoming(&mut self) -> Result<()> {
while let Ok(payload) = self.inbound.try_recv() {
match self.client.receive(&payload) {
Ok(Some(content)) => {
let chat_id = &content.conversation_id;
if !self.state.chats.contains_key(chat_id) && content.is_new_convo {
let session = ChatSession {
chat_id: chat_id.clone(),
nickname: None,
messages: Vec::new(),
};
self.state.chats.insert(chat_id.clone(), session);
let label = chat_id[..8.min(chat_id.len())].to_string();
self.set_active_chat(Some(chat_id.clone()));
self.status = format!("New chat ({label})! Use /nickname to name it.");
Ok(events) => {
for event in events {
self.handle_event(event);
}
if !content.data.is_empty() {
let text = String::from_utf8_lossy(&content.data).to_string();
if let Some(session) = self.state.chats.get_mut(chat_id) {
session.messages.push(DisplayMessage {
from_self: false,
content: text,
timestamp: now(),
});
}
}
self.save_state()?;
}
Ok(None) => {}
Err(e) => tracing::warn!("receive error: {e:?}"),
Err(e) => {
tracing::warn!("receive error: {e:?}");
self.status = format!("Could not decrypt incoming message: {e}");
}
}
}
Ok(())
}
fn handle_event(&mut self, event: Event) {
match event {
Event::ConversationStarted { convo_id, .. } => {
let chat_id = convo_id.to_string();
if self.state.chats.contains_key(&chat_id) {
return;
}
self.state.chats.insert(
chat_id.clone(),
ChatSession {
chat_id: chat_id.clone(),
nickname: None,
messages: Vec::new(),
},
);
let label = &chat_id[..8.min(chat_id.len())];
self.status = format!("New chat ({label})! Use /nickname to name it.");
self.set_active_chat(Some(chat_id));
}
Event::MessageReceived {
convo_id, content, ..
} => {
let chat_id = convo_id.to_string();
let Some(session) = self.state.chats.get_mut(&chat_id) else {
return;
};
session.messages.push(DisplayMessage {
from_self: false,
content: String::from_utf8_lossy(&content).into_owned(),
timestamp: now(),
});
}
_ => {}
}
}
pub fn send_message(&mut self, content: &str) -> Result<()> {
let chat_id = self
.state
@ -186,10 +202,8 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
.clone()
.ok_or_else(|| anyhow::anyhow!("No active chat. Use /connect or /switch first."))?;
let convo_id: ConversationIdOwned = chat_id.as_str().into();
self.client
.send_message(&convo_id, content.as_bytes())
.send_message(&chat_id, content.as_bytes())
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
if let Some(session) = self.state.chats.get_mut(&chat_id) {
@ -253,12 +267,11 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
return Ok(Some("Usage: /connect <bundle>".to_string()));
}
let initial = format!("Hello from {}!", self.user_name);
let convo_id = self
let chat_id = self
.client
.create_conversation(args.as_bytes(), initial.as_bytes())
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
let chat_id = convo_id.to_string();
let label = chat_id[..8.min(chat_id.len())].to_string();
let mut session = ChatSession {
chat_id: chat_id.clone(),

View File

@ -1,5 +1,4 @@
use std::cell::{Ref, RefMut};
use std::sync::Arc;
use std::{cell::RefCell, rc::Rc};
use crate::account::LogosAccount;
@ -8,17 +7,18 @@ use crate::conversation::{Convo, GroupConvo};
use crate::{DeliveryService, RegistrationService};
use crate::{
conversation::{Conversation, Id, PrivateV1Convo},
conversation::{Id, PrivateV1Convo},
errors::ChatError,
inbox::Inbox,
inbox_v2::InboxV2,
outcomes::{ConvoOutcome, InboxOutcome, PayloadOutcome},
proto::{EncryptedPayload, EnvelopeV1, Message},
types::{AccountId, AddressedEnvelope, ContentData},
types::{AccountId, AddressedEnvelope},
};
use crypto::{Identity, PublicKey};
use storage::{ChatStore, ConversationKind};
pub use crate::conversation::{ConversationId, ConversationIdOwned};
pub use crate::conversation::ConversationId;
pub use crate::inbox::Introduction;
// This is the main entry point to the conversations api.
@ -163,7 +163,7 @@ where
&mut self,
remote_bundle: &Introduction,
content: &[u8],
) -> Result<(ConversationIdOwned, Vec<AddressedEnvelope>), ChatError> {
) -> Result<(ConversationId, Vec<AddressedEnvelope>), ChatError> {
let (mut convo, payloads) = self
.inbox
.invite_to_private_convo(remote_bundle, content, Rc::clone(&self.store))
@ -198,12 +198,9 @@ where
Ok(Box::new(convo))
}
pub fn list_conversations(&self) -> Result<Vec<ConversationIdOwned>, ChatError> {
pub fn list_conversations(&self) -> Result<Vec<ConversationId>, ChatError> {
let records = self.store.borrow().load_conversations()?;
Ok(records
.into_iter()
.map(|r| Arc::from(r.local_convo_id.as_str()))
.collect())
Ok(records.into_iter().map(|r| r.local_convo_id).collect())
}
pub fn take_missing_messages(&self) -> Vec<MissingMessage> {
@ -212,7 +209,7 @@ where
pub fn send_content(
&mut self,
convo_id: ConversationId,
convo_id: &str,
content: &[u8],
) -> Result<Vec<AddressedEnvelope>, ChatError> {
let mut convo = self.load_convo(convo_id)?;
@ -225,62 +222,44 @@ where
}
// Decode bytes and send to protocol for processing.
pub fn handle_payload(&mut self, payload: &[u8]) -> Result<Option<ContentData>, ChatError> {
pub fn handle_payload(&mut self, payload: &[u8]) -> Result<PayloadOutcome, ChatError> {
let env = EnvelopeV1::decode(payload)?;
// TODO: Impl Conversation hinting
let convo_id = env.conversation_hint;
match convo_id {
c if c == self.inbox.id() => self.dispatch_to_inbox(&env.payload),
c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload),
c if c == self.inbox.id() => self.dispatch_to_inbox(&env.payload).map(Into::into),
c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload).map(Into::into),
c if self.store.borrow().has_conversation(&c)? => {
self.dispatch_to_convo(&c, &env.payload)
self.dispatch_to_convo(&c, &env.payload).map(Into::into)
}
_ => Ok(Some(ContentData {
conversation_id: "".into(),
data: vec![],
is_new_convo: false,
})),
_ => Ok(PayloadOutcome::Empty),
}
}
// Dispatch encrypted payload to Inbox, and register the created Conversation
fn dispatch_to_inbox(
&mut self,
enc_payload_bytes: &[u8],
) -> Result<Option<ContentData>, ChatError> {
// Dispatch encrypted payload to Inbox. The Inbox persists the newly
// created conversation and consumes the ephemeral key internally.
fn dispatch_to_inbox(&mut self, enc_payload_bytes: &[u8]) -> Result<InboxOutcome, ChatError> {
// EncryptedPayloads are not used by GroupConvos at this time, else this can be performed in `handle_payload`
// TODO: (P1) reconcile envelope parsing between Covno and GroupConvo
let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?;
let public_key_hex = Inbox::<CS>::extract_ephemeral_key_hex(&enc_payload)?;
let (convo, content) =
self.inbox
.handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store))?;
match convo {
Conversation::Private(mut convo) => convo.persist()?,
};
self.store
.borrow_mut()
.remove_ephemeral_key(&public_key_hex)?;
Ok(content)
self.inbox
.handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store))
}
// Dispatch encrypted payload to Inbox, and register the created Conversation
fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result<Option<ContentData>, ChatError> {
self.pq_inbox.handle_frame(payload)?;
Ok(None)
// Dispatch encrypted payload to the post-quantum inbox.
fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result<InboxOutcome, ChatError> {
self.pq_inbox.handle_frame(payload)
}
// Dispatch encrypted payload to its corresponding conversation
fn dispatch_to_convo(
&mut self,
convo_id: ConversationId,
convo_id: &str,
enc_payload_bytes: &[u8],
) -> Result<Option<ContentData>, ChatError> {
) -> Result<ConvoOutcome, ChatError> {
let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?;
let mut convo = self.load_convo(convo_id)?;
convo.handle_frame(enc_payload)
@ -291,15 +270,12 @@ where
Ok(intro.into())
}
pub fn get_convo(
&mut self,
convo_id: ConversationId,
) -> Result<Box<dyn GroupConvo<DS, RS>>, ChatError> {
pub fn get_convo(&mut self, convo_id: &str) -> Result<Box<dyn GroupConvo<DS, RS>>, ChatError> {
self.load_group_convo(convo_id)
}
/// Loads a conversation from DB by constructing it from metadata.
fn load_convo(&mut self, convo_id: ConversationId) -> Result<Box<dyn Convo>, ChatError> {
fn load_convo(&mut self, convo_id: &str) -> Result<Box<dyn Convo>, ChatError> {
let record = self
.store
.borrow()
@ -327,7 +303,7 @@ where
fn load_group_convo(
&mut self,
convo_id: ConversationId,
convo_id: &str,
) -> Result<Box<dyn GroupConvo<DS, RS>>, ChatError> {
let record = self
.store

View File

@ -3,23 +3,22 @@ mod privatev1;
use crate::{
DeliveryService,
outcomes::ConvoOutcome,
service_traits::KeyPackageProvider,
types::{AccountId, AddressedEncryptedPayload, ContentData},
types::{AccountId, AddressedEncryptedPayload},
};
use chat_proto::logoschat::encryption::EncryptedPayload;
use std::fmt::Debug;
use std::sync::Arc;
use storage::{ConversationKind, ConversationStore, RatchetStore};
use storage::ConversationKind;
pub use crate::errors::ChatError;
pub use group_v1::{GroupV1Convo, IdentityProvider};
pub use privatev1::PrivateV1Convo;
pub type ConversationId<'a> = &'a str;
pub type ConversationIdOwned = Arc<str>;
pub type ConversationId = String;
pub trait Id: Debug {
fn id(&self) -> ConversationId<'_>;
fn id(&self) -> &str;
}
pub trait Convo: Id + Debug {
@ -28,13 +27,10 @@ pub trait Convo: Id + Debug {
/// Decrypts and processes an incoming encrypted frame.
///
/// Returns `Ok(Some(ContentData))` if the frame contains user content,
/// `Ok(None)` for protocol frames (e.g., placeholders), or an error if
/// decryption or frame parsing fails.
fn handle_frame(
&mut self,
enc_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError>;
/// Returns the [`ConvoOutcome`] describing what the frame produced; its
/// `content` is `None` for protocol-only frames (placeholders, MLS
/// commits). Errors only on decryption or frame-parsing failure.
fn handle_frame(&mut self, enc_payload: EncryptedPayload) -> Result<ConvoOutcome, ChatError>;
fn remote_id(&self) -> String;
@ -49,7 +45,3 @@ pub trait GroupConvo<DS: DeliveryService, RS: KeyPackageProvider>: Convo {
// sends the payload directly.
fn send_content(&mut self, content: &[u8]) -> Result<(), ChatError>;
}
pub enum Conversation<S: ConversationStore + RatchetStore> {
Private(PrivateV1Convo<S>),
}

View File

@ -1,41 +0,0 @@
use crate::{
conversation::{ChatError, ConversationId, Convo, Id},
proto::EncryptedPayload,
types::{AddressedEncryptedPayload, ContentData},
};
#[derive(Debug)]
pub struct GroupTestConvo {}
impl GroupTestConvo {
pub fn new() -> Self {
Self {}
}
}
impl Id for GroupTestConvo {
fn id(&self) -> ConversationId<'_> {
// implementation
"grouptest"
}
}
impl Convo for GroupTestConvo {
fn send_message(
&mut self,
_content: &[u8],
) -> Result<Vec<AddressedEncryptedPayload>, ChatError> {
Ok(vec![])
}
fn handle_frame(
&mut self,
_encoded_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError> {
Ok(None)
}
fn remote_id(&self) -> String {
self.id().to_string()
}
}

View File

@ -20,9 +20,10 @@ use crate::causal_history::CausalHistoryStore;
use crate::types::AccountId;
use crate::{
DeliveryService,
conversation::{ChatError, ConversationId, Convo, GroupConvo, Id},
conversation::{ChatError, Convo, GroupConvo, Id},
outcomes::{Content, ConvoOutcome},
service_traits::KeyPackageProvider,
types::{AddressedEncryptedPayload, ContentData},
types::AddressedEncryptedPayload,
};
/// Provides the identity information needed to participate in an MLS group.
@ -264,7 +265,7 @@ where
DS: DeliveryService,
KP: KeyPackageProvider,
{
fn id(&self) -> ConversationId<'_> {
fn id(&self) -> &str {
&self.convo_id
}
}
@ -306,7 +307,7 @@ where
fn handle_frame(
&mut self,
encoded_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError> {
) -> Result<ConvoOutcome, ChatError> {
let bytes = match encoded_payload.encryption {
Some(encrypted_payload::Encryption::Plaintext(pt)) => pt.payload,
_ => {
@ -329,7 +330,7 @@ where
if protocol_message.epoch() < self.mls_group.epoch() {
// TODO: (P1) Add logging for messages arriving from past epoch.
return Ok(None);
return Ok(ConvoOutcome::empty(self.id().to_string()));
}
let processed = self
@ -337,27 +338,29 @@ where
.process_message(provider, protocol_message)
.map_err(ChatError::generic)?;
match processed.into_content() {
let content = match processed.into_content() {
ProcessedMessageContent::ApplicationMessage(msg) => {
let reliable = ReliablePayload::decode(msg.into_bytes().as_slice())?;
self.causal.on_receive(&self.convo_id, &reliable);
Ok(Some(ContentData {
conversation_id: hex::encode(self.mls_group.group_id().as_slice()),
data: reliable.content.to_vec(),
is_new_convo: false,
}))
Some(Content {
bytes: reliable.content.to_vec(),
})
}
ProcessedMessageContent::StagedCommitMessage(commit) => {
self.mls_group
.merge_staged_commit(provider, *commit)
.map_err(ChatError::generic)?;
Ok(None)
None
}
_ => {
// TODO: (P2) Log unknown message type
Ok(None)
None
}
}
};
Ok(ConvoOutcome {
convo_id: self.id().to_string(),
content,
})
}
fn remote_id(&self) -> String {

View File

@ -8,16 +8,16 @@ use chat_proto::logoschat::{
};
use crypto::{PrivateKey, PublicKey, SymmetricKey32};
use double_ratchets::{Header, InstallationKeyPair, RatchetState, restore_ratchet_state};
use prost::{Message, bytes::Bytes};
use std::{cell::RefCell, fmt::Debug, rc::Rc, sync::Arc};
use prost::{Message as _, bytes::Bytes};
use std::{cell::RefCell, fmt::Debug, rc::Rc};
use storage::{ConversationKind, ConversationMeta, ConversationStore};
use crate::{
context::ConversationIdOwned,
conversation::{ChatError, ConversationId, Convo, Id},
errors::EncryptionError,
outcomes::{Content, ConvoOutcome},
proto,
types::{AddressedEncryptedPayload, ContentData},
types::AddressedEncryptedPayload,
utils::timestamp_millis,
};
use double_ratchets::{to_ratchet_record, to_skipped_key_records};
@ -181,17 +181,8 @@ impl<S: ConversationStore + RatchetStore> PrivateV1Convo<S> {
Ok(PrivateV1Frame::decode(content_bytes.as_slice()).unwrap())
}
// Handler for application content
fn handle_content(&self, data: Vec<u8>) -> Option<ContentData> {
Some(ContentData {
conversation_id: self.id().into(),
data,
is_new_convo: false,
})
}
/// Persists a conversation's metadata and ratchet state to DB.
pub fn persist(&mut self) -> Result<ConversationIdOwned, ChatError> {
pub fn persist(&mut self) -> Result<ConversationId, ChatError> {
let convo_info = ConversationMeta {
local_convo_id: self.id().to_string(),
remote_convo_id: self.remote_id(),
@ -199,7 +190,7 @@ impl<S: ConversationStore + RatchetStore> PrivateV1Convo<S> {
};
self.store.borrow_mut().save_conversation(&convo_info)?;
self.save_ratchet_state(&mut *self.store.borrow_mut())?;
Ok(Arc::from(self.id()))
Ok(self.id().to_string())
}
pub fn save_ratchet_state<T: RatchetStore>(&self, storage: &mut T) -> Result<(), ChatError> {
@ -208,10 +199,16 @@ impl<S: ConversationStore + RatchetStore> PrivateV1Convo<S> {
storage.save_ratchet_state(&self.local_convo_id, &record, &skipped_keys)?;
Ok(())
}
fn handle_content(&self, bytes: Bytes) -> Content {
Content {
bytes: bytes.into(),
}
}
}
impl<S: ConversationStore + RatchetStore> Id for PrivateV1Convo<S> {
fn id(&self) -> ConversationId<'_> {
fn id(&self) -> &str {
&self.local_convo_id
}
}
@ -241,7 +238,7 @@ impl<S: ConversationStore + RatchetStore> Convo for PrivateV1Convo<S> {
fn handle_frame(
&mut self,
encoded_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError> {
) -> Result<ConvoOutcome, ChatError> {
// Extract expected frame
let frame = self
.decrypt(encoded_payload)
@ -253,13 +250,14 @@ impl<S: ConversationStore + RatchetStore> Convo for PrivateV1Convo<S> {
self.save_ratchet_state(&mut *self.store.borrow_mut())?;
// Handle FrameTypes
let output = match frame_type {
FrameType::Content(bytes) => self.handle_content(bytes.into()),
let content = match frame_type {
FrameType::Content(bytes) => Some(self.handle_content(bytes)),
FrameType::Placeholder(_) => None,
};
Ok(output)
Ok(ConvoOutcome {
convo_id: self.id().to_string(),
content,
})
}
fn remote_id(&self) -> String {

View File

@ -10,11 +10,12 @@ use storage::{ConversationStore, EphemeralKeyStore, RatchetStore};
use crypto::{PrekeyBundle, SymmetricKey32};
use crate::context::Introduction;
use crate::conversation::{ChatError, Conversation, ConversationId, Convo, Id, PrivateV1Convo};
use crate::conversation::{ChatError, Convo, Id, PrivateV1Convo};
use crate::crypto::{CopyBytes, PrivateKey, PublicKey};
use crate::inbox::handshake::InboxHandshake;
use crate::outcomes::{ConversationClass, InboxOutcome, NewConversation};
use crate::proto;
use crate::types::{AddressedEncryptedPayload, ContentData};
use crate::types::AddressedEncryptedPayload;
use crypto::Identity;
/// Compute the deterministic Delivery_address for an installation
@ -119,14 +120,18 @@ impl<S: EphemeralKeyStore> Inbox<S> {
Ok((convo, payloads))
}
/// Handles an incoming inbox frame. The caller must provide the ephemeral private key
/// looked up from storage. Returns the created conversation and optional content data.
/// Handles an incoming inbox frame. The caller must provide the ephemeral
/// private key hex looked up from storage. Persists the created
/// conversation and consumes the ephemeral key. Returns the
/// [`InboxOutcome`] describing what was observed — for a successful
/// invite, a `new_conversation` and the initial `ConvoOutcome` carrying
/// the first message.
pub fn handle_frame<PS: ConversationStore + RatchetStore>(
&self,
enc_payload: EncryptedPayload,
public_key_hex: &str,
private_store: Rc<RefCell<PS>>,
) -> Result<(Conversation<PS>, Option<ContentData>), ChatError> {
) -> Result<InboxOutcome, ChatError> {
let ephemeral_key = self
.store
.borrow()
@ -143,7 +148,7 @@ impl<S: EphemeralKeyStore> Inbox<S> {
let (seed_key, frame) =
self.perform_handshake(&ephemeral_key, header, handshake.payload)?;
match frame.frame_type.unwrap() {
let result = match frame.frame_type.unwrap() {
proto::inbox_v1_frame::FrameType::InvitePrivateV1(_invite_private_v1) => {
let mut convo =
PrivateV1Convo::new_responder(private_store, seed_key, &ephemeral_key);
@ -152,18 +157,31 @@ impl<S: EphemeralKeyStore> Inbox<S> {
return Err(ChatError::Protocol("missing initial encpayload".into()));
};
// Set is_new_convo for content data
let content = match convo.handle_frame(enc_payload)? {
Some(v) => ContentData {
is_new_convo: true,
..v
},
None => return Err(ChatError::Protocol("expected contentData".into())),
};
let initial = convo.handle_frame(enc_payload)?;
if initial.content.is_none() {
return Err(ChatError::Protocol(
"expected initial message in invite".into(),
));
}
Ok((Conversation::Private(convo), Some(content)))
let new_conversation = NewConversation {
convo_id: initial.convo_id.clone(),
class: ConversationClass::Private,
};
convo.persist()?;
InboxOutcome {
new_conversation,
initial: Some(initial),
}
}
}
};
self.store
.borrow_mut()
.remove_ephemeral_key(public_key_hex)?;
Ok(result)
}
/// Extracts the ephemeral key hex from an incoming encrypted payload
@ -250,7 +268,7 @@ impl<S: EphemeralKeyStore> Inbox<S> {
}
impl<S: EphemeralKeyStore> Id for Inbox<S> {
fn id(&self) -> ConversationId<'_> {
fn id(&self) -> &str {
&self.local_convo_id
}
}

View File

@ -7,6 +7,7 @@ use openmls::prelude::*;
use openmls_libcrux_crypto::Provider as LibcruxProvider;
use prost::{Message, Oneof};
use storage::ChatStore;
use storage::ConversationKind;
use storage::ConversationMeta;
use crate::AddressedEnvelope;
@ -16,9 +17,11 @@ use crate::RegistrationService;
use crate::account::LogosAccount;
use crate::causal_history::CausalHistoryStore;
use crate::causal_history::MissingMessage;
use crate::conversation::ConversationId;
use crate::conversation::GroupConvo;
use crate::conversation::group_v1::MlsContext;
use crate::conversation::{GroupV1Convo, IdentityProvider};
use crate::conversation::{GroupV1Convo, Id, IdentityProvider};
use crate::outcomes::{ConversationClass, InboxOutcome, NewConversation};
use crate::types::AccountId;
use crate::utils::{blake2b_hex, hash_size};
pub struct PqMlsContext {
@ -152,7 +155,7 @@ where
)
}
pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<(), ChatError> {
pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<InboxOutcome, ChatError> {
let inbox_frame = InboxV2Frame::decode(payload_bytes)?;
let Some(payload) = inbox_frame.payload else {
@ -172,14 +175,14 @@ where
let meta = ConversationMeta {
local_convo_id: convo.id().to_string(),
remote_convo_id: "0".into(),
kind: storage::ConversationKind::GroupV1,
kind: ConversationKind::GroupV1,
};
self.store.borrow_mut().save_conversation(&meta)?;
// TODO: (P1) Persist state
Ok(())
}
fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result<(), ChatError> {
fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result<InboxOutcome, ChatError> {
let (msg_in, _rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?;
let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else {
@ -197,7 +200,15 @@ where
self.causal.clone(),
welcome,
)?;
self.persist_convo(convo)
let convo_id: ConversationId = convo.id().to_string();
self.persist_convo(convo)?;
Ok(InboxOutcome {
new_conversation: NewConversation {
convo_id,
class: ConversationClass::Group,
},
initial: None,
})
}
fn create_keypackage(&self) -> Result<KeyPackage, ChatError> {

View File

@ -6,6 +6,7 @@ mod crypto;
mod errors;
mod inbox;
mod inbox_v2;
mod outcomes;
mod proto;
mod service_traits;
mod types;
@ -15,9 +16,13 @@ pub use account::LogosAccount;
pub use causal_history::MissingMessage;
pub use chat_sqlite::ChatStorage;
pub use chat_sqlite::StorageConfig;
pub use context::{Context, ConversationId, ConversationIdOwned, Introduction};
pub use context::{Context, ConversationId, Introduction};
pub use conversation::GroupConvo;
pub use errors::ChatError;
pub use outcomes::{
Content, ConversationClass, ConvoOutcome, InboxOutcome, NewConversation, PayloadOutcome,
};
pub use service_traits::{DeliveryService, IdentityProvider, RegistrationService};
pub use types::{AccountId, AddressedEnvelope, ContentData};
pub use storage::ConversationKind;
pub use types::{AccountId, AddressedEnvelope};
pub use utils::hex_trunc;

View File

@ -0,0 +1,81 @@
//! Observations a single inbound payload produces.
//!
//! - [`ConvoOutcome`] — an optional [`Content`] on a single existing
//! conversation.
//! - [`InboxOutcome`] — a newly observed conversation, optionally with an
//! initial [`ConvoOutcome`].
//! - [`PayloadOutcome`] — the union of the above, plus `Empty`.
use storage::ConversationKind;
use crate::conversation::ConversationId;
#[derive(Debug, Clone)]
pub struct Content {
pub bytes: Vec<u8>,
}
#[derive(Debug, Clone)]
pub struct ConvoOutcome {
pub convo_id: ConversationId,
pub content: Option<Content>,
}
impl ConvoOutcome {
pub fn empty(convo_id: ConversationId) -> Self {
Self {
convo_id,
content: None,
}
}
}
#[derive(Debug, Clone)]
pub struct NewConversation {
pub convo_id: ConversationId,
pub class: ConversationClass,
}
#[derive(Debug, Clone)]
pub struct InboxOutcome {
pub new_conversation: NewConversation,
pub initial: Option<ConvoOutcome>,
}
#[derive(Debug, Clone, Default)]
pub enum PayloadOutcome {
#[default]
Empty,
Convo(ConvoOutcome),
Inbox(InboxOutcome),
}
impl From<ConvoOutcome> for PayloadOutcome {
fn from(c: ConvoOutcome) -> Self {
Self::Convo(c)
}
}
impl From<InboxOutcome> for PayloadOutcome {
fn from(i: InboxOutcome) -> Self {
Self::Inbox(i)
}
}
/// Stable across protocol versions of the same conversation shape.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConversationClass {
Private,
Group,
}
impl ConversationClass {
/// `Unknown(_)` yields `None`.
pub fn from_kind(kind: &ConversationKind) -> Option<Self> {
match kind {
ConversationKind::PrivateV1 => Some(Self::Private),
ConversationKind::GroupV1 => Some(Self::Group),
ConversationKind::Unknown(_) => None,
}
}
}

View File

@ -48,15 +48,6 @@ impl Debug for AddressedEnvelope {
}
}
// This struct represents the result of processed inbound data.
// It wraps content payload with a conversation_id
#[derive(Debug)]
pub struct ContentData {
pub conversation_id: String,
pub data: Vec<u8>,
pub is_new_convo: bool,
}
// Internal type Definitions
// Used by Conversations to attach addresses to outbound encrypted payloads

View File

@ -1,38 +1,61 @@
use std::ops::{Deref, DerefMut};
use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
use libchat::{ContentData, Context, GroupConvo, hex_trunc};
use libchat::{
Content, Context, ConversationClass, ConvoOutcome, GroupConvo, NewConversation, PayloadOutcome,
hex_trunc,
};
type ResultCallback = Box<dyn Fn(&PayloadOutcome)>;
// Simple client Functionality for testing
struct Client {
inner: Context<LocalBroadcaster, EphemeralRegistry, MemStore>,
on_content: Option<Box<dyn Fn(ContentData)>>,
on_result: Option<ResultCallback>,
new_conversations: Vec<NewConversation>,
received_messages: Vec<(libchat::ConversationId, Content)>,
}
impl Client {
fn init(
ctx: Context<LocalBroadcaster, EphemeralRegistry, MemStore>,
cb: Option<impl Fn(ContentData) + 'static>,
cb: Option<impl Fn(&PayloadOutcome) + 'static>,
) -> Self {
Client {
inner: ctx,
on_content: cb.map(|f| Box::new(f) as Box<dyn Fn(ContentData)>),
on_result: cb.map(|f| Box::new(f) as ResultCallback),
new_conversations: Vec::new(),
received_messages: Vec::new(),
}
}
fn process_messages(&mut self) {
let messages: Vec<_> = {
let payloads: Vec<_> = {
let mut ds = self.ds();
std::iter::from_fn(|| ds.poll()).collect()
};
for data in messages {
let res = self.handle_payload(&data).unwrap();
if let Some(cb) = &self.on_content
&& let Some(content_data) = res
{
cb(content_data);
for data in payloads {
let result = self.handle_payload(&data).unwrap();
if let Some(cb) = &self.on_result {
cb(&result);
}
match result {
PayloadOutcome::Empty => {}
PayloadOutcome::Convo(co) => self.absorb_convo_outcome(co),
PayloadOutcome::Inbox(io) => {
self.new_conversations.push(io.new_conversation);
if let Some(initial) = io.initial {
self.absorb_convo_outcome(initial);
}
}
}
}
}
fn absorb_convo_outcome(&mut self, outcome: ConvoOutcome) {
if let Some(content) = outcome.content {
self.received_messages.push((outcome.convo_id, content));
}
}
@ -60,15 +83,32 @@ impl DerefMut for Client {
}
// Higher order function to handle printing
fn pretty_print(prefix: impl Into<String>) -> Box<dyn Fn(ContentData)> {
fn pretty_print(prefix: impl Into<String>) -> ResultCallback {
let prefix = prefix.into();
Box::new(move |c: ContentData| {
let cid = hex_trunc(c.conversation_id.as_bytes());
let content = String::from_utf8(c.data).unwrap();
println!("{} ({:?}) {}", prefix, cid, content)
Box::new(move |result: &PayloadOutcome| match result {
PayloadOutcome::Empty => {}
PayloadOutcome::Inbox(io) => {
let cid = hex_trunc(io.new_conversation.convo_id.as_bytes());
println!(
"{prefix} ({cid:?}) [conversation started: {:?}]",
io.new_conversation.class
);
if let Some(initial) = &io.initial {
print_contents(&prefix, initial);
}
}
PayloadOutcome::Convo(co) => print_contents(&prefix, co),
})
}
fn print_contents(prefix: &str, outcome: &ConvoOutcome) {
let cid = hex_trunc(outcome.convo_id.as_bytes());
if let Some(content) = &outcome.content {
let text = String::from_utf8_lossy(&content.bytes);
println!("{prefix} ({cid:?}) {text}");
}
}
fn process(clients: &mut Vec<Client>) {
for client in clients {
client.process_messages();
@ -95,21 +135,33 @@ fn create_group() {
let raya_id = clients[RAYA].account_id().clone();
let s_convo = clients[SARO].create_group_convo(&[&raya_id]).unwrap();
let convo_id = s_convo.id();
let convo_id = s_convo.id().to_string();
// Raya can read this message because
// 1) It was sent after add_members was committed, and
// 2) LocalBroadcaster provides historical messages.
clients[SARO]
.convo(convo_id)
.convo(&convo_id)
.send_content(b"ok who broke the group chat again")
.unwrap();
process(&mut clients);
// Raya should observe exactly one new Group conversation from the
// welcome, even though no initial content arrives with it.
let raya_started = clients[RAYA]
.new_conversations
.iter()
.filter(|nc| matches!(nc.class, ConversationClass::Group))
.count();
assert_eq!(
raya_started, 1,
"Raya should have observed exactly one new Group conversation for the welcome"
);
clients[RAYA]
.convo(convo_id)
.convo(&convo_id)
.send_content(b"it was literally working five minutes ago")
.unwrap();
@ -121,21 +173,31 @@ fn create_group() {
let pax_id = clients[PAX].account_id().clone();
clients[SARO]
.convo(convo_id)
.convo(&convo_id)
.add_member(&[&pax_id])
.unwrap();
process(&mut clients);
let pax_started = clients[PAX]
.new_conversations
.iter()
.filter(|nc| matches!(nc.class, ConversationClass::Group))
.count();
assert_eq!(
pax_started, 1,
"Pax should have observed exactly one new Group conversation for the welcome"
);
clients[PAX]
.convo(convo_id)
.convo(&convo_id)
.send_content(b"ngl the key rotation is cooked")
.unwrap();
process(&mut clients);
clients[SARO]
.convo(convo_id)
.convo(&convo_id)
.send_content(b"bro we literally just added you to the group ")
.unwrap();

View File

@ -1,5 +1,5 @@
use chat_sqlite::{ChatStorage, StorageConfig};
use libchat::{Context, Introduction};
use libchat::{Context, ConversationClass, Introduction, PayloadOutcome};
use storage::{ConversationStore, IdentityStore};
use tempfile::tempdir;
@ -13,12 +13,14 @@ fn send_and_verify(
) {
let payloads = sender.send_content(convo_id, content).unwrap();
let payload = payloads.first().unwrap();
let received = receiver
.handle_payload(&payload.data)
.unwrap()
.expect("expected content");
assert_eq!(content, received.data.as_slice());
assert!(!received.is_new_convo);
let result = receiver.handle_payload(&payload.data).unwrap();
let PayloadOutcome::Convo(co) = result else {
panic!("steady-state send should yield PayloadOutcome::Convo, got {result:?}");
};
let content_out = co
.content
.expect("steady-state send should yield one content");
assert_eq!(content, content_out.bytes.as_slice());
}
#[test]
@ -38,16 +40,23 @@ fn ctx_integration() {
let mut content = vec![10];
let (saro_convo_id, payloads) = saro.create_private_convo(&intro, &content).unwrap();
// Raya receives initial message
// Raya receives the invite + initial message
let payload = payloads.first().unwrap();
let initial_content = raya
.handle_payload(&payload.data)
.unwrap()
.expect("expected initial content");
let raya_convo_id = initial_content.conversation_id;
assert_eq!(content, initial_content.data);
assert!(initial_content.is_new_convo);
let initial = raya.handle_payload(&payload.data).unwrap();
let PayloadOutcome::Inbox(io) = initial else {
panic!("invite must yield PayloadOutcome::Inbox, got {initial:?}");
};
assert!(matches!(
io.new_conversation.class,
ConversationClass::Private
));
let initial_co = io.initial.expect("invite must include initial content");
assert_eq!(io.new_conversation.convo_id, initial_co.convo_id);
let initial_content = initial_co
.content
.expect("invite must include initial message");
assert_eq!(content, initial_content.bytes);
let raya_convo_id = io.new_conversation.convo_id.clone();
// Exchange messages back and forth
for _ in 0..10 {
@ -107,8 +116,14 @@ fn conversation_metadata_persistence() {
let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap();
let payload = payloads.first().unwrap();
let content = alice.handle_payload(&payload.data).unwrap().unwrap();
assert!(content.is_new_convo);
let result = alice.handle_payload(&payload.data).unwrap();
let PayloadOutcome::Inbox(io) = result else {
panic!("invite must yield PayloadOutcome::Inbox, got {result:?}");
};
assert!(matches!(
io.new_conversation.class,
ConversationClass::Private
));
let convos = alice.store().load_conversations().unwrap();
assert_eq!(convos.len(), 1);
@ -128,16 +143,27 @@ fn conversation_full_flow() {
let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap();
let payload = payloads.first().unwrap();
let content = alice.handle_payload(&payload.data).unwrap().unwrap();
let alice_convo_id = content.conversation_id;
let result = alice.handle_payload(&payload.data).unwrap();
let PayloadOutcome::Inbox(io) = result else {
panic!("invite must yield PayloadOutcome::Inbox, got {result:?}");
};
let alice_convo_id = io.new_conversation.convo_id.clone();
let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap();
let payload = payloads.first().unwrap();
bob.handle_payload(&payload.data).unwrap().unwrap();
let result = bob.handle_payload(&payload.data).unwrap();
assert_eq!(
expect_convo(result).content.expect("message content").bytes,
b"reply 1"
);
let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap();
let payload = payloads.first().unwrap();
alice.handle_payload(&payload.data).unwrap().unwrap();
let result = alice.handle_payload(&payload.data).unwrap();
assert_eq!(
expect_convo(result).content.expect("message content").bytes,
b"reply 2"
);
// Verify conversation list
let convo_ids = alice.list_conversations().unwrap();
@ -146,18 +172,25 @@ fn conversation_full_flow() {
// Continue exchanging messages
let payloads = bob.send_content(&bob_convo_id, b"more messages").unwrap();
let payload = payloads.first().unwrap();
let content = alice
.handle_payload(&payload.data)
.expect("should decrypt")
.expect("should have content");
assert_eq!(content.data, b"more messages");
let result = alice.handle_payload(&payload.data).expect("should decrypt");
assert_eq!(
expect_convo(result).content.expect("message content").bytes,
b"more messages"
);
// Alice can also send back
let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap();
let payload = payloads.first().unwrap();
let content = bob
.handle_payload(&payload.data)
.unwrap()
.expect("bob should receive");
assert_eq!(content.data, b"alice reply");
let result = bob.handle_payload(&payload.data).unwrap();
assert_eq!(
expect_convo(result).content.expect("message content").bytes,
b"alice reply"
);
}
fn expect_convo(result: PayloadOutcome) -> libchat::ConvoOutcome {
match result {
PayloadOutcome::Convo(co) => co,
other => panic!("expected PayloadOutcome::Convo, got {other:?}"),
}
}

View File

@ -84,19 +84,38 @@ static int32_t deliver_cb(
/* ------------------------------------------------------------------
* Helper: pop one envelope from the bus and push it into receiver.
* Returns a heap-allocated result; caller frees with
* push_inbound_result_free().
* Returns a heap-allocated event list; caller frees with
* event_list_free().
* ------------------------------------------------------------------ */
static PushInboundResult_t *route(ClientHandle_t *receiver)
static EventList_t *route(ClientHandle_t *receiver)
{
const uint8_t *data;
size_t len;
int ok = queue_pop(&bus, &data, &len);
assert(ok && "expected an envelope in the bus");
PushInboundResult_t *r = client_receive(receiver, SLICE(data, len));
assert(push_inbound_result_error_code(r) == 0 && "push_inbound failed");
return r;
EventList_t *evs = client_receive(receiver, SLICE(data, len));
assert(event_list_error_code(evs) == 0 && "client_receive failed");
return evs;
}
/* ------------------------------------------------------------------
* Helper: locate the first MessageReceived event in a list and copy
* its content into the caller-supplied buffer. Returns -1 if not found.
* ------------------------------------------------------------------ */
static int find_message(EventList_t *evs, char *out, size_t out_cap, size_t *out_len)
{
size_t n = event_list_len(evs);
for (size_t i = 0; i < n; ++i) {
if (event_list_kind_at(evs, i) == EVENT_KIND_MESSAGE_RECEIVED) {
slice_ref_uint8_t s = event_list_content_at(evs, i);
assert(s.len <= out_cap && "content buffer too small");
memcpy(out, s.ptr, s.len);
*out_len = s.len;
return (int)i;
}
}
return -1;
}
/* ------------------------------------------------------------------
@ -125,19 +144,23 @@ int main(void)
assert(create_convo_result_error_code(saro_convo) == 0);
create_intro_result_free(raya_intro);
/* Route saro -> raya */
PushInboundResult_t *recv = route(raya);
/* Route saro -> raya: expect [ConversationStarted, MessageReceived] */
EventList_t *evs = route(raya);
assert(event_list_len(evs) == 2 && "expected 2 events for invite");
assert(event_list_kind_at(evs, 0) == EVENT_KIND_CONVERSATION_STARTED
&& "first event should be ConversationStarted");
assert(event_list_conversation_class_at(evs, 0) == FFI_CONVERSATION_CLASS_PRIVATE
&& "expected Private convo class");
assert(push_inbound_result_has_content(recv) && "expected content from saro");
assert(push_inbound_result_is_new_convo(recv) && "expected new-conversation flag");
char msg[64];
size_t msg_len;
int idx = find_message(evs, msg, sizeof(msg), &msg_len);
assert(idx >= 0 && "expected MessageReceived from saro");
assert(msg_len == 10 && memcmp(msg, "hello raya", 10) == 0);
printf("Raya received: \"%.*s\"\n", (int)msg_len, msg);
slice_ref_uint8_t content = push_inbound_result_content(recv);
assert(content.len == 10);
assert(memcmp(content.ptr, "hello raya", 10) == 0);
printf("Raya received: \"%.*s\"\n", (int)content.len, content.ptr);
/* Copy Raya's convo_id before freeing recv */
slice_ref_uint8_t cid_ref = push_inbound_result_convo_id(recv);
/* Copy Raya's convo_id from the ConversationStarted event */
slice_ref_uint8_t cid_ref = event_list_convo_id_at(evs, 0);
uint8_t raya_cid[256];
size_t raya_cid_len = cid_ref.len;
if (raya_cid_len >= sizeof(raya_cid)) {
@ -145,37 +168,37 @@ int main(void)
return 1;
}
memcpy(raya_cid, cid_ref.ptr, raya_cid_len);
push_inbound_result_free(recv);
event_list_free(evs);
/* Raya replies */
ErrorCode_t rc = client_send_message(
raya, SLICE(raya_cid, raya_cid_len), STR("hi saro"));
assert(rc == ERROR_CODE_NONE);
recv = route(saro);
assert(push_inbound_result_has_content(recv) && "expected content from raya");
assert(!push_inbound_result_is_new_convo(recv) && "unexpected new-convo flag");
content = push_inbound_result_content(recv);
assert(content.len == 7);
assert(memcmp(content.ptr, "hi saro", 7) == 0);
printf("Saro received: \"%.*s\"\n", (int)content.len, content.ptr);
push_inbound_result_free(recv);
evs = route(saro);
assert(event_list_len(evs) == 1 && "expected MessageReceived only");
assert(event_list_kind_at(evs, 0) == EVENT_KIND_MESSAGE_RECEIVED);
idx = find_message(evs, msg, sizeof(msg), &msg_len);
assert(idx >= 0);
assert(msg_len == 7 && memcmp(msg, "hi saro", 7) == 0);
printf("Saro received: \"%.*s\"\n", (int)msg_len, msg);
event_list_free(evs);
/* Multiple back-and-forth rounds */
slice_ref_uint8_t saro_cid = create_convo_result_id(saro_convo);
for (int i = 0; i < 3; i++) {
char msg[32];
int mlen = snprintf(msg, sizeof(msg), "msg %d", i);
char text[32];
int tlen = snprintf(text, sizeof(text), "msg %d", i);
rc = client_send_message(saro, saro_cid, SLICE(msg, (size_t)mlen));
rc = client_send_message(saro, saro_cid, SLICE(text, (size_t)tlen));
assert(rc == ERROR_CODE_NONE);
recv = route(raya);
assert(push_inbound_result_has_content(recv));
content = push_inbound_result_content(recv);
assert((int)content.len == mlen);
assert(memcmp(content.ptr, msg, (size_t)mlen) == 0);
push_inbound_result_free(recv);
evs = route(raya);
idx = find_message(evs, msg, sizeof(msg), &msg_len);
assert(idx >= 0);
assert((int)msg_len == tlen);
assert(memcmp(msg, text, (size_t)tlen) == 0);
event_list_free(evs);
char reply[32];
int rlen = snprintf(reply, sizeof(reply), "reply %d", i);
@ -184,12 +207,12 @@ int main(void)
raya, SLICE(raya_cid, raya_cid_len), SLICE(reply, (size_t)rlen));
assert(rc == ERROR_CODE_NONE);
recv = route(saro);
assert(push_inbound_result_has_content(recv));
content = push_inbound_result_content(recv);
assert((int)content.len == rlen);
assert(memcmp(content.ptr, reply, (size_t)rlen) == 0);
push_inbound_result_free(recv);
evs = route(saro);
idx = find_message(evs, msg, sizeof(msg), &msg_len);
assert(idx >= 0);
assert((int)msg_len == rlen);
assert(memcmp(msg, reply, (size_t)rlen) == 0);
event_list_free(evs);
}
/* Cleanup */

View File

@ -1,8 +1,7 @@
use safer_ffi::prelude::*;
use std::sync::Arc;
use crate::delivery::{CDelivery, DeliverFn};
use logos_chat::{ChatClient, ClientError};
use logos_chat::{ChatClient, ClientError, ConversationClass, Event};
// ---------------------------------------------------------------------------
// Opaque client handle
@ -21,9 +20,47 @@ pub struct ClientHandle(pub(crate) ChatClient<CDelivery>);
pub enum ErrorCode {
None = 0,
BadUtf8 = -1,
/// Failure parsing or processing an introduction bundle.
BadIntro = -2,
DeliveryFail = -3,
UnknownError = -4,
/// Failure decoding, decrypting, or processing an inbound payload.
BadPayload = -5,
}
// ---------------------------------------------------------------------------
// Event taxonomy (C-side view of Event)
// ---------------------------------------------------------------------------
#[derive_ReprC]
#[repr(i32)]
#[derive(Clone, Copy)]
pub enum EventKind {
/// Sentinel returned by `event_list_kind_at` for out-of-bounds indices.
/// Never the kind of a real event row.
Invalid = -1,
ConversationStarted = 0,
MessageReceived = 1,
}
#[derive_ReprC]
#[repr(i32)]
#[derive(Clone, Copy)]
pub enum FfiConversationClass {
/// Sentinel for accessor calls that don't apply to the queried row
/// (out-of-bounds, or a non-`ConversationStarted` event).
Invalid = -1,
Private = 0,
Group = 1,
}
impl From<ConversationClass> for FfiConversationClass {
fn from(c: ConversationClass) -> Self {
match c {
ConversationClass::Private => FfiConversationClass::Private,
ConversationClass::Group => FfiConversationClass::Group,
}
}
}
// ---------------------------------------------------------------------------
@ -44,14 +81,61 @@ pub struct CreateConvoResult {
convo_id: Option<String>,
}
/// An ordered list of events with a status code. Inspect `error_code` (zero
/// on success) before iterating with `event_list_len` and the indexed
/// accessors.
#[derive_ReprC]
#[repr(opaque)]
pub struct PushInboundResult {
pub struct EventList {
error_code: i32,
has_content: bool,
is_new_convo: bool,
convo_id: Option<String>,
content: Option<Vec<u8>>,
events: Vec<EventRow>,
}
enum EventRow {
ConversationStarted {
convo_id: String,
class: FfiConversationClass,
},
MessageReceived {
convo_id: String,
content: Vec<u8>,
},
}
impl EventRow {
/// Translate an [`Event`] into the FFI row shape, or `None` for variants
/// without an FFI representation.
fn from_event(event: Event) -> Option<Self> {
match event {
Event::ConversationStarted {
convo_id, class, ..
} => Some(EventRow::ConversationStarted {
convo_id: convo_id.to_string(),
class: class.into(),
}),
Event::MessageReceived {
convo_id, content, ..
} => Some(EventRow::MessageReceived {
convo_id: convo_id.to_string(),
content,
}),
_ => None,
}
}
fn convo_id(&self) -> &str {
match self {
EventRow::ConversationStarted { convo_id, .. }
| EventRow::MessageReceived { convo_id, .. } => convo_id,
}
}
fn content(&self) -> &[u8] {
match self {
EventRow::MessageReceived { content, .. } => content,
_ => &[],
}
}
}
// ---------------------------------------------------------------------------
@ -159,7 +243,7 @@ fn client_create_conversation(
{
Ok(convo_id) => CreateConvoResult {
error_code: ErrorCode::None as i32,
convo_id: Some(convo_id.to_string()),
convo_id: Some(convo_id),
},
Err(ClientError::Chat(_)) => CreateConvoResult {
error_code: ErrorCode::BadIntro as i32,
@ -205,8 +289,7 @@ fn client_send_message(
Ok(s) => s,
Err(_) => return ErrorCode::BadUtf8,
};
let convo_id_owned: logos_chat::ConversationIdOwned = Arc::from(id_str);
match handle.0.send_message(&convo_id_owned, content.as_slice()) {
match handle.0.send_message(id_str, content.as_slice()) {
Ok(()) => ErrorCode::None,
Err(ClientError::Delivery(_)) => ErrorCode::DeliveryFail,
Err(_) => ErrorCode::UnknownError,
@ -214,72 +297,90 @@ fn client_send_message(
}
// ---------------------------------------------------------------------------
// Push inbound
// Receive (process inbound, get event list back)
// ---------------------------------------------------------------------------
/// Decrypt an inbound payload. `has_content` is false for protocol frames.
/// Free with `push_inbound_result_free`.
/// Decrypt an inbound payload. Returns the events the payload produced;
/// the list may be empty for protocol-only frames. Free with
/// `event_list_free`.
#[ffi_export]
fn client_receive(
handle: &mut ClientHandle,
payload: c_slice::Ref<'_, u8>,
) -> repr_c::Box<PushInboundResult> {
) -> repr_c::Box<EventList> {
let result = match handle.0.receive(payload.as_slice()) {
Ok(Some(cd)) => PushInboundResult {
Ok(events) => EventList {
error_code: ErrorCode::None as i32,
has_content: true,
is_new_convo: cd.is_new_convo,
convo_id: Some(cd.conversation_id),
content: Some(cd.data),
events: events
.into_iter()
.filter_map(EventRow::from_event)
.collect(),
},
Ok(None) => PushInboundResult {
error_code: ErrorCode::None as i32,
has_content: false,
is_new_convo: false,
convo_id: None,
content: None,
Err(ClientError::Chat(_)) => EventList {
error_code: ErrorCode::BadPayload as i32,
events: Vec::new(),
},
Err(_) => PushInboundResult {
error_code: ErrorCode::UnknownError as i32,
has_content: false,
is_new_convo: false,
convo_id: None,
content: None,
Err(ClientError::Delivery(_)) => EventList {
error_code: ErrorCode::DeliveryFail as i32,
events: Vec::new(),
},
};
Box::new(result).into()
}
#[ffi_export]
fn push_inbound_result_error_code(r: &PushInboundResult) -> i32 {
r.error_code
fn event_list_error_code(list: &EventList) -> i32 {
list.error_code
}
#[ffi_export]
fn push_inbound_result_has_content(r: &PushInboundResult) -> bool {
r.has_content
fn event_list_len(list: &EventList) -> usize {
list.events.len()
}
/// Returns `EventKind::Invalid` for out-of-bounds indices.
#[ffi_export]
fn event_list_kind_at(list: &EventList, idx: usize) -> EventKind {
match list.events.get(idx) {
Some(EventRow::ConversationStarted { .. }) => EventKind::ConversationStarted,
Some(EventRow::MessageReceived { .. }) => EventKind::MessageReceived,
None => EventKind::Invalid,
}
}
/// Returns an empty slice for out-of-bounds indices.
/// The slice is valid only while `list` is alive.
#[ffi_export]
fn event_list_convo_id_at(list: &EventList, idx: usize) -> c_slice::Ref<'_, u8> {
list.events
.get(idx)
.map(|r| r.convo_id().as_bytes())
.unwrap_or(&[])
.into()
}
/// Returns an empty slice for non-`MessageReceived` events or out-of-bounds.
/// The slice is valid only while `list` is alive.
#[ffi_export]
fn event_list_content_at(list: &EventList, idx: usize) -> c_slice::Ref<'_, u8> {
list.events
.get(idx)
.map(EventRow::content)
.unwrap_or(&[])
.into()
}
/// Returns `FfiConversationClass::Invalid` for non-`ConversationStarted`
/// events or out-of-bounds.
#[ffi_export]
fn event_list_conversation_class_at(list: &EventList, idx: usize) -> FfiConversationClass {
match list.events.get(idx) {
Some(EventRow::ConversationStarted { class, .. }) => *class,
_ => FfiConversationClass::Invalid,
}
}
#[ffi_export]
fn push_inbound_result_is_new_convo(r: &PushInboundResult) -> bool {
r.is_new_convo
}
/// Returns an empty slice when has_content is false.
/// The slice is valid only while `r` is alive.
#[ffi_export]
fn push_inbound_result_convo_id(r: &PushInboundResult) -> c_slice::Ref<'_, u8> {
r.convo_id.as_deref().unwrap_or("").as_bytes().into()
}
/// Returns an empty slice when has_content is false.
/// The slice is valid only while `r` is alive.
#[ffi_export]
fn push_inbound_result_content(r: &PushInboundResult) -> c_slice::Ref<'_, u8> {
r.content.as_deref().unwrap_or(&[]).into()
}
#[ffi_export]
fn push_inbound_result_free(r: repr_c::Box<PushInboundResult>) {
drop(r)
fn event_list_free(list: repr_c::Box<EventList>) {
drop(list)
}

View File

@ -1,5 +1,4 @@
use logos_chat::{ChatClient, ConversationIdOwned, InProcessDelivery};
use std::sync::Arc;
use logos_chat::{ChatClient, ConversationId, Event, InProcessDelivery};
fn main() {
let delivery = InProcessDelivery::new(Default::default());
@ -13,21 +12,29 @@ fn main() {
.unwrap();
let raw = cursor.next().unwrap();
let content = raya.receive(&raw).unwrap().unwrap();
println!(
"Raya received: {:?}",
std::str::from_utf8(&content.data).unwrap()
);
let events = raya.receive(&raw).unwrap();
let raya_convo_id: ConversationId = events
.iter()
.find_map(|e| match e {
Event::ConversationStarted { convo_id, .. } => Some(convo_id.to_string()),
_ => None,
})
.expect("expected ConversationStarted");
for event in &events {
if let Event::MessageReceived { content, .. } = event {
println!("Raya received: {:?}", std::str::from_utf8(content).unwrap());
}
}
let raya_convo_id: ConversationIdOwned = Arc::from(content.conversation_id.as_str());
raya.send_message(&raya_convo_id, b"hi saro").unwrap();
let raw = cursor.next().unwrap();
let content = saro.receive(&raw).unwrap().unwrap();
println!(
"Saro received: {:?}",
std::str::from_utf8(&content.data).unwrap()
);
let events = saro.receive(&raw).unwrap();
for event in &events {
if let Event::MessageReceived { content, .. } = event {
println!("Saro received: {:?}", std::str::from_utf8(content).unwrap());
}
}
println!("Message exchange complete.");
}

View File

@ -1,11 +1,14 @@
use std::sync::Arc;
use libchat::{
AddressedEnvelope, ChatError, ChatStorage, ContentData, Context, ConversationIdOwned,
DeliveryService, Introduction, StorageConfig,
AddressedEnvelope, ChatError, ChatStorage, Context, ConversationId, ConvoOutcome,
DeliveryService, InboxOutcome, Introduction, PayloadOutcome, StorageConfig,
};
use components::EphemeralRegistry;
use crate::errors::ClientError;
use crate::event::Event;
pub struct ChatClient<D: DeliveryService> {
ctx: Context<D, EphemeralRegistry, ChatStorage>,
@ -52,7 +55,7 @@ impl<D: DeliveryService + 'static> ChatClient<D> {
&mut self,
intro_bundle: &[u8],
initial_content: &[u8],
) -> Result<ConversationIdOwned, ClientError<D::Error>> {
) -> Result<ConversationId, ClientError<D::Error>> {
let intro = Introduction::try_from(intro_bundle)?;
let (convo_id, envelopes) = self.ctx.create_private_convo(&intro, initial_content)?;
self.dispatch_all(envelopes)?;
@ -60,27 +63,25 @@ impl<D: DeliveryService + 'static> ChatClient<D> {
}
/// List all conversation IDs known to this client.
pub fn list_conversations(&self) -> Result<Vec<ConversationIdOwned>, ClientError<D::Error>> {
pub fn list_conversations(&self) -> Result<Vec<ConversationId>, ClientError<D::Error>> {
self.ctx.list_conversations().map_err(Into::into)
}
/// Encrypt `content` and dispatch all outbound envelopes.
pub fn send_message(
&mut self,
convo_id: &ConversationIdOwned,
convo_id: &str,
content: &[u8],
) -> Result<(), ClientError<D::Error>> {
let envelopes = self.ctx.send_content(convo_id.as_ref(), content)?;
let envelopes = self.ctx.send_content(convo_id, content)?;
self.dispatch_all(envelopes)
}
/// Decrypt an inbound payload. Returns `Some(ContentData)` for user
/// content, `None` for protocol frames.
pub fn receive(
&mut self,
payload: &[u8],
) -> Result<Option<ContentData>, ClientError<D::Error>> {
self.ctx.handle_payload(payload).map_err(Into::into)
/// Decrypt an inbound payload. Returns the events the payload produced,
/// in causal order. May be empty for protocol-only frames.
pub fn receive(&mut self, payload: &[u8]) -> Result<Vec<Event>, ClientError<D::Error>> {
let result = self.ctx.handle_payload(payload)?;
Ok(events_from_inbound(result))
}
fn dispatch_all(
@ -94,3 +95,46 @@ impl<D: DeliveryService + 'static> ChatClient<D> {
Ok(())
}
}
/// Walk an [`PayloadOutcome`] in causal order and emit one `Event` per
/// observation. For an `Inbox` outcome, [`Event::ConversationStarted`]
/// precedes the message event. The convo id is wrapped into `Arc<str>` once
/// per outcome and shared across the events it produces.
fn events_from_inbound(result: PayloadOutcome) -> Vec<Event> {
match result {
PayloadOutcome::Empty => Vec::new(),
PayloadOutcome::Convo(co) => convo_events(co),
PayloadOutcome::Inbox(io) => inbox_events(io),
}
}
fn convo_events(outcome: ConvoOutcome) -> Vec<Event> {
let ConvoOutcome { convo_id, content } = outcome;
content
.map(|c| Event::MessageReceived {
convo_id: Arc::from(convo_id),
content: c.bytes,
})
.into_iter()
.collect()
}
fn inbox_events(outcome: InboxOutcome) -> Vec<Event> {
let InboxOutcome {
new_conversation,
initial,
} = outcome;
let id: Arc<str> = Arc::from(new_conversation.convo_id);
let mut events = Vec::with_capacity(2);
events.push(Event::ConversationStarted {
convo_id: Arc::clone(&id),
class: new_conversation.class,
});
if let Some(c) = initial.and_then(|co| co.content) {
events.push(Event::MessageReceived {
convo_id: Arc::clone(&id),
content: c.bytes,
});
}
events
}

View File

@ -0,0 +1,27 @@
//! Application-facing chat events.
//!
//! Each variant of [`Event`] describes one observable thing the application
//! cares about: a new conversation has appeared, a message was decrypted on
//! an existing one, and so on. The enum is `#[non_exhaustive]` so new
//! variants can be added without breaking exhaustive matches in dependent
//! crates.
use std::sync::Arc;
use libchat::ConversationClass;
/// A discrete chat event.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum Event {
/// A new conversation has appeared.
ConversationStarted {
convo_id: Arc<str>,
class: ConversationClass,
},
/// User content arrived on an existing conversation.
MessageReceived {
convo_id: Arc<str>,
content: Vec<u8>,
},
}

View File

@ -1,12 +1,14 @@
mod client;
mod delivery_in_process;
mod errors;
mod event;
pub use client::ChatClient;
pub use delivery_in_process::{Cursor, InProcessDelivery, MessageBus};
pub use errors::ClientError;
pub use event::Event;
// Re-export types callers need to interact with ChatClient
// Re-export types callers need to interact with ChatClient.
pub use libchat::{
AddressedEnvelope, ContentData, ConversationIdOwned, DeliveryService, StorageConfig,
AddressedEnvelope, ConversationClass, ConversationId, DeliveryService, StorageConfig,
};

View File

@ -1,14 +1,29 @@
use logos_chat::{
ChatClient, ContentData, ConversationIdOwned, Cursor, InProcessDelivery, StorageConfig,
ChatClient, ConversationClass, ConversationId, Cursor, Event, InProcessDelivery, StorageConfig,
};
use std::sync::Arc;
fn receive(receiver: &mut ChatClient<InProcessDelivery>, cursor: &mut Cursor) -> ContentData {
/// Pulls one envelope, decrypts, and returns the events emitted.
fn receive(receiver: &mut ChatClient<InProcessDelivery>, cursor: &mut Cursor) -> Vec<Event> {
let raw = cursor.next().expect("expected envelope");
receiver
.receive(&raw)
.expect("receive failed")
.expect("expected content")
receiver.receive(&raw).expect("receive failed")
}
fn expect_message(event: &Event) -> (&str, &[u8]) {
match event {
Event::MessageReceived {
convo_id, content, ..
} => (convo_id.as_ref(), content.as_slice()),
other => panic!("expected MessageReceived, got {other:?}"),
}
}
fn expect_conversation_started(event: &Event) -> (&str, ConversationClass) {
match event {
Event::ConversationStarted {
convo_id, class, ..
} => (convo_id.as_ref(), *class),
other => panic!("expected ConversationStarted, got {other:?}"),
}
}
#[test]
@ -24,27 +39,39 @@ fn saro_raya_message_exchange() {
.create_conversation(&raya_bundle, b"hello raya")
.unwrap();
let content = receive(&mut raya, &mut cursor);
assert_eq!(content.data, b"hello raya");
assert!(content.is_new_convo);
let raya_convo_id: ConversationIdOwned = Arc::from(content.conversation_id.as_str());
let events = receive(&mut raya, &mut cursor);
assert_eq!(
events.len(),
2,
"expected ConversationStarted + MessageReceived"
);
let (started_id, class) = expect_conversation_started(&events[0]);
assert_eq!(class, ConversationClass::Private);
let (msg_id, content) = expect_message(&events[1]);
assert_eq!(content, b"hello raya");
assert_eq!(started_id, msg_id);
let raya_convo_id: ConversationId = started_id.to_owned();
raya.send_message(&raya_convo_id, b"hi saro").unwrap();
let content = receive(&mut saro, &mut cursor);
assert_eq!(content.data, b"hi saro");
assert!(!content.is_new_convo);
let events = receive(&mut saro, &mut cursor);
assert_eq!(events.len(), 1);
let (_, content) = expect_message(&events[0]);
assert_eq!(content, b"hi saro");
for i in 0u8..5 {
let msg = format!("msg {i}");
saro.send_message(&saro_convo_id, msg.as_bytes()).unwrap();
let content = receive(&mut raya, &mut cursor);
assert_eq!(content.data, msg.as_bytes());
let events = receive(&mut raya, &mut cursor);
assert_eq!(events.len(), 1);
let (_, content) = expect_message(&events[0]);
assert_eq!(content, msg.as_bytes());
let reply = format!("reply {i}");
raya.send_message(&raya_convo_id, reply.as_bytes()).unwrap();
let content = receive(&mut saro, &mut cursor);
assert_eq!(content.data, reply.as_bytes());
let events = receive(&mut saro, &mut cursor);
assert_eq!(events.len(), 1);
let (_, content) = expect_message(&events[0]);
assert_eq!(content, reply.as_bytes());
}
assert_eq!(saro.list_conversations().unwrap().len(), 1);

View File

@ -5,272 +5,57 @@
| Status | Accepted |
| Issue | https://github.com/logos-messaging/libchat/issues/97 |
| Date | 2026-05-19 |
| Last revised | 2026-05-28 |
## Context and Problem
Applications currently learn about new conversations from an `is_new_convo: bool` flag on `ContentData` (`core/conversations/src/types.rs:16-20`). Two problems:
Applications must observe several kinds of things produced by the chat library: new conversations appearing from peer-initiated handshakes, decrypted messages on existing conversations, and further protocol observations (group membership changes, reliability signals). These observations are not coupled — an MLS group welcome creates a new conversation with no initial content; a single inbound payload can yield multiple observations; some observations (delivery timeouts from background retry work) have no synchronous trigger at all and must reach the application after the call that might have caused them has long since returned.
1. The flag overloads `ContentData`: protocol metadata is smuggled through a content carrier.
2. The flag assumes every new conversation carries an initial content frame. Protocols such as MLS allow a conversation to begin without one; in that case `handle_payload` returns `None` and the application never observes the new conversation.
Issue #97 calls for a proper event system that can signal new conversations, delivery receipts, and reliability failures — without piggy-backing on content — and that provides a clear path for adding new event types later.
This ADR specifies the layered design of the event system and how events reach the application.
Issue #97 captures the requirement for an observation surface that does not piggy-back on content, accommodates both sync-triggered and background-triggered observations uniformly, and crosses the FFI boundary cleanly.
## Decision Drivers
- **Simplicity of the core.** Fully synchronous and caller-driven: no background work, no callbacks out. External effects are performed through services injected as method parameters.
- **Extensibility.** A new event type is a localised change (one enum variant, one emit site) that does not break existing consumers.
- **FFI compatibility.** Must remain expressible through the existing `safer-ffi` boundary in `crates/client-ffi`. Event payloads are limited to owned, concrete data (bytes, strings, identifiers) — no closures, generics, or non-`'static` references.
- **Simplicity of the core.** Fully synchronous and caller-driven: no background work, no callbacks out. External effects flow through services injected as method parameters.
- **Asynchronous delivery at the client.** Applications consume events on their own schedule. Observations from sync-triggered processing and observations from background work share a single delivery surface, so the application sees one notification stream and does not care which path produced any given event.
- **FFI compatibility.** Payloads crossing the `safer-ffi` boundary in `crates/client-ffi` are limited to owned, concrete data — no closures, generics, or non-`'static` references — so any delivery mechanism must degrade to a sync drain on that side.
## Architecture
The library is organised in three layers. Calls flow downward; events flow upward.
Three layers. Calls flow downward. Sync results return through method returns; events reach the application asynchronously through a channel.
```mermaid
flowchart TB
A["<b>app</b><br/>UI/UX layer<br/>drives the event loop"]
B["<b>client</b><br/>owns services<br/>runs background threads"]
C["<b>core</b><br/>strict sync, caller-driven"]
A["<b>app</b><br/>drains Receiver&lt;Event&gt;"]
B["<b>client</b><br/>owns transport poller + services<br/>translates PayloadOutcome → Event values<br/>pushes onto channel"]
C["<b>core</b><br/>strict sync, caller-driven<br/>returns PayloadOutcome"]
A -- "method calls" --> B
B -- "method calls" --> C
C -.->|"events (from method returns)"| B
B -.->|"events (sync + background)"| A
C -.->|"PayloadOutcome<br/>(sync method return)"| B
B == "Event (async channel)" ==> A
```
Crates: **app**`bin/chat-cli`, future `logos-chat-module`; **client**`crates/client`, `crates/client-ffi`; **core**`core/conversations` and friends in libchat.
## Design
### Core layer
#### Constraints
- Strict sync, single-threaded.
- No background work, timers, or internal queues.
- External effects (delivery; future registration / identity lookups) are performed through services injected as method parameters.
#### Approach
Methods receive the services they need and call them directly. Observations (events) are returned so the caller can surface them upward:
```rust
impl<S: ChatStore> Context<S> {
pub fn handle_payload<D: DeliveryService>(
&mut self,
delivery: &mut D,
payload: &[u8],
) -> Result<Vec<Event>, ChatError>;
pub fn send_content<D: DeliveryService>(
&mut self,
delivery: &mut D,
convo: ConversationId,
content: &[u8],
) -> Result<Vec<Event>, ChatError>;
pub fn create_private_convo<D: DeliveryService>(
&mut self,
delivery: &mut D,
intro: &Introduction,
content: &[u8],
) -> Result<(ConversationIdOwned, Vec<Event>), ChatError>;
}
```
### Client layer
#### Responsibility split
The client owns the concrete service implementations (delivery, future registration, identity), polls the transport on a background thread, and processes inbound bytes by calling into the core. The application invokes client methods and consumes events; raw transport bytes (encrypted envelopes off the wire) are handled entirely inside the client.
#### Constraints
- Owns the concrete service implementations and injects them into core method calls.
- Events from synchronous calls flow through the method's return type, inherited from the core.
- Polls the transport on a background thread and feeds inbound payloads into the core.
- May spawn additional background threads (e.g. for timer-driven retries).
- Background threads emit events that no caller-invoked method can return — for example `DeliveryFailed { reason: Timeout }`.
#### Common shape (all options)
The client invokes core methods with its services; the core publishes envelopes directly through the injected delivery service. Only events flow back as return values.
```rust
impl<D: DeliveryService> ChatClient<D> {
pub fn send_message(&mut self, convo: &ConversationIdOwned, content: &[u8])
-> Result<Vec<Event>, ClientError<D::Error>>; // sync events from this send
// Background events (including those from inbound payload processing) reach the
// application through one of the three mechanisms below.
}
```
The three options differ only in how background events reach the application.
#### Option A — internal poll queue
The client owns a `Mutex<VecDeque<Event>>`. Background threads push to it; the application drains via two new methods.
```rust
impl<D: DeliveryService> ChatClient<D> {
pub fn poll_event(&mut self) -> Option<Event>;
pub fn drain_events(&mut self) -> Vec<Event>;
}
```
Prior art: mio's `Events` (per-`Poll` instance, drained by the caller); rdkafka's `Consumer::poll` (background thread fills a queue, caller polls — same domain).
**Pros**
- Single primitive (mutex-protected queue) with no new dependencies.
- FFI mapping is direct: `client_poll_event` returns an opaque `Option<Event>`, mirroring the existing `PushInboundResult` shape (`crates/client-ffi/src/api.rs:49-55`).
- Matches the existing chat-cli tick-loop consumer pattern (`bin/chat-cli/src/app.rs:144-180`).
**Cons**
- Requires the application to drain after every operation; events accumulate if it forgets.
- Adds shared mutable state (`Mutex<VecDeque>`) inside the client; the queue must be bounded with explicit overflow handling.
#### Option B — channel handed to the caller (selected)
The client's constructor returns a `Receiver<Event>` alongside the client handle. Background threads hold a `Sender<Event>` clone; the application reads from the receiver.
```rust
let (client, events): (ChatClient<_>, Receiver<Event>) =
ChatClient::new(name, delivery);
```
Prior art: most Rust networking libraries; `std::sync::mpsc`, `crossbeam-channel`, `flume`.
**Pros**
- Channels are the canonical multi-producer/single-consumer primitive in the standard library; the shape is idiomatic in pure Rust.
- The application can park in `recv()` from a worker thread, integrate with `select!`, or later swap to `tokio::sync::mpsc` for an async wrapper.
- Mirrors the inbound-bytes channel chat-cli already uses (`bin/chat-cli/src/app.rs:46`).
**Cons**
- `Receiver<T>` is not `#[repr(C)]` and cannot cross `safer-ffi` cleanly. The FFI layer must expose a drain function regardless, collapsing Option B into Option A at the boundary.
- Forces a channel-crate choice (`std::sync::mpsc`, `crossbeam-channel`, or `flume`).
#### Option C — callback registered at construction
The application registers a closure at construction; background threads invoke it directly when events arise.
```rust
type EventFn = Box<dyn Fn(&Event) + Send + 'static>;
impl ChatClient<D> {
pub fn new(name: &str, delivery: D, on_event: EventFn) -> Self;
}
```
Prior art: the existing FFI `DeliverFn` callback at `client_create` (`crates/client-ffi/src/delivery.rs:8-15`); `tracing::Subscriber`; GTK signals.
**Pros**
- The codebase already establishes this pattern for outbound delivery; events would extend a familiar contract.
- FFI mapping is direct: register an `EventFn` function pointer at `client_create`.
- No internal queue or `Mutex` to maintain.
**Cons**
- The callback fires on the background thread. UI-style consumers (ratatui, GUI toolkits) cannot update state from threads other than the main loop thread and will bridge the callback into a thread-local queue — effectively re-implementing Option A in user code.
- The closure must be `Send + 'static`; capturing application state requires `Arc<Mutex<…>>` or a channel back to the application.
- Sync events arrive on the caller's thread; background events arrive on the background thread. The handler must be correct in both threading contexts, or the callback must forward to the main thread (collapsing into Option A).
#### Comparison
| Criterion | A: poll queue | B: channel | C: callback |
|---|---|---|---|
| Background events delivered via | `poll_event` / `drain_events` | `Receiver<Event>` | direct `Fn(&Event)` invocation |
| FFI fit (`safer-ffi`) | Native opaque + accessors | Degrades to Option A at the boundary | Native function pointer (matches `DeliverFn`) |
| New dependencies | None | None (with `std::sync::mpsc`); otherwise `crossbeam-channel` or `flume` | None |
| Internal state required | `Mutex<VecDeque<Event>>` | Channel internals | None |
| Thread on which the application observes the event | Application thread (next drain) | Application thread (next drain) | Background thread |
| Bridges naturally to UI thread | Yes | Yes | No (requires re-bridging) |
| Backpressure if the application is slow | Client-side queue buffers; bounded with overflow handling | Channel buffers; bound configurable | No buffer; slow callbacks block the background thread |
| Future `Stream` adapter | Wrap `poll_event` in a `Stream` | Swap to async channel (native) | Bridge callback into a channel, then `Stream` |
### App layer
The application drives the event loop. With Option B (selected), each tick drains the `Receiver<Event>` handed back at client construction:
```rust
pub fn tick(&mut self) -> Result<()> {
for event in self.events.try_iter() {
self.handle_event(event);
}
Ok(())
}
```
For reference, Option A would replace `self.events.try_iter()` with `self.client.drain_events()`. Option C moves the drain out of the tick — into the callback — and the callback typically forwards into an application-side channel that is drained on each tick anyway.
## Event Taxonomy
The same `Event` enum is shared across all three client options.
```rust
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum Event {
#[non_exhaustive]
ConversationStarted {
conversation_id: ConversationIdOwned,
},
#[non_exhaustive]
MessageReceived {
conversation_id: ConversationIdOwned,
data: Vec<u8>,
},
#[non_exhaustive]
DeliveryReceipt {
conversation_id: ConversationIdOwned,
envelope_id: EnvelopeId,
},
#[non_exhaustive]
DeliveryFailed {
conversation_id: ConversationIdOwned,
envelope_id: EnvelopeId,
reason: FailureReason,
},
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum FailureReason {
Transport, // synchronous transport error on publish
PeerRejected, // peer signalled rejection (future protocol work)
Timeout, // no receipt within the retry window
}
```
`#[non_exhaustive]` on the enum permits new variants; on each struct variant it permits new fields. Both are additive minor-release changes. Future variants (`ConversationRekeyed`, `ParticipantJoined`, `PresenceChanged`, transport health, key-rotation reminders, …) follow this rule.
Mapping of variants to emit sites:
| Variant | Emitted from |
|---|---|
| `ConversationStarted` (responder side) | `core/conversations/src/inbox/handler.rs:155-162` (replaces `is_new_convo: true`) |
| `MessageReceived` | `core/conversations/src/conversation/privatev1.rs:184-191` (replaces `is_new_convo: false`) |
| `DeliveryReceipt` | `Context::handle_payload` when decoding a `PrivateV1Frame::Receipt` (future protocol work) |
| `DeliveryFailed { Transport }` | Core method that invoked `delivery.publish` and observed a synchronous error |
| `DeliveryFailed { Timeout }` | Client's background retry thread |
Events are the uniform observation channel: they carry both observations the call itself caused (e.g. a sync `DeliveryFailed { Transport }` from `send_content`) and observations from background work (e.g. `DeliveryFailed { Timeout }` from the retry thread). The only thing kept outside `Vec<Event>` is an obvious primary result the caller will use immediately — returned directly for ergonomics. This is why the initiator side does not emit `ConversationStarted`: `create_private_convo` returns the new `ConversationIdOwned` directly as part of its return value.
## Decisions
1. **Sync at the client layer for now.** The core stays sync; the client also stays sync. Migrating to async later is non-structural — `std::sync::mpsc::Receiver<Event>` swaps to `tokio::sync::mpsc::Receiver<Event>` and gains an `impl Stream` shape without changing the chosen mechanism (point 2). Option A would migrate to a `Stream` over a notify primitive; Option C to an `async fn` callback.
1. **Core returns `PayloadOutcome`, a dispatcher-level enum.** Each inbound path inside the core yields its own concrete outcome type: `ConvoOutcome` (`Convo::handle_frame`) carries decrypted contents on an existing conversation; `InboxOutcome` (inbox / inbox_v2 handlers) carries a newly observed conversation plus an optional initial `ConvoOutcome`. `PayloadOutcome` is the dispatcher-level union (`Empty`, `Convo(ConvoOutcome)`, `Inbox(InboxOutcome)`) and is the single type `Context::handle_payload` returns; `From<ConvoOutcome>` / `From<InboxOutcome>` impls keep the per-path handlers free of `PayloadOutcome` in their signatures. The split encodes at the type level what each producer can populate — a `Convo` cannot manufacture a new conversation, so its signature precludes the possibility.
2. **Consumer pattern: Option B — channel handed to the caller.** Different consumer archetypes could favour different shapes — a polling UI loop suits Option A; a low-latency push-driven consumer (toast notifications, daemons) suits Option C — but Option B is preferred: it is the most Rust-idiomatic of the three, has few drawbacks compared to A or C, and offers the smoothest path to async (point 1).
2. **`Event` is an asynchronous notification.** The client's constructor returns a `Receiver<Event>` alongside the client handle. A background poller drives the transport, calls into the core for each inbound payload, translates the resulting `PayloadOutcome` into one event per observation, and pushes them onto the channel. Background work that has no synchronous trigger at all (delivery retry timeouts, future protocol timers) pushes onto the same channel.
## Event flow
3. **Two enums, mapping at the client boundary.** `PayloadOutcome` is the dispatcher-level sum of observations from one payload; `Event` is a discrete app-facing notification. The two enums are allowed to diverge: a protocol-internal observation the app does not need lives only on a core outcome type; a client-only event like `DeliveryFailed { Timeout }` lives only on `Event`. Translation is an explicit per-variant `match` inside the client — not a blanket `From` impl — to preserve that divergence as both sides grow.
A worked example of the decisions above. Two flows cover everything the application observes: a synchronous send initiated by the app, and a background inbound carried by the client's transport poller.
4. **`ConversationClass` is a core boundary type, not a client-only one.** The protocol-versioned `ConversationKind` (`PrivateV1`, `GroupV1`, …) is a storage concern; clients only need the coarse class (`Private`, `Group`). The kind→class mapping happens in core where `NewConversation` is constructed, so adding a new `ConversationKind` is a one-line change in core's mapping site rather than a ripple into every client. The client re-exports `ConversationClass` for consumers, but the canonical definition lives in core alongside the outcome types.
## Events vs errors
Events are asynchronous notifications: things the application learns after the call that might have triggered them has returned. They cross thread boundaries through the channel.
Synchronous failures — publish, parse, store, MLS — stay on `Result<_, ChatError>` on the call that triggered them. They are never events. `DeliveryFailed { reason }` is therefore an event by construction: only background work can raise it, after the original send already returned `Ok`.
## Sequence
Two flows cover everything the application observes: a synchronous send initiated by the app, and inbound bytes carried by the client's transport poller.
```mermaid
sequenceDiagram
@ -280,36 +65,22 @@ sequenceDiagram
participant Core
participant Delivery as DeliveryService
Note over App,Delivery: Outbound — synchronous send initiated by the app
Note over App,Delivery: Outbound — synchronous send
App->>Client: send_message(convo, content)
Client->>Core: send_content(&mut delivery, ...)
Client->>Core: send_content(...)
Core->>Delivery: publish(envelope)
Delivery-->>Core: Ok / Err
Core-->>Client: Ok(Vec<Event>)
Client-->>App: Ok(Vec<Event>)
Core-->>Client: Ok(()) / Err
Client-->>App: Ok(()) / Err
Note over Poller,Delivery: Inbound — background poll loop in the client
Poller->>Poller: poll tick
Note over Poller,Delivery: Inbound — background poller pushes events
Poller->>Delivery: poll
Delivery-->>Poller: payload bytes
Poller->>Core: handle_payload(&mut delivery, payload)
Core-->>Poller: Ok(vec![MessageReceived, ...])
Poller-)App: event via Receiver<Event>
Poller->>Core: handle_payload(payload)
Core-->>Poller: Ok(PayloadOutcome)
Poller->>Poller: translate fields → Event values
Poller-)App: events via Receiver<Event>
Note over App: Next tick — drain the channel
App->>App: for event in events.try_iter() { handle_event(event) }
Note over App: App drains on its own schedule
App->>App: for event in receiver.try_iter() { handle(event) }
```
## References
### Source references
- `core/conversations/src/types.rs:9-20` — current `ContentData` and `AddressedEnvelope`
- `core/conversations/src/context.rs:138-185``Context::handle_payload` (core inbound entry)
- `core/conversations/src/inbox/handler.rs:124-167` — inbox handshake handler (current `is_new_convo` set site)
- `core/conversations/src/conversation/privatev1.rs:184-191, 219-260` — private-conversation handler
- `crates/client/src/client.rs:60-92``ChatClient` public surface
- `crates/client/src/delivery.rs``DeliveryService` trait
- `crates/client-ffi/src/api.rs:49-55, 220-285` — current FFI inbound result shape
- `crates/client-ffi/src/delivery.rs:8-15` — existing FFI callback pattern (`DeliverFn`)
- `bin/chat-cli/src/app.rs:46, 144-180` — current application consumption pattern

48
flake.lock generated
View File

@ -2,16 +2,18 @@
"nodes": {
"logos-delivery": {
"inputs": {
"nixpkgs": "nixpkgs",
"nixpkgs": [
"nixpkgs"
],
"rust-overlay": "rust-overlay",
"zerokit": "zerokit"
},
"locked": {
"lastModified": 1777287099,
"narHash": "sha256-H2gpbDUg6Wy+uIY9wL0t9ICUPN82B/vCnXZ2mo3Wa/E=",
"lastModified": 1779915920,
"narHash": "sha256-rcIgP6MVyUoNEH6xpdLrZtfd4OcvIcMUloX4IhRq5AA=",
"owner": "logos-messaging",
"repo": "logos-delivery",
"rev": "5034086fefe2f32bf95319cdd39aa62fc622e4bc",
"rev": "74057c66224f43b4aa27b42033d4ed52eed5c7a7",
"type": "github"
},
"original": {
@ -22,32 +24,16 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1770464364,
"narHash": "sha256-z5NJPSBwsLf/OfD8WTmh79tlSU8XgIbwmk6qB1/TFzY=",
"lastModified": 1779955849,
"narHash": "sha256-31mhzm2HpzRr/rupWAFfWBmt9SUjzwr5+giv5Nmb/rA=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "23d72dabcb3b12469f57b37170fcbc1789bd7457",
"rev": "a2c6938835fca96e4a10c8561d461efd2f91d04f",
"type": "github"
},
"original": {
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "23d72dabcb3b12469f57b37170fcbc1789bd7457",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1775710090,
"narHash": "sha256-ar3rofg+awPB8QXDaFJhJ2jJhu+KqN/PRCXeyuXR76E=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "4c1018dae018162ec878d42fec712642d214fdfa",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"ref": "nixos-unstable-small",
"repo": "nixpkgs",
"type": "github"
}
@ -55,7 +41,7 @@
"root": {
"inputs": {
"logos-delivery": "logos-delivery",
"nixpkgs": "nixpkgs_2",
"nixpkgs": "nixpkgs",
"rust-overlay": "rust-overlay_3"
}
},
@ -109,11 +95,11 @@
]
},
"locked": {
"lastModified": 1775877051,
"narHash": "sha256-wpSQm2PD/w4uRo2wb8utk0b5hOBkkg/CZ1xICY+qB7M=",
"lastModified": 1779938491,
"narHash": "sha256-khIekZCrhy3lQom4AZTmgBPV3DOFgAiopLUyUtbVGhY=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "08b4f3633471874c8894632ade1b78d75dbda002",
"rev": "02f536e36eaee387594ce2a02d90ff678d056e0f",
"type": "github"
},
"original": {
@ -131,17 +117,15 @@
"rust-overlay": "rust-overlay_2"
},
"locked": {
"lastModified": 1771279884,
"narHash": "sha256-tzkQPwSl4vPTUo1ixHh6NCENjsBDroMKTjifg2q8QX8=",
"owner": "vacp2p",
"repo": "zerokit",
"rev": "53b18098e6d5d046e3eb1ac338a8f4f651432477",
"rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63",
"type": "github"
},
"original": {
"owner": "vacp2p",
"repo": "zerokit",
"rev": "53b18098e6d5d046e3eb1ac338a8f4f651432477",
"rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63",
"type": "github"
}
}

View File

@ -2,12 +2,17 @@
description = "libchat - Logos Chat cryptographic library";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
# nixos-unstable-small has both crates.io UA fixes (NixOS/nixpkgs#512735,
# NixOS/nixpkgs#524985); nixos-unstable hasn't caught up yet as of 2026-05-28.
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable-small";
rust-overlay = {
url = "github:oxalica/rust-overlay";
inputs.nixpkgs.follows = "nixpkgs";
};
logos-delivery.url = "github:logos-messaging/logos-delivery";
logos-delivery = {
url = "github:logos-messaging/logos-delivery";
inputs.nixpkgs.follows = "nixpkgs";
};
};
outputs = { self, nixpkgs, rust-overlay, logos-delivery }:
@ -79,7 +84,7 @@
}
);
devShells = forAllSystems ({ pkgs }:
devShells = forAllSystems ({ pkgs, ... }:
let
rustToolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust_toolchain.toml;
in
@ -89,6 +94,7 @@
rustToolchain
pkgs.pkg-config
pkgs.cmake
pkgs.perl
];
};
}