diff --git a/core/conversations/src/conversation/group_v1.rs b/core/conversations/src/conversation/group_v1.rs index dbb87f7..19b77ba 100644 --- a/core/conversations/src/conversation/group_v1.rs +++ b/core/conversations/src/conversation/group_v1.rs @@ -9,12 +9,15 @@ use openmls::prelude::tls_codec::Deserialize; use openmls::prelude::*; use prost::Message as _; use shared_traits::IdentIdRef; +use std::collections::VecDeque; +use tracing::debug; use crate::account_directory::{AccountDirectory, resolve_device_ids}; use crate::conversation::ConversationIdRef; use crate::inbox_v2::MlsProvider; use crate::service_context::{ExternalServices, ServiceContext}; +use crate::utils::{blake2b_hex, hash_size}; use crate::{ DeliveryService, IdentityProvider, conversation::{ChatError, Convo, GroupConvo, Identified}, @@ -26,6 +29,8 @@ use crate::{ pub struct GroupV1Convo { mls_group: MlsGroup, convo_id: String, + // Cache outbound message Id's to filter out re-entrant messages + outbound_msgs: VecDeque, } impl std::fmt::Debug for GroupV1Convo { @@ -54,6 +59,7 @@ impl GroupV1Convo { Ok(Self { mls_group, convo_id, + outbound_msgs: VecDeque::new(), }) } @@ -76,6 +82,7 @@ impl GroupV1Convo { Ok(Self { mls_group, convo_id, + outbound_msgs: VecDeque::new(), }) } @@ -93,6 +100,7 @@ impl GroupV1Convo { Ok(GroupV1Convo { mls_group, convo_id, + outbound_msgs: VecDeque::new(), }) } @@ -100,8 +108,6 @@ impl GroupV1Convo { fn subscribe(ds: &mut impl DeliveryService, convo_id: &str) -> Result<(), ChatError> { ds.subscribe(&Self::delivery_address_from_id(convo_id)) .map_err(ChatError::generic)?; - ds.subscribe(&Self::ctrl_delivery_address_from_id(convo_id)) - .map_err(ChatError::generic)?; Ok(()) } @@ -129,18 +135,6 @@ impl GroupV1Convo { Self::delivery_address_from_id(&self.convo_id) } - fn ctrl_delivery_address_from_id(convo_id: &str) -> String { - let hash = Blake2b::::new() - .chain_update("ctrl_delivery_addr|") - .chain_update(convo_id) - .finalize(); - hex::encode(hash) - } - - fn ctrl_delivery_address(&self) -> String { - Self::ctrl_delivery_address_from_id(&self.convo_id) - } - /// Resolve an account to a KeyPackage for *every* device it authorizes. /// /// First resolves the account to its device ids through the account @@ -178,8 +172,8 @@ impl GroupV1Convo { fn send_message( &mut self, content: &[u8], - cx: &ServiceContext, - ) -> Result, ChatError> { + cx: &mut ServiceContext, + ) -> Result<(), ChatError> { let sender_id = cx.mls_identity.id().as_str(); let reliable = cx.causal.on_send(&self.convo_id, sender_id, content); let wire = reliable.encode_to_vec(); @@ -189,16 +183,35 @@ impl GroupV1Convo { .create_message(&cx.mls_provider, &cx.mls_identity, &wire) .unwrap(); - let a = AddressedEncryptedPayload { + let msg_bytes = mls_message_out.to_bytes().unwrap(); + self.send_payload(cx, msg_bytes) + } + + // Publish outboubound payloads to the DeliveryService + fn send_payload( + &mut self, + cx: &mut ServiceContext, + msg_bytes: Vec, + ) -> Result<(), ChatError> { + // Hash and Cache to detect inbound messages + let msg_hash = blake2b_hex::(&[&msg_bytes]); + self.outbound_msgs.push_back(msg_hash); + + // Wrap in Payload frames + let aep = AddressedEncryptedPayload { delivery_address: self.delivery_address(), data: EncryptedPayload { encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext { - payload: mls_message_out.to_bytes().unwrap().into(), + payload: msg_bytes.into(), })), }, }; + let env = aep.into_envelope(self.convo_id.clone()); - Ok(vec![a]) + // Send via DS + cx.ds + .publish(env) + .map_err(|e| ChatError::Generic(format!("Publish: {e}"))) } } @@ -214,13 +227,7 @@ impl Convo for GroupV1Convo { cx: &mut ServiceContext, content: &[u8], ) -> Result<(), ChatError> { - let payloads = self.send_message(content, cx)?; - for payload in payloads { - cx.ds - .publish(payload.into_envelope(self.id().into())) - .map_err(|e| ChatError::Delivery(e.to_string()))?; - } - Ok(()) + self.send_message(content, cx) } fn handle_frame( @@ -238,7 +245,14 @@ impl Convo for GroupV1Convo { } }; - let mls_message = + // Bail early if we sent this message + let msg_hash = blake2b_hex::(&[bytes.as_ref()]); + if self.outbound_msgs.contains(&msg_hash) { + debug!("Dropping message, sent from self"); + return Ok(ConvoOutcome::empty(self.convo_id.to_string())); + } + + let mls_message: MlsMessageIn = MlsMessageIn::tls_deserialize_exact_bytes(&bytes).map_err(ChatError::generic)?; let protocol_message: ProtocolMessage = mls_message @@ -332,22 +346,6 @@ impl GroupConvo for GroupV1Convo { .invite_user(&mut cx.ds, account_id, &welcome)?; } - let encrypted_payload = EncryptedPayload { - encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext { - payload: commit.to_bytes()?.into(), - })), - }; - - let addr_enc_payload = AddressedEncryptedPayload { - delivery_address: self.ctrl_delivery_address(), - data: encrypted_payload, - }; - // Prepare commit message - // TODO: (P1) Make GroupConvos agnostic to framing so its less error prone and more - let env = addr_enc_payload.into_envelope(self.convo_id.clone()); - - cx.ds - .publish(env) - .map_err(|e| ChatError::Generic(format!("Publish: {e}"))) + self.send_payload(cx, commit.to_bytes()?) } } diff --git a/crates/client/tests/saro_and_raya.rs b/crates/client/tests/saro_and_raya.rs index c8a83c5..b9dfa87 100644 --- a/crates/client/tests/saro_and_raya.rs +++ b/crates/client/tests/saro_and_raya.rs @@ -139,16 +139,19 @@ fn saro_raya_message_exchange() { let (mut raya, raya_events) = create_test_client(bus.clone(), reg_service.clone()).expect("client create"); - let raya_bundle = raya.create_intro_bundle().unwrap(); let saro_convo_id = saro - .create_conversation(&raya_bundle, b"hello raya") - .unwrap(); + .create_direct_conversation(raya.addr()) + .expect("convo create"); - // The invite payload yields ConversationStarted then MessageReceived. + // Wait for raya to process the Welcome and subscribe to the convo delivery + // address before saro sends — MessageBus only fans out to current subscribers, + // so a message sent before raya subscribes would be silently dropped. let raya_convo_id = expect_event(&raya_events, "ConversationStarted", |e| match e { Event::ConversationStarted { convo_id, .. } => Ok(convo_id), other => Err(other), }); + + saro.send_message(&saro_convo_id, b"hello raya").unwrap(); expect_event(&raya_events, "MessageReceived", |e| match e { Event::MessageReceived { convo_id, content } => { assert_eq!(convo_id, raya_convo_id);