feat: causal history notify missing messages (#105)

* feat: causal history notify missing messages

* chore: fix test

* chore: fix clippy

* chore: update domain seprator
This commit is contained in:
kaichao 2026-05-25 11:54:52 +08:00 committed by GitHub
parent fa68714e2f
commit 279477cdeb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 433 additions and 8 deletions

View File

@ -0,0 +1,259 @@
//! Causal-history tracking for group conversations.
//!
//! Implements the *causal history* subset of the Scalable Data Sync (SDS)
//! protocol. Every outbound message carries a Lamport timestamp and the IDs of the
//! messages its sender had most recently seen. A receiver that finds a
//! referenced ID it has never delivered knows a message is missing.
//!
//! Scope:
//! - assign a deterministic message ID + Lamport timestamp to outbound msgs
//! - attach a bounded causal-history frontier to each outbound message
//! - on receive, detect referenced-but-unseen message IDs (gaps)
//!
//! Out of scope here: bloom-filter acknowledgements,
//! resend / outgoing buffer, incoming reorder buffer, Store-based recovery.
//! This is detection only — an out-of-order message is still delivered to
//! the application, but the gap it implies is reported.
//!
//! State is in-memory and session-scoped, matching the crate's current
//! in-memory MLS state.
use std::cell::RefCell;
use std::collections::{HashMap, HashSet, VecDeque};
use std::rc::Rc;
use crate::proto::{Bytes, HistoryEntry, ReliablePayload};
use crate::utils::{blake2b_hex, hash_size};
/// Number of most-recently-seen message IDs attached to each outbound message.
const CAUSAL_HISTORY_LEN: usize = 10;
/// A message detected as missing: referenced by a delivered message's causal
/// history but never seen locally.
///
/// This is the hook point for the future client event system (issue #97);
/// until that lands, callers drain these via [`CausalHistoryStore::take_missing`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MissingMessage {
pub conversation_id: String,
pub message_id: String,
}
/// Per-conversation causal state.
#[derive(Debug, Default)]
struct ConvoState {
/// Lamport logical clock.
lamport_clock: i32,
/// Every message ID delivered locally (own sends + received).
seen: HashSet<String>,
/// Bounded frontier of recently-seen IDs (oldest first) attached to
/// outbound messages as causal history.
frontier: VecDeque<String>,
/// Missing IDs already reported, so a gap is surfaced exactly once.
reported_missing: HashSet<String>,
}
impl ConvoState {
fn record_seen(&mut self, id: String) {
if self.seen.insert(id.clone()) {
self.frontier.push_back(id);
while self.frontier.len() > CAUSAL_HISTORY_LEN {
self.frontier.pop_front();
}
}
}
}
#[derive(Debug, Default)]
struct Inner {
convos: HashMap<String, ConvoState>,
/// Detected gaps, drained by the client (future #97 event bus).
missing: Vec<MissingMessage>,
}
/// Session-scoped causal-history store shared by every `GroupV1Convo`
/// instance.
///
/// Convos are rebuilt from storage on every inbound message, so this state
/// cannot live on the convo struct — it is shared through `InboxV2`, the
/// same way the MLS provider is.
#[derive(Debug, Clone, Default)]
pub struct CausalHistoryStore {
inner: Rc<RefCell<Inner>>,
}
impl CausalHistoryStore {
pub fn new() -> Self {
Self::default()
}
/// Build the reliability envelope for an outbound message: advance the
/// Lamport clock, derive a deterministic ID, and attach the causal
/// frontier.
pub fn on_send(&self, conversation_id: &str, sender: &str, content: &[u8]) -> ReliablePayload {
let mut inner = self.inner.borrow_mut();
let state = inner.convos.entry(conversation_id.to_owned()).or_default();
state.lamport_clock += 1;
let lamport = state.lamport_clock;
let message_id = derive_message_id(conversation_id, sender, lamport, content);
let causal_history = state
.frontier
.iter()
.cloned()
.map(|message_id| HistoryEntry {
message_id,
retrieval_hint: Bytes::new(),
})
.collect();
// Our own message joins the seen-set so it appears in our future
// causal history (and, later, so we can ack peers' references to it).
state.record_seen(message_id.clone());
ReliablePayload {
message_id,
channel_id: conversation_id.to_owned(),
lamport_timestamp: lamport,
causal_history,
bloom_filter: Bytes::new(),
content: Bytes::copy_from_slice(content),
}
}
/// Process an inbound reliability envelope. Records the message as seen,
/// merges the Lamport clock, and returns any referenced message IDs that
/// were never delivered locally (newly detected gaps).
pub fn on_receive(
&self,
conversation_id: &str,
payload: &ReliablePayload,
) -> Vec<MissingMessage> {
let mut inner = self.inner.borrow_mut();
let Inner { convos, missing } = &mut *inner;
let state = convos.entry(conversation_id.to_owned()).or_default();
// Lamport merge: the next local send will be strictly greater than
// anything we have observed.
state.lamport_clock = state.lamport_clock.max(payload.lamport_timestamp);
let mut detected = Vec::new();
for entry in &payload.causal_history {
let id = &entry.message_id;
if !state.seen.contains(id) && state.reported_missing.insert(id.clone()) {
let m = MissingMessage {
conversation_id: conversation_id.to_owned(),
message_id: id.clone(),
};
detected.push(m.clone());
missing.push(m);
}
}
state.record_seen(payload.message_id.clone());
detected
}
/// Drain all gaps detected so far.
///
/// This is the integration point for the client event system (issue
/// #97); until that lands, callers poll here.
pub fn take_missing(&self) -> Vec<MissingMessage> {
std::mem::take(&mut self.inner.borrow_mut().missing)
}
}
/// Deterministic, collision-resistant message ID.
///
/// A single sender increments its Lamport clock on every send, so
/// `(sender, lamport)` is unique per message; `channel_id` and `content` are
/// folded in as well. Receivers store the field verbatim, so cross-peer
/// agreement does not depend on re-derivation.
fn derive_message_id(channel_id: &str, sender: &str, lamport: i32, content: &[u8]) -> String {
let lamport_be = lamport.to_be_bytes();
blake2b_hex::<hash_size::MessageId>(&[
b"deterministic_frame_id|".as_slice(),
channel_id.as_bytes(),
b"|".as_slice(),
sender.as_bytes(),
b"|".as_slice(),
lamport_be.as_slice(),
b"|".as_slice(),
content,
])
}
#[cfg(test)]
mod tests {
use super::*;
fn payload(
store: &CausalHistoryStore,
convo: &str,
sender: &str,
body: &[u8],
) -> ReliablePayload {
store.on_send(convo, sender, body)
}
#[test]
fn lamport_clock_increments_per_send() {
let s = CausalHistoryStore::new();
let a = payload(&s, "c", "alice", b"1");
let b = payload(&s, "c", "alice", b"2");
assert_eq!(a.lamport_timestamp, 1);
assert_eq!(b.lamport_timestamp, 2);
// Second message's causal history references the first.
assert_eq!(b.causal_history.len(), 1);
assert_eq!(b.causal_history[0].message_id, a.message_id);
}
#[test]
fn detects_a_gap_when_a_referenced_message_was_never_seen() {
let sender = CausalHistoryStore::new();
let m1 = payload(&sender, "c", "alice", b"first");
let _m2 = payload(&sender, "c", "alice", b"second (dropped)");
let m3 = payload(&sender, "c", "alice", b"third");
let receiver = CausalHistoryStore::new();
assert!(receiver.on_receive("c", &m1).is_empty());
// m2 is never delivered to the receiver; m3 references it.
let missing = receiver.on_receive("c", &m3);
assert_eq!(missing.len(), 1);
assert_eq!(missing[0].message_id, _m2.message_id);
assert_eq!(missing[0].conversation_id, "c");
}
#[test]
fn no_gap_when_all_messages_are_delivered_in_order() {
let sender = CausalHistoryStore::new();
let m1 = payload(&sender, "c", "alice", b"a");
let m2 = payload(&sender, "c", "alice", b"b");
let receiver = CausalHistoryStore::new();
receiver.on_receive("c", &m1);
receiver.on_receive("c", &m2);
assert!(receiver.take_missing().is_empty());
}
#[test]
fn a_gap_is_reported_only_once() {
let sender = CausalHistoryStore::new();
let _m1 = payload(&sender, "c", "alice", b"a");
let m2 = payload(&sender, "c", "alice", b"b");
let m3 = payload(&sender, "c", "alice", b"c");
let receiver = CausalHistoryStore::new();
// Neither m1 nor m2 delivered; both m2 and m3 reference m1.
receiver.on_receive("c", &m2);
receiver.on_receive("c", &m3);
let missing = receiver.take_missing();
let m1_hits = missing
.iter()
.filter(|m| m.message_id == _m1.message_id)
.count();
assert_eq!(m1_hits, 1);
}
}

