Include sender information on missing messages (#120)

* feat: prefix sender id

* chore: add message struct for sender info

* chore: refactor struct name for frontier

* chore: reuse duplicate test

* chore: fix clippy

* feat: use sender_id in wire

* chore: remove result

* chore: fix nix build

* chore: bump chat_proto version
This commit is contained in:
kaichao 2026-06-02 21:55:19 +08:00 committed by GitHub
parent 2d3ad27d51
commit 4df23aad63
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 88 additions and 27 deletions

2
Cargo.lock generated
View File

@ -291,7 +291,7 @@ dependencies = [
[[package]]
name = "chat-proto"
version = "0.1.0"
source = "git+https://github.com/logos-messaging/chat_proto#44d5360c41d721a011d20ee69a75a85357b33b0e"
source = "git+https://github.com/logos-messaging/chat_proto?rev=37ec98a151f6d50aab2905802ac0a896477e62ea#37ec98a151f6d50aab2905802ac0a896477e62ea"
dependencies = [
"prost",
]

View File

@ -15,7 +15,7 @@ storage = { workspace = true }
# External dependencies (sorted)
base64 = "0.22"
chat-proto = { git = "https://github.com/logos-messaging/chat_proto" }
chat-proto = { git = "https://github.com/logos-messaging/chat_proto", rev = "37ec98a151f6d50aab2905802ac0a896477e62ea" }
double-ratchets = { path = "../double-ratchets" }
hex = "0.4.3"
openmls = { version = "0.8.1", features = ["libcrux-provider"] }

View File

@ -25,6 +25,40 @@ use std::rc::Rc;
use crate::proto::{Bytes, HistoryEntry, ReliablePayload};
use crate::utils::{blake2b_hex, hash_size};
/// Frontier includes the message's metadata which can be referened by other
/// messages inside a conversation.
///
/// Carries the sender's `account_id` alongside a deterministic
/// content/Lamport hash, so receivers can attribute referenced-but-unseen
/// IDs to a peer without consulting local state. The sender component is a
/// **routing hint, not authoritative**: when a missing message is recovered,
/// authorship is verified against the MLS leaf credential.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Frontier {
sender_id: String,
message_id: String,
}
impl Frontier {
/// Construct a fresh `Frontier` for an outbound message.
pub fn new(sender_id: String, message_id: String) -> Self {
Self {
sender_id,
message_id,
}
}
/// Sender's `account_id`, verbatim. Treat as a routing hint only.
pub fn sender_id(&self) -> &str {
&self.sender_id
}
/// Deterministic hash of `(channel, sender, lamport, content)`.
pub fn message_id(&self) -> &str {
&self.message_id
}
}
/// Number of most-recently-seen message IDs attached to each outbound message.
const CAUSAL_HISTORY_LEN: usize = 10;
@ -36,7 +70,7 @@ const CAUSAL_HISTORY_LEN: usize = 10;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MissingMessage {
pub conversation_id: String,
pub message_id: String,
pub frontier: Frontier,
}
/// Per-conversation causal state.
@ -45,20 +79,20 @@ struct ConvoState {
/// Lamport logical clock.
lamport_clock: i32,
/// Every message ID delivered locally (own sends + received).
seen: HashSet<String>,
seen: HashSet<Frontier>,
/// Bounded frontier of recently-seen IDs (oldest first) attached to
/// outbound messages as causal history.
frontier: VecDeque<String>,
frontiers: VecDeque<Frontier>,
/// Missing IDs already reported, so a gap is surfaced exactly once.
reported_missing: HashSet<String>,
reported_missing: HashSet<Frontier>,
}
impl ConvoState {
fn record_seen(&mut self, id: String) {
if self.seen.insert(id.clone()) {
self.frontier.push_back(id);
while self.frontier.len() > CAUSAL_HISTORY_LEN {
self.frontier.pop_front();
fn record_seen(&mut self, info: Frontier) {
if self.seen.insert(info.clone()) {
self.frontiers.push_back(info);
while self.frontiers.len() > CAUSAL_HISTORY_LEN {
self.frontiers.pop_front();
}
}
}
@ -97,23 +131,25 @@ impl CausalHistoryStore {
state.lamport_clock += 1;
let lamport = state.lamport_clock;
let message_id = derive_message_id(conversation_id, sender, lamport, content);
let frontier = Frontier::new(sender.to_string(), message_id.clone());
let causal_history = state
.frontier
.frontiers
.iter()
.cloned()
.map(|message_id| HistoryEntry {
message_id,
.map(|f| HistoryEntry {
message_id: f.message_id.clone(),
sender_id: f.sender_id.clone(),
retrieval_hint: Bytes::new(),
})
.collect();
// Our own message joins the seen-set so it appears in our future
// causal history (and, later, so we can ack peers' references to it).
state.record_seen(message_id.clone());
state.record_seen(frontier);
ReliablePayload {
message_id,
sender_id: sender.to_owned(),
channel_id: conversation_id.to_owned(),
lamport_timestamp: lamport,
causal_history,
@ -140,18 +176,22 @@ impl CausalHistoryStore {
let mut detected = Vec::new();
for entry in &payload.causal_history {
let id = &entry.message_id;
if !state.seen.contains(id) && state.reported_missing.insert(id.clone()) {
let frontier = Frontier::new(entry.sender_id.clone(), entry.message_id.clone());
if !state.seen.contains(&frontier) && state.reported_missing.insert(frontier.clone()) {
let m = MissingMessage {
conversation_id: conversation_id.to_owned(),
message_id: id.clone(),
frontier,
};
detected.push(m.clone());
missing.push(m);
}
}
state.record_seen(payload.message_id.clone());
state.record_seen(Frontier::new(
payload.sender_id.clone(),
payload.message_id.clone(),
));
detected
}
@ -213,7 +253,7 @@ mod tests {
fn detects_a_gap_when_a_referenced_message_was_never_seen() {
let sender = CausalHistoryStore::new();
let m1 = payload(&sender, "c", "alice", b"first");
let _m2 = payload(&sender, "c", "alice", b"second (dropped)");
let m2 = payload(&sender, "c", "alice", b"second (dropped)");
let m3 = payload(&sender, "c", "alice", b"third");
let receiver = CausalHistoryStore::new();
@ -222,7 +262,8 @@ mod tests {
let missing = receiver.on_receive("c", &m3);
assert_eq!(missing.len(), 1);
assert_eq!(missing[0].message_id, _m2.message_id);
assert_eq!(missing[0].frontier.message_id(), m2.message_id);
assert_eq!(missing[0].frontier.sender_id(), m2.sender_id);
assert_eq!(missing[0].conversation_id, "c");
}
@ -238,10 +279,25 @@ mod tests {
assert!(receiver.take_missing().is_empty());
}
#[test]
fn missing_message_carries_sender_id_of_the_original_author() {
let alice = CausalHistoryStore::new();
let m1 = payload(&alice, "c", "alice", b"first");
let _m2 = payload(&alice, "c", "alice", b"second (dropped)");
let m3 = payload(&alice, "c", "alice", b"third");
let receiver = CausalHistoryStore::new();
receiver.on_receive("c", &m1);
let missing = receiver.on_receive("c", &m3);
assert_eq!(missing.len(), 1);
assert_eq!(missing[0].frontier.sender_id(), "alice");
}
#[test]
fn a_gap_is_reported_only_once() {
let sender = CausalHistoryStore::new();
let _m1 = payload(&sender, "c", "alice", b"a");
let m1 = payload(&sender, "c", "alice", b"a");
let m2 = payload(&sender, "c", "alice", b"b");
let m3 = payload(&sender, "c", "alice", b"c");
@ -252,7 +308,7 @@ mod tests {
let missing = receiver.take_missing();
let m1_hits = missing
.iter()
.filter(|m| m.message_id == _m1.message_id)
.filter(|m| m.frontier.message_id() == m1.message_id)
.count();
assert_eq!(m1_hits, 1);
}

View File

@ -13,7 +13,7 @@ mod types;
mod utils;
pub use account::LogosAccount;
pub use causal_history::MissingMessage;
pub use causal_history::{Frontier, MissingMessage};
pub use chat_sqlite::ChatStorage;
pub use chat_sqlite::StorageConfig;
pub use context::{Context, ConversationId, Introduction};

View File

@ -103,9 +103,14 @@ fn missing_group_message_is_detected() {
assert_eq!(missing.len(), 1, "exactly one message should be missing");
assert_eq!(missing[0].conversation_id, convo_id);
assert!(
!missing[0].message_id.is_empty(),
!missing[0].frontier.message_id().is_empty(),
"the missing message must be identified"
);
assert_eq!(
missing[0].frontier.sender_id(),
saro.account_id().as_str(),
"missing-message sender hint should attribute to Saro"
);
// Draining clears the report; a resolved gap is not surfaced again.
assert!(raya.take_missing_messages().is_empty());

View File

@ -55,7 +55,7 @@
cargoLock = {
lockFile = ./Cargo.lock;
outputHashes = {
"chat-proto-0.1.0" = "sha256-aCl80VOIkd/GK3gnmRuFoSAvPBfeE/FKCaNlLt5AbUU=";
"chat-proto-0.1.0" = "sha256-hiFH/EwTpJd9RLtq1uF2CilzinedfR2o4jvqFaDhk+g=";
};
};