From af84ce31b1290a06925fa791d4d51b00abdcf4dd Mon Sep 17 00:00:00 2001 From: seemenkina Date: Mon, 25 May 2026 17:00:06 +0700 Subject: [PATCH] Update de-mls dependency source and version; refactor GroupV2 to use MemberId and SessionEvent, and enhance BufferDs for welcome event handling. --- Cargo.lock | 15 +- core/core_client/Cargo.toml | 2 +- core/core_client/src/conversation/group_v2.rs | 169 ++++++++++++++---- 3 files changed, 139 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 58b95f4..0556985 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1881,9 +1881,10 @@ dependencies = [ [[package]] name = "de-mls" version = "3.0.0" -source = "git+https://github.com/vacp2p/de-mls#8be7a9ef1748fe0216bf7272ec4d034f26047122" +source = "git+https://github.com/vacp2p/de-mls?branch=main#deafb714d2bfc0c98af3f661161b92f5d621c306" dependencies = [ "hashgraph-like-consensus", + "indexmap 2.14.0", "openmls", "openmls_basic_credential", "openmls_rust_crypto 0.5.1", @@ -2187,7 +2188,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3850,7 +3851,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5168,7 +5169,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5225,7 +5226,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5887,7 +5888,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -6613,7 +6614,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/core/core_client/Cargo.toml b/core/core_client/Cargo.toml index e730269..6199b58 100644 --- a/core/core_client/Cargo.toml +++ b/core/core_client/Cargo.toml @@ -17,7 +17,7 @@ storage = { workspace = true } # External dependencies (sorted) alloy = "1.8.3" chat-proto = { git = "https://github.com/logos-messaging/chat_proto", branch ="version_rollback" } -de-mls = { git = "https://github.com/vacp2p/de-mls" } +de-mls = { git = "https://github.com/vacp2p/de-mls", branch = "main"} hashgraph-like-consensus = "0.4.0" hex = "0.4.3" openmls = "0.8.1" diff --git a/core/core_client/src/conversation/group_v2.rs b/core/core_client/src/conversation/group_v2.rs index aad2f66..8afddb5 100644 --- a/core/core_client/src/conversation/group_v2.rs +++ b/core/core_client/src/conversation/group_v2.rs @@ -17,15 +17,19 @@ macro_rules! run_async { use alloy::signers::local::PrivateKeySigner; use blake2::{Blake2b, Digest, digest::consts::U6}; use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload}; -use de_mls::app::{ConsensusContext, ConversationConfig, User, UserPlugins}; -use de_mls::core::{ScoringConfig, StewardListConfig}; +use de_mls::app::{ConsensusContext, ConversationConfig, SessionTick, User, UserPlugins}; +use de_mls::core::{ScoringConfig, SessionEvent, StewardListConfig}; use de_mls::defaults::{ DefaultConsensusPlugin, DefaultConversationPluginsFactory, MemoryDeMlsStorage, }; use de_mls::ds::{APP_MSG_SUBTOPIC, DeliveryServiceError, InboundPacket, OutboundPacket}; -use de_mls::identity::Identity; +use de_mls::member_id::MemberId; use de_mls::mls_crypto::MlsCredentials; +use de_mls::protos::de_mls::messages::v1::{ + AppMessage as AppMessageProto, MemberWelcome, app_message, +}; use hashgraph_like_consensus::signing::EthereumConsensusSigner; +use libchat::WakeupService; use prost::Message; use rand::{self, Rng}; use std::sync::{Arc, Mutex}; @@ -40,24 +44,24 @@ use crate::{ const APP_NAME: &str = "sdkchat"; -/// This is a Test Wrapper of Demls Identitity Trait -/// Linchat has its own trait that will need to be intergrated at somepoint. -pub struct LocalDemlsIdent { +/// This is a Test Wrapper of Demls MemberId Trait +/// Libchat has its own trait that will need to be intergrated at somepoint. +pub struct LocalDemlsMember { name: String, } -impl LocalDemlsIdent { +impl LocalDemlsMember { pub fn new(name: impl Into) -> Self { Self { name: name.into() } } } -impl Identity for LocalDemlsIdent { - fn identity_bytes(&self) -> &[u8] { +impl MemberId for LocalDemlsMember { + fn member_id_bytes(&self) -> &[u8] { self.name.as_bytes() } - fn identity_display(&self) -> &str { + fn member_id_display(&self) -> &str { &self.name } } @@ -70,11 +74,25 @@ impl Identity for LocalDemlsIdent { // All methods in Convo must call drain, to ensure that messages go out. pub struct BufferDs { queue: Vec, + welcomes: Vec, } impl BufferDs { pub fn new() -> Self { - Self { queue: vec![] } + Self { + queue: vec![], + welcomes: vec![], + } + } + + /// Lift welcomes out of session events into the welcome queue. + /// Other event variants are ignored — they're not "things to send." + fn retrive_welcome_event(&mut self, events: &[SessionEvent]) { + for evt in events { + if let SessionEvent::WelcomeReady(w) = evt { + self.welcomes.push(w.clone()); + } + } } // Warn: Messages are not sent untill drain is called, which is after the return from User. @@ -85,7 +103,11 @@ impl BufferDs { ) -> Result<(), ChatError> { // Swap the Vec out; Own then existing and replace with a new empty vec. for pkt in self.queue.drain(..) { - let delivery_address = pkt.delivery_address().to_string(); + let hash = Blake2b::::new() + .chain_update("delivery_addr|") + .chain_update(&pkt.conversation_id) + .finalize(); + let delivery_address = hex::encode(hash); // All Payloads leaving GroupV2 are a GroupV2Frame let frame = GroupV2Frame { payload: Some(GroupV2Payload::DeMlsWrapper(pkt.payload.into())), @@ -102,11 +124,26 @@ impl BufferDs { }, }; + // TODO(libchat: "Verify payloads routing"): + // GroupV2Frame currently drops sender app_id. de-mls's + // process_inbound_packet uses app_id to filter self-messages — without + // round-tripping the sender's id through the frame, every inbound packet + // looks like a self-echo and gets dropped. + let env = payload.into_envelope(pkt.conversation_id.clone()); service_ctx.ds.publish(env).map_err(ChatError::generic)?; } + // TODO: build proper convertion ao welcome bundle + // for w in self.welcomes.drain(..) { + // let envelope = build_inbox_welcome_envelope(w); + // service_ctx + // .ds + // .publish(envelope) + // .map_err(ChatError::generic)?; + // } + Ok(()) } } @@ -157,10 +194,10 @@ impl GroupV2Convo { let convo_id = rand_string(5); let signer = PrivateKeySigner::random(); // let identity = WalletIdentity::from_wallet(signer.address()); - let identity = LocalDemlsIdent::new(signer.address().to_string()); + let identity = LocalDemlsMember::new(signer.address().to_string()); let credentials = - Arc::new(MlsCredentials::from_identity(&identity).map_err(ChatError::generic)?); + Arc::new(MlsCredentials::from_member_id(&identity).map_err(ChatError::generic)?); let storage = Arc::new(MemoryDeMlsStorage::new()); let conversation_plugins = DefaultConversationPluginsFactory::new(storage, credentials); @@ -253,13 +290,18 @@ where fn send_content( &mut self, service_ctx: &mut super::ServiceContext, - _content: &[u8], + content: &[u8], ) -> Result<(), ChatError> { let _signer = MlsIdentityProvider(&service_ctx.identity_provider); - // TODO: Send content - // Ensure that the BufferDs gets drained - self.buffer_ds.lock().unwrap().drain(service_ctx)?; + let tick = run_async!( + self.user + .send_app_message(&self.convo_id, content.to_vec()) + .await + .unwrap() + ); + // Ensure that the BufferDs gets drained - done inside after_op + self.after_op(service_ctx, tick, &vec![])?; Ok(()) } @@ -284,20 +326,23 @@ where timestamp: 0, }; - run_async!(self.user.process_inbound_packet(packet).await.unwrap()); - - // TODO: Return Content types; This is moving towards an event system soon, so ignore if getting - // the concrete Content is difficult - - // Ensure that the BufferDs gets drained - self.buffer_ds.lock().unwrap().drain(service_ctx)?; - - Ok(None) + let tick = run_async!(self.user.process_inbound_packet(packet).await.unwrap()); + let events = self + .user + .drain_events(&self.convo_id) + .map_err(ChatError::generic)?; + let out = self.events_to_content(events.clone()); + self.after_op(service_ctx, tick, &events)?; + Ok(out) } - fn wakeup(&mut self, _service_ctx: &mut ServiceContext) -> Result<(), ChatError> { - // todo!() - Ok(()) + fn wakeup(&mut self, ctx: &mut ServiceContext) -> Result<(), ChatError> { + let tick = run_async!(self.user.poll_session(&self.convo_id).await.unwrap()); + let events = self + .user + .drain_events(&self.convo_id) + .map_err(ChatError::generic)?; + self.after_op(ctx, tick, &events) } } @@ -310,23 +355,69 @@ where service_ctx: &mut ServiceContext, members: &[&AccountId], ) -> Result<(), ChatError> { + let mut last_tick = SessionTick { + next_wakeup_in: None, + }; for member in members { - let key_package_opt = service_ctx + let kp_bytes = service_ctx .rs .retrieve(member) + .map_err(ChatError::generic)? + .ok_or_else(|| ChatError::generic("No key package"))?; + last_tick = run_async!(self.user.add_member(&self.convo_id, &kp_bytes).await) .map_err(ChatError::generic)?; + // TODO(libchat: "Parse welcomes and create GroupV2"): + // remember `member` so we can route the eventual WelcomeReady + // event to its delivery_address. Needs API decision with libchat. + } + let events = self + .user + .drain_events(&self.convo_id) + .map_err(ChatError::generic)?; + self.after_op(service_ctx, last_tick, &events) + } +} - let Some(_key_package_bytes) = key_package_opt else { - return Err(ChatError::generic("No Keypackage")); - }; +impl GroupV2Convo { + fn after_op( + &self, + service_ctx: &mut ServiceContext, + tick: SessionTick, + events: &[SessionEvent], + ) -> Result<(), ChatError> { + let mut buf = self.buffer_ds.lock().unwrap(); + buf.retrive_welcome_event(events); + buf.drain(service_ctx)?; + drop(buf); + if let Some(d) = tick.next_wakeup_in { + service_ctx + .wakeup_service + .wakeup_in(d.as_secs().max(1) as u32, &self.convo_id); + } + Ok(()) + } - // todo!("Implement function which adds member via Keypackage"); + fn events_to_content(&mut self, events: Vec) -> Option { + let mut latest: Option = None; + + for evt in events { + match evt { + SessionEvent::AppMessage(AppMessageProto { payload: Some(p) }) => match p { + app_message::Payload::ConversationMessage(cm) => { + latest = Some(ContentData { + conversation_id: self.convo_id.clone().into(), + data: cm.message, + is_new_convo: false, + }); + } + // All other types is an inside group traffic — not chat content. + _ => {} + }, + _ => {} + } } - // Ensure that the BufferDs gets drained - self.buffer_ds.lock().unwrap().drain(service_ctx)?; - - Ok(()) + latest } }