View File

@ -3,6 +3,7 @@ use std::sync::Arc;
use std::{cell::RefCell, rc::Rc};
use crate::account::LogosAccount;
use crate::causal_history::MissingMessage;
use crate::conversation::{Convo, GroupConvo};
use crate::{DeliveryService, RegistrationService};
@ -205,6 +206,10 @@ where
.collect())
}
pub fn take_missing_messages(&self) -> Vec<MissingMessage> {
self.pq_inbox.take_missing_messages()
}
pub fn send_content(
&mut self,
convo_id: ConversationId,

View File

@ -7,13 +7,16 @@ use std::rc::Rc;
use blake2::{Blake2b, Digest, digest::consts::U6};
use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload};
use chat_proto::logoschat::reliability::ReliablePayload;
use crypto::Ed25519VerifyingKey;
use openmls::prelude::tls_codec::Deserialize;
use openmls::prelude::*;
use openmls_libcrux_crypto::Provider as LibcruxProvider;
use openmls_traits::signatures::Signer as OpenMlsSigner;
use prost::Message as _;
use storage::ConversationKind;
use crate::causal_history::CausalHistoryStore;
use crate::types::AccountId;
use crate::{
DeliveryService,
@ -66,10 +69,12 @@ pub trait MlsContext {
pub struct GroupV1Convo<MlsCtx, DS, KP> {
ctx: Rc<RefCell<MlsCtx>>,
account_id: AccountId,
ds: Rc<RefCell<DS>>,
keypkg_provider: Rc<RefCell<KP>>,
mls_group: MlsGroup,
convo_id: String,
causal: CausalHistoryStore,
}
impl<MlsCtx, DS, KP> std::fmt::Debug for GroupV1Convo<MlsCtx, DS, KP>
@ -96,8 +101,10 @@ where
// Create a new conversation with the creator as the only participant.
pub fn new(
ctx: Rc<RefCell<MlsCtx>>,
account_id: AccountId,
ds: Rc<RefCell<DS>>,
keypkg_provider: Rc<RefCell<KP>>,
causal: CausalHistoryStore,
) -> Result<Self, ChatError> {
let config = Self::mls_create_config();
let mls_group = {
@ -115,18 +122,22 @@ where
Ok(Self {
ctx,
account_id,
ds,
keypkg_provider,
mls_group,
convo_id,
causal,
})
}
// Constructs a new conversation upon receiving a MlsWelcome message.
pub fn new_from_welcome(
ctx: Rc<RefCell<MlsCtx>>,
account_id: AccountId,
ds: Rc<RefCell<DS>>,
keypkg_provider: Rc<RefCell<KP>>,
causal: CausalHistoryStore,
welcome: Welcome,
) -> Result<Self, ChatError> {
let mls_group = {
@ -146,17 +157,21 @@ where
Ok(Self {
ctx,
account_id,
ds,
keypkg_provider,
mls_group,
convo_id,
causal,
})
}
pub fn load(
ctx: Rc<RefCell<MlsCtx>>,
account_id: AccountId,
ds: Rc<RefCell<DS>>,
keypkg_provider: Rc<RefCell<KP>>,
causal: CausalHistoryStore,
convo_id: String,
group_id: GroupId,
) -> Result<Self, ChatError> {
@ -168,10 +183,12 @@ where
Ok(GroupV1Convo {
ctx,
account_id,
ds,
keypkg_provider,
mls_group,
convo_id,
causal,
})
}
@ -264,9 +281,14 @@ where
) -> Result<Vec<AddressedEncryptedPayload>, ChatError> {
let ctx_ref = self.ctx.borrow();
let provider = ctx_ref.provider();
let sender_id = self.account_id.as_str();
let reliable = self.causal.on_send(&self.convo_id, sender_id, content);
let wire = reliable.encode_to_vec();
let mls_message_out = self
.mls_group
.create_message(provider, ctx_ref.ident(), content)
.create_message(provider, ctx_ref.ident(), &wire)
.unwrap();
let a = AddressedEncryptedPayload {
@ -316,11 +338,15 @@ where
.map_err(ChatError::generic)?;
match processed.into_content() {
ProcessedMessageContent::ApplicationMessage(msg) => Ok(Some(ContentData {
conversation_id: hex::encode(self.mls_group.group_id().as_slice()),
data: msg.into_bytes(),
is_new_convo: false,
})),
ProcessedMessageContent::ApplicationMessage(msg) => {
let reliable = ReliablePayload::decode(msg.into_bytes().as_slice())?;
self.causal.on_receive(&self.convo_id, &reliable);
Ok(Some(ContentData {
conversation_id: hex::encode(self.mls_group.group_id().as_slice()),
data: reliable.content.to_vec(),
is_new_convo: false,
}))
}
ProcessedMessageContent::StagedCommitMessage(commit) => {
self.mls_group
.merge_staged_commit(provider, *commit)

View File

@ -14,6 +14,8 @@ use crate::ChatError;
use crate::DeliveryService;
use crate::RegistrationService;
use crate::account::LogosAccount;
use crate::causal_history::CausalHistoryStore;
use crate::causal_history::MissingMessage;
use crate::conversation::GroupConvo;
use crate::conversation::group_v1::MlsContext;
use crate::conversation::{GroupV1Convo, IdentityProvider};
@ -82,6 +84,7 @@ pub struct InboxV2<DS, RS, CS> {
ds: Rc<RefCell<DS>>,
reg_service: Rc<RefCell<RS>>,
store: Rc<RefCell<CS>>,
causal: CausalHistoryStore,
ctx: Rc<RefCell<PqMlsContext>>,
}
@ -104,6 +107,7 @@ where
ds,
reg_service,
store,
causal: CausalHistoryStore::new(),
ctx: Rc::new(RefCell::new(PqMlsContext {
ident_provider: account,
provider,
@ -139,7 +143,13 @@ where
}
pub fn create_group_v1(&self) -> Result<GroupV1Convo<PqMlsContext, DS, RS>, ChatError> {
GroupV1Convo::new(self.ctx.clone(), self.ds.clone(), self.reg_service.clone())
GroupV1Convo::new(
self.ctx.clone(),
self.account_id.clone(),
self.ds.clone(),
self.reg_service.clone(),
self.causal.clone(),
)
}
pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<(), ChatError> {
@ -181,8 +191,10 @@ where
let convo = GroupV1Convo::new_from_welcome(
self.ctx.clone(),
self.account_id.clone(),
self.ds.clone(),
self.reg_service.clone(),
self.causal.clone(),
welcome,
)?;
self.persist_convo(convo)
@ -217,14 +229,20 @@ where
let group_id = GroupId::from_slice(&group_id_bytes);
let convo = GroupV1Convo::load(
self.ctx.clone(),
self.account_id.clone(),
self.ds.clone(),
self.reg_service.clone(),
self.causal.clone(),
convo_id,
group_id,
)?;
Ok(convo)
}
pub fn take_missing_messages(&self) -> Vec<MissingMessage> {
self.causal.take_missing()
}
}
#[derive(Clone, PartialEq, Message)]

View File

@ -1,4 +1,5 @@
mod account;
mod causal_history;
mod context;
mod conversation;
mod crypto;
@ -11,6 +12,7 @@ mod types;
mod utils;
pub use account::LogosAccount;
pub use causal_history::MissingMessage;
pub use chat_sqlite::ChatStorage;
pub use chat_sqlite::StorageConfig;
pub use context::{Context, ConversationId, ConversationIdOwned, Introduction};

View File

@ -4,6 +4,7 @@ pub use chat_proto::logoschat::encryption::{EncryptedPayload, InboxHandshakeV1};
pub use chat_proto::logoschat::envelope::EnvelopeV1;
pub use chat_proto::logoschat::inbox::{InboxV1Frame, inbox_v1_frame};
pub use chat_proto::logoschat::invite::InvitePrivateV1;
pub use chat_proto::logoschat::reliability::{HistoryEntry, ReliablePayload};
pub use prost::Message;
pub use prost::bytes::Bytes;

View File

@ -11,7 +11,7 @@ pub fn timestamp_millis() -> i64 {
/// Track hash sizes in use across the crate.
pub mod hash_size {
use blake2::digest::{
consts::U64,
consts::{U32, U64},
generic_array::ArrayLength,
typenum::{IsLessOrEqual, NonZero},
};
@ -40,6 +40,8 @@ pub mod hash_size {
AccountId => U8,
/// Conversation ID hash length
ConvoId => U6,
/// Causal history message ID hash length (256-bit, collision-resistant)
MessageId => U32,
}
}

View File

@ -0,0 +1,112 @@
//! End-to-end test for causal-history gap detection in group conversation.
//!
//! Saro and Raya share a group. Saro sends three messages; Raya never
//! receives the second one. The third message's causal history references
//! the missing second message, so Raya must detect and report the gap.
use std::ops::{Deref, DerefMut};
use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
use libchat::{Context, MissingMessage};
struct Client {
inner: Context<LocalBroadcaster, EphemeralRegistry, MemStore>,
}
impl Client {
fn init(ctx: Context<LocalBroadcaster, EphemeralRegistry, MemStore>) -> Self {
Client { inner: ctx }
}
/// Poll every pending payload and feed it to the protocol.
fn process_messages(&mut self) {
let messages: Vec<_> = {
let mut ds = self.inner.ds();
std::iter::from_fn(|| ds.poll()).collect()
};
for data in messages {
self.inner.handle_payload(&data).unwrap();
}
}
/// Poll every pending payload and discard it — simulates messages that
/// never reach this client.
fn drop_pending_messages(&mut self) {
let mut ds = self.inner.ds();
while ds.poll().is_some() {}
}
}
impl Deref for Client {
type Target = Context<LocalBroadcaster, EphemeralRegistry, MemStore>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for Client {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
#[test]
fn missing_group_message_is_detected() {
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let saro_ctx =
Context::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap();
let raya_ctx = Context::new_with_name("raya", ds.clone(), rs.clone(), MemStore::new()).unwrap();
let mut saro = Client::init(saro_ctx);
let mut raya = Client::init(raya_ctx);
// Saro creates a group with Raya.
let raya_id = raya.account_id().clone();
let convo_id = saro
.create_group_convo(&[&raya_id])
.unwrap()
.id()
.to_string();
// Raya joins (processes the Welcome + commit).
raya.process_messages();
// Message 1 is delivered normally.
saro.get_convo(convo_id.as_str())
.unwrap()
.send_content(b"first")
.unwrap();
raya.process_messages();
assert!(
raya.take_missing_messages().is_empty(),
"no gap expected while every message is delivered"
);
// Message 2 is published but never reaches Raya.
saro.get_convo(convo_id.as_str())
.unwrap()
.send_content(b"second")
.unwrap();
raya.drop_pending_messages();
// Message 3 is delivered; its causal history references the missing M2.
saro.get_convo(convo_id.as_str())
.unwrap()
.send_content(b"third")
.unwrap();
raya.process_messages();
let missing: Vec<MissingMessage> = raya.take_missing_messages();
assert_eq!(missing.len(), 1, "exactly one message should be missing");
assert_eq!(missing[0].conversation_id, convo_id);
assert!(
!missing[0].message_id.is_empty(),
"the missing message must be identified"
);
// Draining clears the report; a resolved gap is not surfaced again.
assert!(raya.take_missing_messages().is_empty());
}