This commit is contained in:
Jazz Turner-Baggs 2026-06-22 12:15:09 -07:00
parent e055b16761
commit e1f13d94be
No known key found for this signature in database
2 changed files with 49 additions and 48 deletions

View File

@ -9,12 +9,15 @@ use openmls::prelude::tls_codec::Deserialize;
use openmls::prelude::*;
use prost::Message as _;
use shared_traits::IdentIdRef;
use std::collections::VecDeque;
use tracing::debug;
use crate::account_directory::{AccountDirectory, resolve_device_ids};
use crate::conversation::ConversationIdRef;
use crate::inbox_v2::MlsProvider;
use crate::service_context::{ExternalServices, ServiceContext};
use crate::utils::{blake2b_hex, hash_size};
use crate::{
DeliveryService, IdentityProvider,
conversation::{ChatError, Convo, GroupConvo, Identified},
@ -26,6 +29,8 @@ use crate::{
pub struct GroupV1Convo {
mls_group: MlsGroup,
convo_id: String,
// Cache outbound message Id's to filter out re-entrant messages
outbound_msgs: VecDeque<String>,
}
impl std::fmt::Debug for GroupV1Convo {
@ -54,6 +59,7 @@ impl GroupV1Convo {
Ok(Self {
mls_group,
convo_id,
outbound_msgs: VecDeque::new(),
})
}
@ -76,6 +82,7 @@ impl GroupV1Convo {
Ok(Self {
mls_group,
convo_id,
outbound_msgs: VecDeque::new(),
})
}
@ -93,6 +100,7 @@ impl GroupV1Convo {
Ok(GroupV1Convo {
mls_group,
convo_id,
outbound_msgs: VecDeque::new(),
})
}
@ -100,8 +108,6 @@ impl GroupV1Convo {
fn subscribe(ds: &mut impl DeliveryService, convo_id: &str) -> Result<(), ChatError> {
ds.subscribe(&Self::delivery_address_from_id(convo_id))
.map_err(ChatError::generic)?;
ds.subscribe(&Self::ctrl_delivery_address_from_id(convo_id))
.map_err(ChatError::generic)?;
Ok(())
}
@ -129,18 +135,6 @@ impl GroupV1Convo {
Self::delivery_address_from_id(&self.convo_id)
}
fn ctrl_delivery_address_from_id(convo_id: &str) -> String {
let hash = Blake2b::<U6>::new()
.chain_update("ctrl_delivery_addr|")
.chain_update(convo_id)
.finalize();
hex::encode(hash)
}
fn ctrl_delivery_address(&self) -> String {
Self::ctrl_delivery_address_from_id(&self.convo_id)
}
/// Resolve an account to a KeyPackage for *every* device it authorizes.
///
/// First resolves the account to its device ids through the account
@ -178,8 +172,8 @@ impl GroupV1Convo {
fn send_message<S: ExternalServices>(
&mut self,
content: &[u8],
cx: &ServiceContext<S>,
) -> Result<Vec<AddressedEncryptedPayload>, ChatError> {
cx: &mut ServiceContext<S>,
) -> Result<(), ChatError> {
let sender_id = cx.mls_identity.id().as_str();
let reliable = cx.causal.on_send(&self.convo_id, sender_id, content);
let wire = reliable.encode_to_vec();
@ -189,16 +183,35 @@ impl GroupV1Convo {
.create_message(&cx.mls_provider, &cx.mls_identity, &wire)
.unwrap();
let a = AddressedEncryptedPayload {
let msg_bytes = mls_message_out.to_bytes().unwrap();
self.send_payload(cx, msg_bytes)
}
// Publish outboubound payloads to the DeliveryService
fn send_payload<S: ExternalServices>(
&mut self,
cx: &mut ServiceContext<S>,
msg_bytes: Vec<u8>,
) -> Result<(), ChatError> {
// Hash and Cache to detect inbound messages
let msg_hash = blake2b_hex::<hash_size::MessageId>(&[&msg_bytes]);
self.outbound_msgs.push_back(msg_hash);
// Wrap in Payload frames
let aep = AddressedEncryptedPayload {
delivery_address: self.delivery_address(),
data: EncryptedPayload {
encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext {
payload: mls_message_out.to_bytes().unwrap().into(),
payload: msg_bytes.into(),
})),
},
};
let env = aep.into_envelope(self.convo_id.clone());
Ok(vec![a])
// Send via DS
cx.ds
.publish(env)
.map_err(|e| ChatError::Generic(format!("Publish: {e}")))
}
}
@ -214,13 +227,7 @@ impl<S: ExternalServices> Convo<S> for GroupV1Convo {
cx: &mut ServiceContext<S>,
content: &[u8],
) -> Result<(), ChatError> {
let payloads = self.send_message(content, cx)?;
for payload in payloads {
cx.ds
.publish(payload.into_envelope(self.id().into()))
.map_err(|e| ChatError::Delivery(e.to_string()))?;
}
Ok(())
self.send_message(content, cx)
}
fn handle_frame(
@ -238,7 +245,14 @@ impl<S: ExternalServices> Convo<S> for GroupV1Convo {
}
};
let mls_message =
// Bail early if we sent this message
let msg_hash = blake2b_hex::<hash_size::MessageId>(&[bytes.as_ref()]);
if self.outbound_msgs.contains(&msg_hash) {
debug!("Dropping message, sent from self");
return Ok(ConvoOutcome::empty(self.convo_id.to_string()));
}
let mls_message: MlsMessageIn =
MlsMessageIn::tls_deserialize_exact_bytes(&bytes).map_err(ChatError::generic)?;
let protocol_message: ProtocolMessage = mls_message
@ -332,22 +346,6 @@ impl<S: ExternalServices> GroupConvo<S> for GroupV1Convo {
.invite_user(&mut cx.ds, account_id, &welcome)?;
}
let encrypted_payload = EncryptedPayload {
encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext {
payload: commit.to_bytes()?.into(),
})),
};
let addr_enc_payload = AddressedEncryptedPayload {
delivery_address: self.ctrl_delivery_address(),
data: encrypted_payload,
};
// Prepare commit message
// TODO: (P1) Make GroupConvos agnostic to framing so its less error prone and more
let env = addr_enc_payload.into_envelope(self.convo_id.clone());
cx.ds
.publish(env)
.map_err(|e| ChatError::Generic(format!("Publish: {e}")))
self.send_payload(cx, commit.to_bytes()?)
}
}

View File

@ -139,16 +139,19 @@ fn saro_raya_message_exchange() {
let (mut raya, raya_events) =
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
let raya_bundle = raya.create_intro_bundle().unwrap();
let saro_convo_id = saro
.create_conversation(&raya_bundle, b"hello raya")
.unwrap();
.create_direct_conversation(raya.addr())
.expect("convo create");
// The invite payload yields ConversationStarted then MessageReceived.
// Wait for raya to process the Welcome and subscribe to the convo delivery
// address before saro sends — MessageBus only fans out to current subscribers,
// so a message sent before raya subscribes would be silently dropped.
let raya_convo_id = expect_event(&raya_events, "ConversationStarted", |e| match e {
Event::ConversationStarted { convo_id, .. } => Ok(convo_id),
other => Err(other),
});
saro.send_message(&saro_convo_id, b"hello raya").unwrap();
expect_event(&raya_events, "MessageReceived", |e| match e {
Event::MessageReceived { convo_id, content } => {
assert_eq!(convo_id, raya_convo_id);