From 0bfdf70a7cf2f86974d817118b971f1f59f01bb1 Mon Sep 17 00:00:00 2001 From: seemenkina Date: Mon, 1 Jun 2026 18:23:11 +0700 Subject: [PATCH] Refactor de-mls integration: enhance InboxV2 for GroupV2 handling, and improve BufferDs for welcome event processing. --- Cargo.lock | 2 +- core/core_client/src/conversation/group_v2.rs | 198 ++++++++++++------ core/core_client/src/core_client.rs | 1 - core/core_client/src/inbox_v2.rs | 70 ++++++- .../integration_tests_core/tests/dev_tests.rs | 52 +++-- 5 files changed, 234 insertions(+), 89 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0556985..5c70c00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1881,7 +1881,7 @@ dependencies = [ [[package]] name = "de-mls" version = "3.0.0" -source = "git+https://github.com/vacp2p/de-mls?branch=main#deafb714d2bfc0c98af3f661161b92f5d621c306" +source = "git+https://github.com/vacp2p/de-mls?branch=main#b59183e1d92fdd08b99bb5e3ba1389ca9b60d68a" dependencies = [ "hashgraph-like-consensus", "indexmap 2.14.0", diff --git a/core/core_client/src/conversation/group_v2.rs b/core/core_client/src/conversation/group_v2.rs index 13b8720..3fe3e7a 100644 --- a/core/core_client/src/conversation/group_v2.rs +++ b/core/core_client/src/conversation/group_v2.rs @@ -33,6 +33,7 @@ use libchat::WakeupService; use prost::Message; use rand::{self, Rng}; use std::sync::{Arc, Mutex}; +use std::time::Duration; use tracing::info; use crate::AccountId; @@ -42,6 +43,7 @@ use crate::{ AddressedEncryptedPayload, ContentData, DeliveryService, RegistrationService, conversation::{BaseConvo, BaseGroupConvo, ChatError, Id}, }; +use libchat::IdentityProvider; /// This is a Test Wrapper of Demls MemberId Trait /// Libchat has its own trait that will need to be intergrated at somepoint. @@ -73,29 +75,11 @@ impl MemberId for LocalDemlsMember { // All methods in Convo must call drain, to ensure that messages go out. pub struct BufferDs { queue: Vec, - welcomes: Vec, } impl BufferDs { pub fn new() -> Self { - Self { - queue: vec![], - welcomes: vec![], - } - } - - /// Lift welcomes out of session events into the welcome queue. - /// Other event variants are ignored — they're not "things to send." - fn retrive_welcome_event(&mut self, events: &[SessionEvent]) { - for evt in events { - info!(event = format!("{:?}", evt), "Event Loop"); - - if let SessionEvent::WelcomeReady(w) = evt { - self.welcomes.push(w.clone()); - } - - dbg!(&self.welcomes); - } + Self { queue: vec![] } } // Warn: Messages are not sent untill drain is called, which is after the return from User. @@ -122,6 +106,7 @@ impl BufferDs { // All Payloads leaving GroupV2 are a GroupV2Frame let frame = GroupV2Frame { payload: Some(GroupV2Payload::DeMlsWrapper(pkt.payload.into())), + sender_app_id: pkt.app_id.clone(), // pkt.app_id is the sender's User app_id }; // Wrap in EncryptedPayload @@ -135,26 +120,11 @@ impl BufferDs { }, }; - // TODO(libchat: "Verify payloads routing"): - // GroupV2Frame currently drops sender app_id. de-mls's - // process_inbound_packet uses app_id to filter self-messages — without - // round-tripping the sender's id through the frame, every inbound packet - // looks like a self-echo and gets dropped. - let env = payload.into_envelope(pkt.conversation_id.clone()); service_ctx.ds.publish(env).map_err(ChatError::generic)?; } - // TODO: build proper convertion ao welcome bundle - // for w in self.welcomes.drain(..) { - // let envelope = build_inbox_welcome_envelope(w); - // service_ctx - // .ds - // .publish(envelope) - // .map_err(ChatError::generic)?; - // } - Ok(()) } } @@ -180,6 +150,11 @@ pub struct GroupV2Convo { // Use a wrapper for now, and then look at refactoring. buffer_ds: Arc>, app_id: String, + /// Inviter side: accounts we called `add_member` for, awaiting their + /// `WelcomeReady`. Each `WelcomeReady` is routed to one popped entry — see + /// the correlation caveat in `after_op` (only safe for one outstanding + /// invite today). + pending_invites: Vec, } impl std::fmt::Debug for GroupV2Convo { @@ -199,36 +174,60 @@ fn rand_string(n: usize) -> String { } impl GroupV2Convo { - pub fn new( - service_ctx: &mut ServiceContext, - ) -> Result { - // Create new instances of all the dependencies that User needs. - // Once working, these can be moved to be shared across different convo instances. - let convo_id = rand_string(5); - let signer = PrivateKeySigner::random(); - // let identity = WalletIdentity::from_wallet(signer.address()); - let identity = LocalDemlsMember::new(signer.address().to_string()); - + /// Build a de-mls `User` (plugins + BufferDs transport) without starting + /// any conversation. Shared by `new` (creator) and `new_pending` (joiner). + fn build_demls( + identity_name: String, + ) -> Result< + ( + User, + Arc>, + String, + ), + ChatError, + > { + let identity = LocalDemlsMember::new(identity_name); let credentials = Arc::new(MlsCredentials::from_member_id(&identity).map_err(ChatError::generic)?); let storage = Arc::new(MemoryDeMlsStorage::new()); let conversation_plugins = DefaultConversationPluginsFactory::new(storage, credentials); - let consensus_signer = EthereumConsensusSigner::new(signer); + let consensus_signer = EthereumConsensusSigner::new(PrivateKeySigner::random()); let consensus = ConsensusContext::::new(consensus_signer); + // TODO(config): TEST-ONLY millisecond timers. de-mls deadlines are real + // wall-clock, so the default 60s timers never fire under fast virtual + // time. Production needs a real config injected from the caller, not + // these hardcoded values. + let conversation_config = ConversationConfig { + commit_inactivity_duration: Duration::from_millis(50), + freeze_duration: Duration::from_millis(20), + voting_delay: Duration::from_millis(30), + election_voting_delay: Duration::from_millis(30), + consensus_timeout: Duration::from_millis(150), + proposal_expiration: Duration::from_millis(2000), + ..ConversationConfig::default() + }; + let plugins = UserPlugins { conversation_plugins, consensus, - default_conversation_config: ConversationConfig::default(), + default_conversation_config: conversation_config, default_scoring_config: ScoringConfig::default(), default_steward_list_config: StewardListConfig::default(), }; - let ds = BufferDs::new(); - let transport = Arc::new(Mutex::new(ds)); + let transport = Arc::new(Mutex::new(BufferDs::new())); + let user = User::new_with_plugins(Box::new(identity), plugins, transport.clone()); + Ok((user, transport, rand_string(5))) + } - let mut user = User::new_with_plugins(Box::new(identity), plugins, transport.clone()); + pub fn new( + service_ctx: &mut ServiceContext, + ) -> Result { + let convo_id = rand_string(5); + let identity_name = service_ctx.identity_provider.friendly_name(); + let (mut user, transport, app_id) = Self::build_demls(identity_name)?; run_async!( user.start_conversation(convo_id.as_str(), true) @@ -243,10 +242,66 @@ impl GroupV2Convo { convo_id, user, buffer_ds: transport, - app_id: rand_string(5), + app_id, + pending_invites: vec![], }) } + /// Joiner side: build a de-mls `User` and register its key package under + /// the account name, but do NOT start a conversation. `convo_id` stays + /// empty until [`Self::accept_welcome`] fills it. + pub fn new_pending( + service_ctx: &mut ServiceContext, + ) -> Result { + let name = service_ctx.identity_provider.friendly_name(); + let (user, transport, app_id) = Self::build_demls(name.clone())?; + + let kp = user.generate_key_package().map_err(ChatError::generic)?; + service_ctx + .rs + .register(&name, kp.as_bytes().to_vec()) + .map_err(ChatError::generic)?; + + Ok(Self { + convo_id: String::new(), + user, + buffer_ds: transport, + app_id, + pending_invites: vec![], + }) + } + + /// Joiner side: ingest a de-mls welcome handed over the InboxV2 1-1 + /// channel. Attaches MLS (filling `convo_id`), replays the bundled + /// `ConversationSync`, then subscribes to the conversation address. + pub fn accept_welcome( + &mut self, + service_ctx: &mut ServiceContext, + welcome: &MemberWelcome, + ) -> Result<(), ChatError> { + let (convo_id, tick) = run_async!(self.user.accept_welcome(&welcome.welcome_bytes).await) + .map_err(ChatError::generic)?; + self.convo_id = convo_id; + + if !welcome.conversation_sync_bytes.is_empty() { + let pkt = InboundPacket::new( + welcome.conversation_sync_bytes.clone(), + APP_MSG_SUBTOPIC, + &self.convo_id, + self.user.app_id().to_vec(), + 0, + ); + run_async!(self.user.process_inbound_packet(pkt).await).map_err(ChatError::generic)?; + } + + let events = self + .user + .drain_events(&self.convo_id) + .map_err(ChatError::generic)?; + self.init(service_ctx)?; + self.after_op(service_ctx, tick, &events) + } + fn delivery_address_from_id(convo_id: &str) -> String { let hash = Blake2b::::new() .chain_update("delivery_addr|") @@ -330,13 +385,18 @@ where return Err(ChatError::generic("Expected plaintext")); } }; + let frame = GroupV2Frame::decode(bytes.as_ref()).map_err(ChatError::generic)?; + let inner = match frame.payload { + Some(GroupV2Payload::DeMlsWrapper(b)) => b.to_vec(), + _ => return Ok(None), + }; // Fake a InboundPacket let packet = InboundPacket { - payload: bytes.to_vec(), + payload: inner, subtopic: APP_MSG_SUBTOPIC.to_string(), // Assume APP TOPIC, Welcome Messages go to InboxV2 conversation_id: self.convo_id.to_string(), - app_id: self.app_id().as_bytes().to_vec(), + app_id: frame.sender_app_id, timestamp: 0, }; @@ -381,9 +441,9 @@ where .ok_or_else(|| ChatError::generic("No key package"))?; last_tick = run_async!(self.user.add_member(&self.convo_id, &kp_bytes).await) .map_err(ChatError::generic)?; - // TODO(libchat: "Parse welcomes and create GroupV2"): - // remember `member` so we can route the eventual WelcomeReady - // event to its delivery_address. Needs API decision with libchat. + // Remember who we invited so after_op can route their + // WelcomeReady to their InboxV2 channel (FIFO). + self.pending_invites.push((*member).clone()); } let events = self .user @@ -395,16 +455,30 @@ where impl GroupV2Convo { fn after_op( - &self, + &mut self, service_ctx: &mut ServiceContext, tick: SessionTick, events: &[SessionEvent], ) -> Result<(), ChatError> { - let mut buf = self.buffer_ds.lock().unwrap(); - buf.retrive_welcome_event(events); - buf.drain(service_ctx)?; - drop(buf); + // Route any welcome our commit produced to the matching invitee over + // their InboxV2 1-1 channel. + // + // TODO(chat): welcome→invitee routing is positional, so only safe for + // one outstanding invite. `MemberWelcome` doesn't identify its joiner; + // batch invites need a real welcome→account match. + for evt in events { + if let SessionEvent::WelcomeReady(welcome) = evt + && let Some(account) = self.pending_invites.pop() + { + crate::inbox_v2::invite_user_v2(&mut service_ctx.ds, &account, welcome)?; + } + } + + self.buffer_ds.lock().unwrap().drain(service_ctx)?; if let Some(d) = tick.next_wakeup_in { + // TODO(chat): WakeupService is second-granularity but de-mls + // deadlines are sub-second; `as_secs().max(1)` floors them up to 1s, + // silently over-waiting. Needs a millisecond-capable wakeup. service_ctx .wakeup_service .wakeup_in(d.as_secs().max(1) as u32, &self.convo_id); @@ -440,8 +514,10 @@ use prost::{Oneof, bytes::Bytes}; #[derive(Clone, PartialEq, Message)] pub struct GroupV2Frame { - #[prost(oneof = "GroupV2Payload", tags = "1")] + #[prost(oneof = "GroupV2Payload", tags = "2, 3")] pub payload: Option, + #[prost(bytes = "vec", tag = "4")] + pub sender_app_id: Vec, } #[derive(Clone, PartialEq, Oneof)] diff --git a/core/core_client/src/core_client.rs b/core/core_client/src/core_client.rs index 5c3c8ec..d8c1891 100644 --- a/core/core_client/src/core_client.rs +++ b/core/core_client/src/core_client.rs @@ -284,7 +284,6 @@ where // Dispatch encrypted payload to Inbox, and register the created Conversation fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result, ChatError> { if let Some(convo) = self.pq_inbox.handle_frame(&mut self.service_ctx, payload)? { - let convo: Box> = Box::new(convo); self.register_convo(ConvoTypeOwned::Group(convo))?; } Ok(None) diff --git a/core/core_client/src/inbox_v2.rs b/core/core_client/src/inbox_v2.rs index 8515d1f..b8a081a 100644 --- a/core/core_client/src/inbox_v2.rs +++ b/core/core_client/src/inbox_v2.rs @@ -3,6 +3,7 @@ use std::ops::Deref; use std::rc::Rc; use chat_proto::logoschat::envelope::EnvelopeV1; +use de_mls::protos::de_mls::messages::v1::MemberWelcome; use openmls::prelude::tls_codec::Serialize; use openmls::prelude::*; use openmls_libcrux_crypto::CryptoProvider as LibcruxCryptoProvider; @@ -20,6 +21,7 @@ use crate::DeliveryService; use crate::IdentityProvider; use crate::RegistrationService; use crate::conversation::BaseConvo; +use crate::conversation::BaseGroupConvo; use crate::conversation::ExternalServices; use crate::conversation::GroupV2Convo; use crate::conversation::ServiceContext; @@ -94,6 +96,28 @@ pub trait MlsProvider: OpenMlsProvider { ) -> Result<(), ChatError>; } +/// Deliver a de-mls welcome to `account_id` over its InboxV2 1-1 channel. +/// Function mirroring the GroupV1 `invite_user` path, but carrying a de-mls `MemberWelcome`. +pub fn invite_user_v2( + ds: &mut DS, + account_id: &AccountId, + welcome: &MemberWelcome, +) -> Result<(), ChatError> { + let frame = InboxV2Frame { + payload: Some(InviteType::GroupV2(welcome.encode_to_vec())), + }; + let envelope = EnvelopeV1 { + conversation_hint: conversation_id_for(account_id), + salt: 0, + payload: frame.encode_to_vec().into(), + }; + ds.publish(AddressedEnvelope { + delivery_address: delivery_address_for(account_id), + data: envelope.encode_to_vec(), + }) + .map_err(ChatError::generic) +} + /// This is a PQ based provider that uses in memory storage. pub struct MlsEphemeralPqProvider { crypto: LibcruxCryptoProvider, @@ -165,6 +189,7 @@ pub struct InboxV2 { account_id: AccountId, _store: Rc>, mls_provider: Rc>, + pending_demls: RefCell>, } impl InboxV2 { @@ -179,6 +204,7 @@ impl InboxV2 { account_id, _store, mls_provider: Rc::new(RefCell::new(provider)), + pending_demls: RefCell::new(None), } } @@ -212,7 +238,15 @@ impl InboxV2 { &service_ctx.identity_provider.friendly_name(), keypackage_bytes, ) - .map_err(ChatError::generic) + .map_err(ChatError::generic)?; + + // de-mls (GroupV2) joiner: build a conversation-less User and register + // its de-mls key package under the same account name. This shadows the + // OpenMLS key package above in the registry; GroupV2 is the path the + // de-mls integration exercises. + *self.pending_demls.borrow_mut() = Some(GroupV2Convo::new_pending(service_ctx)?); + + Ok(()) } #[allow(unused)] @@ -261,17 +295,33 @@ impl InboxV2 { &self, service_ctx: &mut ServiceContext, payload_bytes: &[u8], - ) -> Result>, ChatError> { - let inbox_frame = InboxV2Frame::decode(payload_bytes)?; - + ) -> Result>>, ChatError> { + // On a broadcast transport the inbox address also receives traffic + // that isn't an invite (or that prost decodes into an empty frame). + // Treat anything we can't interpret as "not for us" and skip it, + // rather than failing the whole poll cycle. + let Ok(inbox_frame) = InboxV2Frame::decode(payload_bytes) else { + return Ok(None); + }; let Some(payload) = inbox_frame.payload else { - return Err(ChatError::Generic("InboxV2Payload missing".into())); + return Ok(None); }; match payload { - InviteType::GroupV1(group_v1_heavy_invite) => self - .handle_heavy_invite(service_ctx, group_v1_heavy_invite) - .map(Some), + InviteType::GroupV1(inv) => { + Ok(Some(Box::new(self.handle_heavy_invite(service_ctx, inv)?))) + } + InviteType::GroupV2(welcome_bytes) => { + let mut convo = self + .pending_demls + .borrow_mut() + .take() + .ok_or_else(|| ChatError::generic("no pending de-mls convo"))?; + let mw = + MemberWelcome::decode(welcome_bytes.as_slice()).map_err(ChatError::generic)?; + convo.accept_welcome(service_ctx, &mw)?; + Ok(Some(Box::new(convo))) + } } } @@ -307,7 +357,7 @@ impl InboxV2 { #[derive(Clone, PartialEq, Message)] pub struct InboxV2Frame { - #[prost(oneof = "InviteType", tags = "1")] + #[prost(oneof = "InviteType", tags = "1, 2")] pub payload: Option, } @@ -315,6 +365,8 @@ pub struct InboxV2Frame { pub enum InviteType { #[prost(message, tag = "1")] GroupV1(GroupV1HeavyInvite), + #[prost(bytes, tag = "2")] + GroupV2(Vec), } #[derive(Clone, PartialEq, Message)] diff --git a/core/integration_tests_core/tests/dev_tests.rs b/core/integration_tests_core/tests/dev_tests.rs index 0b55c5b..4e3dd1e 100644 --- a/core/integration_tests_core/tests/dev_tests.rs +++ b/core/integration_tests_core/tests/dev_tests.rs @@ -125,6 +125,10 @@ fn process(clients: &mut Vec, wakeups: &mut Vec, for client in clients.as_mut_slice() { client.process_messages(); } + + // de-mls deadlines are real wall-clock; sleep so the millisecond-scale + // commit/consensus timers actually elapse between poll cycles. + std::thread::sleep(std::time::Duration::from_millis(60)); } } @@ -203,8 +207,10 @@ fn pretty_print(prefix: impl Into) -> Box { let prefix = prefix.into(); Box::new(move |c: ContentData| { let cid = hex_trunc(c.conversation_id.as_bytes()); - let content = String::from_utf8(c.data).unwrap(); - println!("{} ({:?}) {}", prefix, cid, content) + let content = String::from_utf8_lossy(&c.data); + // Log via tracing (not println!) so received messages appear inline in + // the same INFO stream as the de-mls events, without needing --nocapture. + info!(target: "chat", convo = ?cid, "{prefix} received: {content}"); }) } @@ -369,22 +375,34 @@ fn core_client() { .create_group_convo(&[&clients[RAYA].account_id()]) .unwrap(); - // Manaully process the DS - process_all(&mut clients, &mut wakeups); - process_all(&mut clients, &mut wakeups); - // s_convo.send_content(b"HI").unwrap(); + // Bounded driver: de-mls reschedules its steward poll every tick, so a + // drain-until-empty loop (`process_all`) never terminates. Step a fixed + // number of seconds instead, like the de-mls integration tests do. + // + // This carries the commit through, fires `WelcomeReady`, routes the + // welcome to Raya's InboxV2 1-1 channel, and lets her `accept_welcome`. + // Run extra cycles afterward so Raya polls her inbox and joins after the + // welcome is published. + process(&mut clients, &mut wakeups, 80); - // Manaully process the DS - process_all(&mut clients, &mut wakeups); + // Raya joined via the invite path. + let raya_convos = clients[RAYA].list_conversations().unwrap(); + assert!( + !raya_convos.is_empty(), + "Raya should have joined the conversation via the welcome invite" + ); - // // TODO: Needs Invite path working first - // let convo_id = clients[RAYA].list_conversations().unwrap().pop().unwrap(); - // let r_convo = clients[RAYA].convo(&convo_id).expect("Convo exists"); - // process(&mut clients, &mut wakeups, 10); - // r_convo.send_content(b"PEW").unwrap(); - // process(&mut clients, &mut wakeups, 10); + // Saro sends a message; Raya receives it (look for "Raya received: HI" + // in the log). + info!(target: "chat", "Saro -> sending: HI"); + s_convo.send_content(b"HI").unwrap(); + process(&mut clients, &mut wakeups, 20); - // s_convo.send_content(b"SARO again").unwrap(); - // process(&mut clients, &mut wakeups, 10); - println!("Hello"); + // Raya replies; Saro receives it (look for "Saro received: hi back"). + let raya_convo = clients[RAYA] + .convo(&raya_convos[0]) + .expect("Raya must have a usable conversation handle"); + info!(target: "chat", "Raya -> sending: hi back"); + raya_convo.send_content(b"hi back").unwrap(); + process(&mut clients, &mut wakeups, 20); }