From 279477cdeb1d96af2e514aac932c896e3cd9245b Mon Sep 17 00:00:00 2001 From: kaichao Date: Mon, 25 May 2026 11:54:52 +0800 Subject: [PATCH] feat: causal history notify missing messages (#105) * feat: causal history notify missing messages * chore: fix test * chore: fix clippy * chore: update domain seprator --- core/conversations/src/causal_history.rs | 259 ++++++++++++++++++ core/conversations/src/context.rs | 5 + .../src/conversation/group_v1.rs | 38 ++- core/conversations/src/inbox_v2.rs | 20 +- core/conversations/src/lib.rs | 2 + core/conversations/src/proto.rs | 1 + core/conversations/src/utils.rs | 4 +- .../tests/causal_history.rs | 112 ++++++++ 8 files changed, 433 insertions(+), 8 deletions(-) create mode 100644 core/conversations/src/causal_history.rs create mode 100644 core/integration_tests_core/tests/causal_history.rs diff --git a/core/conversations/src/causal_history.rs b/core/conversations/src/causal_history.rs new file mode 100644 index 0000000..3f6b2e8 --- /dev/null +++ b/core/conversations/src/causal_history.rs @@ -0,0 +1,259 @@ +//! Causal-history tracking for group conversations. +//! +//! Implements the *causal history* subset of the Scalable Data Sync (SDS) +//! protocol. Every outbound message carries a Lamport timestamp and the IDs of the +//! messages its sender had most recently seen. A receiver that finds a +//! referenced ID it has never delivered knows a message is missing. +//! +//! Scope: +//! - assign a deterministic message ID + Lamport timestamp to outbound msgs +//! - attach a bounded causal-history frontier to each outbound message +//! - on receive, detect referenced-but-unseen message IDs (gaps) +//! +//! Out of scope here: bloom-filter acknowledgements, +//! resend / outgoing buffer, incoming reorder buffer, Store-based recovery. +//! This is detection only — an out-of-order message is still delivered to +//! the application, but the gap it implies is reported. +//! +//! State is in-memory and session-scoped, matching the crate's current +//! in-memory MLS state. + +use std::cell::RefCell; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::rc::Rc; + +use crate::proto::{Bytes, HistoryEntry, ReliablePayload}; +use crate::utils::{blake2b_hex, hash_size}; + +/// Number of most-recently-seen message IDs attached to each outbound message. +const CAUSAL_HISTORY_LEN: usize = 10; + +/// A message detected as missing: referenced by a delivered message's causal +/// history but never seen locally. +/// +/// This is the hook point for the future client event system (issue #97); +/// until that lands, callers drain these via [`CausalHistoryStore::take_missing`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MissingMessage { + pub conversation_id: String, + pub message_id: String, +} + +/// Per-conversation causal state. +#[derive(Debug, Default)] +struct ConvoState { + /// Lamport logical clock. + lamport_clock: i32, + /// Every message ID delivered locally (own sends + received). + seen: HashSet, + /// Bounded frontier of recently-seen IDs (oldest first) attached to + /// outbound messages as causal history. + frontier: VecDeque, + /// Missing IDs already reported, so a gap is surfaced exactly once. + reported_missing: HashSet, +} + +impl ConvoState { + fn record_seen(&mut self, id: String) { + if self.seen.insert(id.clone()) { + self.frontier.push_back(id); + while self.frontier.len() > CAUSAL_HISTORY_LEN { + self.frontier.pop_front(); + } + } + } +} + +#[derive(Debug, Default)] +struct Inner { + convos: HashMap, + /// Detected gaps, drained by the client (future #97 event bus). + missing: Vec, +} + +/// Session-scoped causal-history store shared by every `GroupV1Convo` +/// instance. +/// +/// Convos are rebuilt from storage on every inbound message, so this state +/// cannot live on the convo struct — it is shared through `InboxV2`, the +/// same way the MLS provider is. +#[derive(Debug, Clone, Default)] +pub struct CausalHistoryStore { + inner: Rc>, +} + +impl CausalHistoryStore { + pub fn new() -> Self { + Self::default() + } + + /// Build the reliability envelope for an outbound message: advance the + /// Lamport clock, derive a deterministic ID, and attach the causal + /// frontier. + pub fn on_send(&self, conversation_id: &str, sender: &str, content: &[u8]) -> ReliablePayload { + let mut inner = self.inner.borrow_mut(); + let state = inner.convos.entry(conversation_id.to_owned()).or_default(); + + state.lamport_clock += 1; + let lamport = state.lamport_clock; + let message_id = derive_message_id(conversation_id, sender, lamport, content); + + let causal_history = state + .frontier + .iter() + .cloned() + .map(|message_id| HistoryEntry { + message_id, + retrieval_hint: Bytes::new(), + }) + .collect(); + + // Our own message joins the seen-set so it appears in our future + // causal history (and, later, so we can ack peers' references to it). + state.record_seen(message_id.clone()); + + ReliablePayload { + message_id, + channel_id: conversation_id.to_owned(), + lamport_timestamp: lamport, + causal_history, + bloom_filter: Bytes::new(), + content: Bytes::copy_from_slice(content), + } + } + + /// Process an inbound reliability envelope. Records the message as seen, + /// merges the Lamport clock, and returns any referenced message IDs that + /// were never delivered locally (newly detected gaps). + pub fn on_receive( + &self, + conversation_id: &str, + payload: &ReliablePayload, + ) -> Vec { + let mut inner = self.inner.borrow_mut(); + let Inner { convos, missing } = &mut *inner; + let state = convos.entry(conversation_id.to_owned()).or_default(); + + // Lamport merge: the next local send will be strictly greater than + // anything we have observed. + state.lamport_clock = state.lamport_clock.max(payload.lamport_timestamp); + + let mut detected = Vec::new(); + for entry in &payload.causal_history { + let id = &entry.message_id; + if !state.seen.contains(id) && state.reported_missing.insert(id.clone()) { + let m = MissingMessage { + conversation_id: conversation_id.to_owned(), + message_id: id.clone(), + }; + detected.push(m.clone()); + missing.push(m); + } + } + + state.record_seen(payload.message_id.clone()); + detected + } + + /// Drain all gaps detected so far. + /// + /// This is the integration point for the client event system (issue + /// #97); until that lands, callers poll here. + pub fn take_missing(&self) -> Vec { + std::mem::take(&mut self.inner.borrow_mut().missing) + } +} + +/// Deterministic, collision-resistant message ID. +/// +/// A single sender increments its Lamport clock on every send, so +/// `(sender, lamport)` is unique per message; `channel_id` and `content` are +/// folded in as well. Receivers store the field verbatim, so cross-peer +/// agreement does not depend on re-derivation. +fn derive_message_id(channel_id: &str, sender: &str, lamport: i32, content: &[u8]) -> String { + let lamport_be = lamport.to_be_bytes(); + blake2b_hex::(&[ + b"deterministic_frame_id|".as_slice(), + channel_id.as_bytes(), + b"|".as_slice(), + sender.as_bytes(), + b"|".as_slice(), + lamport_be.as_slice(), + b"|".as_slice(), + content, + ]) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn payload( + store: &CausalHistoryStore, + convo: &str, + sender: &str, + body: &[u8], + ) -> ReliablePayload { + store.on_send(convo, sender, body) + } + + #[test] + fn lamport_clock_increments_per_send() { + let s = CausalHistoryStore::new(); + let a = payload(&s, "c", "alice", b"1"); + let b = payload(&s, "c", "alice", b"2"); + assert_eq!(a.lamport_timestamp, 1); + assert_eq!(b.lamport_timestamp, 2); + // Second message's causal history references the first. + assert_eq!(b.causal_history.len(), 1); + assert_eq!(b.causal_history[0].message_id, a.message_id); + } + + #[test] + fn detects_a_gap_when_a_referenced_message_was_never_seen() { + let sender = CausalHistoryStore::new(); + let m1 = payload(&sender, "c", "alice", b"first"); + let _m2 = payload(&sender, "c", "alice", b"second (dropped)"); + let m3 = payload(&sender, "c", "alice", b"third"); + + let receiver = CausalHistoryStore::new(); + assert!(receiver.on_receive("c", &m1).is_empty()); + // m2 is never delivered to the receiver; m3 references it. + let missing = receiver.on_receive("c", &m3); + + assert_eq!(missing.len(), 1); + assert_eq!(missing[0].message_id, _m2.message_id); + assert_eq!(missing[0].conversation_id, "c"); + } + + #[test] + fn no_gap_when_all_messages_are_delivered_in_order() { + let sender = CausalHistoryStore::new(); + let m1 = payload(&sender, "c", "alice", b"a"); + let m2 = payload(&sender, "c", "alice", b"b"); + + let receiver = CausalHistoryStore::new(); + receiver.on_receive("c", &m1); + receiver.on_receive("c", &m2); + assert!(receiver.take_missing().is_empty()); + } + + #[test] + fn a_gap_is_reported_only_once() { + let sender = CausalHistoryStore::new(); + let _m1 = payload(&sender, "c", "alice", b"a"); + let m2 = payload(&sender, "c", "alice", b"b"); + let m3 = payload(&sender, "c", "alice", b"c"); + + let receiver = CausalHistoryStore::new(); + // Neither m1 nor m2 delivered; both m2 and m3 reference m1. + receiver.on_receive("c", &m2); + receiver.on_receive("c", &m3); + let missing = receiver.take_missing(); + let m1_hits = missing + .iter() + .filter(|m| m.message_id == _m1.message_id) + .count(); + assert_eq!(m1_hits, 1); + } +} diff --git a/core/conversations/src/context.rs b/core/conversations/src/context.rs index a01b4f7..58bbc97 100644 --- a/core/conversations/src/context.rs +++ b/core/conversations/src/context.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::{cell::RefCell, rc::Rc}; use crate::account::LogosAccount; +use crate::causal_history::MissingMessage; use crate::conversation::{Convo, GroupConvo}; use crate::{DeliveryService, RegistrationService}; @@ -205,6 +206,10 @@ where .collect()) } + pub fn take_missing_messages(&self) -> Vec { + self.pq_inbox.take_missing_messages() + } + pub fn send_content( &mut self, convo_id: ConversationId, diff --git a/core/conversations/src/conversation/group_v1.rs b/core/conversations/src/conversation/group_v1.rs index eadc441..4bb704d 100644 --- a/core/conversations/src/conversation/group_v1.rs +++ b/core/conversations/src/conversation/group_v1.rs @@ -7,13 +7,16 @@ use std::rc::Rc; use blake2::{Blake2b, Digest, digest::consts::U6}; use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload}; +use chat_proto::logoschat::reliability::ReliablePayload; use crypto::Ed25519VerifyingKey; use openmls::prelude::tls_codec::Deserialize; use openmls::prelude::*; use openmls_libcrux_crypto::Provider as LibcruxProvider; use openmls_traits::signatures::Signer as OpenMlsSigner; +use prost::Message as _; use storage::ConversationKind; +use crate::causal_history::CausalHistoryStore; use crate::types::AccountId; use crate::{ DeliveryService, @@ -66,10 +69,12 @@ pub trait MlsContext { pub struct GroupV1Convo { ctx: Rc>, + account_id: AccountId, ds: Rc>, keypkg_provider: Rc>, mls_group: MlsGroup, convo_id: String, + causal: CausalHistoryStore, } impl std::fmt::Debug for GroupV1Convo @@ -96,8 +101,10 @@ where // Create a new conversation with the creator as the only participant. pub fn new( ctx: Rc>, + account_id: AccountId, ds: Rc>, keypkg_provider: Rc>, + causal: CausalHistoryStore, ) -> Result { let config = Self::mls_create_config(); let mls_group = { @@ -115,18 +122,22 @@ where Ok(Self { ctx, + account_id, ds, keypkg_provider, mls_group, convo_id, + causal, }) } // Constructs a new conversation upon receiving a MlsWelcome message. pub fn new_from_welcome( ctx: Rc>, + account_id: AccountId, ds: Rc>, keypkg_provider: Rc>, + causal: CausalHistoryStore, welcome: Welcome, ) -> Result { let mls_group = { @@ -146,17 +157,21 @@ where Ok(Self { ctx, + account_id, ds, keypkg_provider, mls_group, convo_id, + causal, }) } pub fn load( ctx: Rc>, + account_id: AccountId, ds: Rc>, keypkg_provider: Rc>, + causal: CausalHistoryStore, convo_id: String, group_id: GroupId, ) -> Result { @@ -168,10 +183,12 @@ where Ok(GroupV1Convo { ctx, + account_id, ds, keypkg_provider, mls_group, convo_id, + causal, }) } @@ -264,9 +281,14 @@ where ) -> Result, ChatError> { let ctx_ref = self.ctx.borrow(); let provider = ctx_ref.provider(); + + let sender_id = self.account_id.as_str(); + let reliable = self.causal.on_send(&self.convo_id, sender_id, content); + let wire = reliable.encode_to_vec(); + let mls_message_out = self .mls_group - .create_message(provider, ctx_ref.ident(), content) + .create_message(provider, ctx_ref.ident(), &wire) .unwrap(); let a = AddressedEncryptedPayload { @@ -316,11 +338,15 @@ where .map_err(ChatError::generic)?; match processed.into_content() { - ProcessedMessageContent::ApplicationMessage(msg) => Ok(Some(ContentData { - conversation_id: hex::encode(self.mls_group.group_id().as_slice()), - data: msg.into_bytes(), - is_new_convo: false, - })), + ProcessedMessageContent::ApplicationMessage(msg) => { + let reliable = ReliablePayload::decode(msg.into_bytes().as_slice())?; + self.causal.on_receive(&self.convo_id, &reliable); + Ok(Some(ContentData { + conversation_id: hex::encode(self.mls_group.group_id().as_slice()), + data: reliable.content.to_vec(), + is_new_convo: false, + })) + } ProcessedMessageContent::StagedCommitMessage(commit) => { self.mls_group .merge_staged_commit(provider, *commit) diff --git a/core/conversations/src/inbox_v2.rs b/core/conversations/src/inbox_v2.rs index ef7b030..fd1080e 100644 --- a/core/conversations/src/inbox_v2.rs +++ b/core/conversations/src/inbox_v2.rs @@ -14,6 +14,8 @@ use crate::ChatError; use crate::DeliveryService; use crate::RegistrationService; use crate::account::LogosAccount; +use crate::causal_history::CausalHistoryStore; +use crate::causal_history::MissingMessage; use crate::conversation::GroupConvo; use crate::conversation::group_v1::MlsContext; use crate::conversation::{GroupV1Convo, IdentityProvider}; @@ -82,6 +84,7 @@ pub struct InboxV2 { ds: Rc>, reg_service: Rc>, store: Rc>, + causal: CausalHistoryStore, ctx: Rc>, } @@ -104,6 +107,7 @@ where ds, reg_service, store, + causal: CausalHistoryStore::new(), ctx: Rc::new(RefCell::new(PqMlsContext { ident_provider: account, provider, @@ -139,7 +143,13 @@ where } pub fn create_group_v1(&self) -> Result, ChatError> { - GroupV1Convo::new(self.ctx.clone(), self.ds.clone(), self.reg_service.clone()) + GroupV1Convo::new( + self.ctx.clone(), + self.account_id.clone(), + self.ds.clone(), + self.reg_service.clone(), + self.causal.clone(), + ) } pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<(), ChatError> { @@ -181,8 +191,10 @@ where let convo = GroupV1Convo::new_from_welcome( self.ctx.clone(), + self.account_id.clone(), self.ds.clone(), self.reg_service.clone(), + self.causal.clone(), welcome, )?; self.persist_convo(convo) @@ -217,14 +229,20 @@ where let group_id = GroupId::from_slice(&group_id_bytes); let convo = GroupV1Convo::load( self.ctx.clone(), + self.account_id.clone(), self.ds.clone(), self.reg_service.clone(), + self.causal.clone(), convo_id, group_id, )?; Ok(convo) } + + pub fn take_missing_messages(&self) -> Vec { + self.causal.take_missing() + } } #[derive(Clone, PartialEq, Message)] diff --git a/core/conversations/src/lib.rs b/core/conversations/src/lib.rs index 0d75124..3439907 100644 --- a/core/conversations/src/lib.rs +++ b/core/conversations/src/lib.rs @@ -1,4 +1,5 @@ mod account; +mod causal_history; mod context; mod conversation; mod crypto; @@ -11,6 +12,7 @@ mod types; mod utils; pub use account::LogosAccount; +pub use causal_history::MissingMessage; pub use chat_sqlite::ChatStorage; pub use chat_sqlite::StorageConfig; pub use context::{Context, ConversationId, ConversationIdOwned, Introduction}; diff --git a/core/conversations/src/proto.rs b/core/conversations/src/proto.rs index 18f3119..894ebfe 100644 --- a/core/conversations/src/proto.rs +++ b/core/conversations/src/proto.rs @@ -4,6 +4,7 @@ pub use chat_proto::logoschat::encryption::{EncryptedPayload, InboxHandshakeV1}; pub use chat_proto::logoschat::envelope::EnvelopeV1; pub use chat_proto::logoschat::inbox::{InboxV1Frame, inbox_v1_frame}; pub use chat_proto::logoschat::invite::InvitePrivateV1; +pub use chat_proto::logoschat::reliability::{HistoryEntry, ReliablePayload}; pub use prost::Message; pub use prost::bytes::Bytes; diff --git a/core/conversations/src/utils.rs b/core/conversations/src/utils.rs index 93eaf85..65ef283 100644 --- a/core/conversations/src/utils.rs +++ b/core/conversations/src/utils.rs @@ -11,7 +11,7 @@ pub fn timestamp_millis() -> i64 { /// Track hash sizes in use across the crate. pub mod hash_size { use blake2::digest::{ - consts::U64, + consts::{U32, U64}, generic_array::ArrayLength, typenum::{IsLessOrEqual, NonZero}, }; @@ -40,6 +40,8 @@ pub mod hash_size { AccountId => U8, /// Conversation ID hash length ConvoId => U6, + /// Causal history message ID hash length (256-bit, collision-resistant) + MessageId => U32, } } diff --git a/core/integration_tests_core/tests/causal_history.rs b/core/integration_tests_core/tests/causal_history.rs new file mode 100644 index 0000000..69333cb --- /dev/null +++ b/core/integration_tests_core/tests/causal_history.rs @@ -0,0 +1,112 @@ +//! End-to-end test for causal-history gap detection in group conversation. +//! +//! Saro and Raya share a group. Saro sends three messages; Raya never +//! receives the second one. The third message's causal history references +//! the missing second message, so Raya must detect and report the gap. + +use std::ops::{Deref, DerefMut}; + +use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; +use libchat::{Context, MissingMessage}; + +struct Client { + inner: Context, +} + +impl Client { + fn init(ctx: Context) -> Self { + Client { inner: ctx } + } + + /// Poll every pending payload and feed it to the protocol. + fn process_messages(&mut self) { + let messages: Vec<_> = { + let mut ds = self.inner.ds(); + std::iter::from_fn(|| ds.poll()).collect() + }; + for data in messages { + self.inner.handle_payload(&data).unwrap(); + } + } + + /// Poll every pending payload and discard it — simulates messages that + /// never reach this client. + fn drop_pending_messages(&mut self) { + let mut ds = self.inner.ds(); + while ds.poll().is_some() {} + } +} + +impl Deref for Client { + type Target = Context; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for Client { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +#[test] +fn missing_group_message_is_detected() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + + let saro_ctx = + Context::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap(); + + let raya_ctx = Context::new_with_name("raya", ds.clone(), rs.clone(), MemStore::new()).unwrap(); + + let mut saro = Client::init(saro_ctx); + let mut raya = Client::init(raya_ctx); + + // Saro creates a group with Raya. + let raya_id = raya.account_id().clone(); + let convo_id = saro + .create_group_convo(&[&raya_id]) + .unwrap() + .id() + .to_string(); + + // Raya joins (processes the Welcome + commit). + raya.process_messages(); + + // Message 1 is delivered normally. + saro.get_convo(convo_id.as_str()) + .unwrap() + .send_content(b"first") + .unwrap(); + raya.process_messages(); + assert!( + raya.take_missing_messages().is_empty(), + "no gap expected while every message is delivered" + ); + + // Message 2 is published but never reaches Raya. + saro.get_convo(convo_id.as_str()) + .unwrap() + .send_content(b"second") + .unwrap(); + raya.drop_pending_messages(); + + // Message 3 is delivered; its causal history references the missing M2. + saro.get_convo(convo_id.as_str()) + .unwrap() + .send_content(b"third") + .unwrap(); + raya.process_messages(); + + let missing: Vec = raya.take_missing_messages(); + assert_eq!(missing.len(), 1, "exactly one message should be missing"); + assert_eq!(missing[0].conversation_id, convo_id); + assert!( + !missing[0].message_id.is_empty(), + "the missing message must be identified" + ); + + // Draining clears the report; a resolved gap is not surfaced again. + assert!(raya.take_missing_messages().is_empty()); +}