From 4df23aad634513b42190266c7bf0064f95b1b8da Mon Sep 17 00:00:00 2001 From: kaichao Date: Tue, 2 Jun 2026 21:55:19 +0800 Subject: [PATCH] Include sender information on missing messages (#120) * feat: prefix sender id * chore: add message struct for sender info * chore: refactor struct name for frontier * chore: reuse duplicate test * chore: fix clippy * feat: use sender_id in wire * chore: remove result * chore: fix nix build * chore: bump chat_proto version --- Cargo.lock | 2 +- core/conversations/Cargo.toml | 2 +- core/conversations/src/causal_history.rs | 100 ++++++++++++++---- core/conversations/src/lib.rs | 2 +- .../tests/causal_history.rs | 7 +- flake.nix | 2 +- 6 files changed, 88 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 35aeac1..0e1ba08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -291,7 +291,7 @@ dependencies = [ [[package]] name = "chat-proto" version = "0.1.0" -source = "git+https://github.com/logos-messaging/chat_proto#44d5360c41d721a011d20ee69a75a85357b33b0e" +source = "git+https://github.com/logos-messaging/chat_proto?rev=37ec98a151f6d50aab2905802ac0a896477e62ea#37ec98a151f6d50aab2905802ac0a896477e62ea" dependencies = [ "prost", ] diff --git a/core/conversations/Cargo.toml b/core/conversations/Cargo.toml index eff4823..9e80862 100644 --- a/core/conversations/Cargo.toml +++ b/core/conversations/Cargo.toml @@ -15,7 +15,7 @@ storage = { workspace = true } # External dependencies (sorted) base64 = "0.22" -chat-proto = { git = "https://github.com/logos-messaging/chat_proto" } +chat-proto = { git = "https://github.com/logos-messaging/chat_proto", rev = "37ec98a151f6d50aab2905802ac0a896477e62ea" } double-ratchets = { path = "../double-ratchets" } hex = "0.4.3" openmls = { version = "0.8.1", features = ["libcrux-provider"] } diff --git a/core/conversations/src/causal_history.rs b/core/conversations/src/causal_history.rs index 3f6b2e8..331da7d 100644 --- a/core/conversations/src/causal_history.rs +++ b/core/conversations/src/causal_history.rs @@ -25,6 +25,40 @@ use std::rc::Rc; use crate::proto::{Bytes, HistoryEntry, ReliablePayload}; use crate::utils::{blake2b_hex, hash_size}; +/// Frontier includes the message's metadata which can be referened by other +/// messages inside a conversation. +/// +/// Carries the sender's `account_id` alongside a deterministic +/// content/Lamport hash, so receivers can attribute referenced-but-unseen +/// IDs to a peer without consulting local state. The sender component is a +/// **routing hint, not authoritative**: when a missing message is recovered, +/// authorship is verified against the MLS leaf credential. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Frontier { + sender_id: String, + message_id: String, +} + +impl Frontier { + /// Construct a fresh `Frontier` for an outbound message. + pub fn new(sender_id: String, message_id: String) -> Self { + Self { + sender_id, + message_id, + } + } + + /// Sender's `account_id`, verbatim. Treat as a routing hint only. + pub fn sender_id(&self) -> &str { + &self.sender_id + } + + /// Deterministic hash of `(channel, sender, lamport, content)`. + pub fn message_id(&self) -> &str { + &self.message_id + } +} + /// Number of most-recently-seen message IDs attached to each outbound message. const CAUSAL_HISTORY_LEN: usize = 10; @@ -36,7 +70,7 @@ const CAUSAL_HISTORY_LEN: usize = 10; #[derive(Debug, Clone, PartialEq, Eq)] pub struct MissingMessage { pub conversation_id: String, - pub message_id: String, + pub frontier: Frontier, } /// Per-conversation causal state. @@ -45,20 +79,20 @@ struct ConvoState { /// Lamport logical clock. lamport_clock: i32, /// Every message ID delivered locally (own sends + received). - seen: HashSet, + seen: HashSet, /// Bounded frontier of recently-seen IDs (oldest first) attached to /// outbound messages as causal history. - frontier: VecDeque, + frontiers: VecDeque, /// Missing IDs already reported, so a gap is surfaced exactly once. - reported_missing: HashSet, + reported_missing: HashSet, } impl ConvoState { - fn record_seen(&mut self, id: String) { - if self.seen.insert(id.clone()) { - self.frontier.push_back(id); - while self.frontier.len() > CAUSAL_HISTORY_LEN { - self.frontier.pop_front(); + fn record_seen(&mut self, info: Frontier) { + if self.seen.insert(info.clone()) { + self.frontiers.push_back(info); + while self.frontiers.len() > CAUSAL_HISTORY_LEN { + self.frontiers.pop_front(); } } } @@ -97,23 +131,25 @@ impl CausalHistoryStore { state.lamport_clock += 1; let lamport = state.lamport_clock; let message_id = derive_message_id(conversation_id, sender, lamport, content); + let frontier = Frontier::new(sender.to_string(), message_id.clone()); let causal_history = state - .frontier + .frontiers .iter() - .cloned() - .map(|message_id| HistoryEntry { - message_id, + .map(|f| HistoryEntry { + message_id: f.message_id.clone(), + sender_id: f.sender_id.clone(), retrieval_hint: Bytes::new(), }) .collect(); // Our own message joins the seen-set so it appears in our future // causal history (and, later, so we can ack peers' references to it). - state.record_seen(message_id.clone()); + state.record_seen(frontier); ReliablePayload { message_id, + sender_id: sender.to_owned(), channel_id: conversation_id.to_owned(), lamport_timestamp: lamport, causal_history, @@ -140,18 +176,22 @@ impl CausalHistoryStore { let mut detected = Vec::new(); for entry in &payload.causal_history { - let id = &entry.message_id; - if !state.seen.contains(id) && state.reported_missing.insert(id.clone()) { + let frontier = Frontier::new(entry.sender_id.clone(), entry.message_id.clone()); + if !state.seen.contains(&frontier) && state.reported_missing.insert(frontier.clone()) { let m = MissingMessage { conversation_id: conversation_id.to_owned(), - message_id: id.clone(), + frontier, }; detected.push(m.clone()); missing.push(m); } } - state.record_seen(payload.message_id.clone()); + state.record_seen(Frontier::new( + payload.sender_id.clone(), + payload.message_id.clone(), + )); + detected } @@ -213,7 +253,7 @@ mod tests { fn detects_a_gap_when_a_referenced_message_was_never_seen() { let sender = CausalHistoryStore::new(); let m1 = payload(&sender, "c", "alice", b"first"); - let _m2 = payload(&sender, "c", "alice", b"second (dropped)"); + let m2 = payload(&sender, "c", "alice", b"second (dropped)"); let m3 = payload(&sender, "c", "alice", b"third"); let receiver = CausalHistoryStore::new(); @@ -222,7 +262,8 @@ mod tests { let missing = receiver.on_receive("c", &m3); assert_eq!(missing.len(), 1); - assert_eq!(missing[0].message_id, _m2.message_id); + assert_eq!(missing[0].frontier.message_id(), m2.message_id); + assert_eq!(missing[0].frontier.sender_id(), m2.sender_id); assert_eq!(missing[0].conversation_id, "c"); } @@ -238,10 +279,25 @@ mod tests { assert!(receiver.take_missing().is_empty()); } + #[test] + fn missing_message_carries_sender_id_of_the_original_author() { + let alice = CausalHistoryStore::new(); + let m1 = payload(&alice, "c", "alice", b"first"); + let _m2 = payload(&alice, "c", "alice", b"second (dropped)"); + let m3 = payload(&alice, "c", "alice", b"third"); + + let receiver = CausalHistoryStore::new(); + receiver.on_receive("c", &m1); + let missing = receiver.on_receive("c", &m3); + + assert_eq!(missing.len(), 1); + assert_eq!(missing[0].frontier.sender_id(), "alice"); + } + #[test] fn a_gap_is_reported_only_once() { let sender = CausalHistoryStore::new(); - let _m1 = payload(&sender, "c", "alice", b"a"); + let m1 = payload(&sender, "c", "alice", b"a"); let m2 = payload(&sender, "c", "alice", b"b"); let m3 = payload(&sender, "c", "alice", b"c"); @@ -252,7 +308,7 @@ mod tests { let missing = receiver.take_missing(); let m1_hits = missing .iter() - .filter(|m| m.message_id == _m1.message_id) + .filter(|m| m.frontier.message_id() == m1.message_id) .count(); assert_eq!(m1_hits, 1); } diff --git a/core/conversations/src/lib.rs b/core/conversations/src/lib.rs index 7bf1c82..f20de26 100644 --- a/core/conversations/src/lib.rs +++ b/core/conversations/src/lib.rs @@ -13,7 +13,7 @@ mod types; mod utils; pub use account::LogosAccount; -pub use causal_history::MissingMessage; +pub use causal_history::{Frontier, MissingMessage}; pub use chat_sqlite::ChatStorage; pub use chat_sqlite::StorageConfig; pub use context::{Context, ConversationId, Introduction}; diff --git a/core/integration_tests_core/tests/causal_history.rs b/core/integration_tests_core/tests/causal_history.rs index 69333cb..f648fe8 100644 --- a/core/integration_tests_core/tests/causal_history.rs +++ b/core/integration_tests_core/tests/causal_history.rs @@ -103,9 +103,14 @@ fn missing_group_message_is_detected() { assert_eq!(missing.len(), 1, "exactly one message should be missing"); assert_eq!(missing[0].conversation_id, convo_id); assert!( - !missing[0].message_id.is_empty(), + !missing[0].frontier.message_id().is_empty(), "the missing message must be identified" ); + assert_eq!( + missing[0].frontier.sender_id(), + saro.account_id().as_str(), + "missing-message sender hint should attribute to Saro" + ); // Draining clears the report; a resolved gap is not surfaced again. assert!(raya.take_missing_messages().is_empty()); diff --git a/flake.nix b/flake.nix index ed50c4a..b31738a 100644 --- a/flake.nix +++ b/flake.nix @@ -55,7 +55,7 @@ cargoLock = { lockFile = ./Cargo.lock; outputHashes = { - "chat-proto-0.1.0" = "sha256-aCl80VOIkd/GK3gnmRuFoSAvPBfeE/FKCaNlLt5AbUU="; + "chat-proto-0.1.0" = "sha256-hiFH/EwTpJd9RLtq1uF2CilzinedfR2o4jvqFaDhk+g="; }; };