From 520fb46f4883490e605404a469cccca4caaa3c6b Mon Sep 17 00:00:00 2001 From: seemenkina Date: Thu, 11 Jun 2026 17:16:16 +0300 Subject: [PATCH] remove de-mls user and ds instance --- Cargo.lock | 113 +---- core/core_client/Cargo.toml | 2 +- core/core_client/src/conversation/group_v1.rs | 2 +- core/core_client/src/conversation/group_v2.rs | 460 ++++++++---------- core/core_client/src/errors.rs | 4 +- 5 files changed, 216 insertions(+), 365 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f251203..cdc430c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -774,7 +774,7 @@ dependencies = [ "objc2-foundation", "parking_lot", "percent-encoding", - "windows-sys 0.60.2", + "windows-sys 0.59.0", "x11rb", ] @@ -1890,7 +1890,7 @@ dependencies = [ [[package]] name = "de-mls" version = "3.0.0" -source = "git+https://github.com/vacp2p/de-mls?branch=main#a797191ca187fe0b057cb2035ef29f488b678764" +source = "git+https://github.com/vacp2p/de-mls?branch=develop#d838e832994fd1d14f624783741bc60b31510fa0" dependencies = [ "hashgraph-like-consensus", "indexmap 2.14.0", @@ -1900,7 +1900,6 @@ dependencies = [ "openmls_traits 0.5.0", "prost", "prost-build", - "serde_json", "sha2 0.11.0", "thiserror", "tracing", @@ -2198,7 +2197,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3843,7 +3842,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -4684,7 +4683,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.60.2", + "windows-sys 0.59.0", ] [[package]] @@ -5138,7 +5137,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -5195,7 +5194,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -5868,7 +5867,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -6572,7 +6571,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -6646,7 +6645,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -6655,16 +6654,7 @@ version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows-targets 0.52.6", -] - -[[package]] -name = "windows-sys" -version = "0.60.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" -dependencies = [ - "windows-targets 0.53.5", + "windows-targets", ] [[package]] @@ -6682,31 +6672,14 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.6", - "windows_aarch64_msvc 0.52.6", - "windows_i686_gnu 0.52.6", - "windows_i686_gnullvm 0.52.6", - "windows_i686_msvc 0.52.6", - "windows_x86_64_gnu 0.52.6", - "windows_x86_64_gnullvm 0.52.6", - "windows_x86_64_msvc 0.52.6", -] - -[[package]] -name = "windows-targets" -version = "0.53.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" -dependencies = [ - "windows-link", - "windows_aarch64_gnullvm 0.53.1", - "windows_aarch64_msvc 0.53.1", - "windows_i686_gnu 0.53.1", - "windows_i686_gnullvm 0.53.1", - "windows_i686_msvc 0.53.1", - "windows_x86_64_gnu 0.53.1", - "windows_x86_64_gnullvm 0.53.1", - "windows_x86_64_msvc 0.53.1", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] [[package]] @@ -6715,96 +6688,48 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" - [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" -[[package]] -name = "windows_aarch64_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" - [[package]] name = "windows_i686_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" -[[package]] -name = "windows_i686_gnu" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" - [[package]] name = "windows_i686_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" -[[package]] -name = "windows_i686_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" - [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" -[[package]] -name = "windows_i686_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" - [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnu" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" - [[package]] name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "windows_x86_64_msvc" -version = "0.53.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" - [[package]] name = "winnow" version = "1.0.3" diff --git a/core/core_client/Cargo.toml b/core/core_client/Cargo.toml index b8b3e53..523f898 100644 --- a/core/core_client/Cargo.toml +++ b/core/core_client/Cargo.toml @@ -15,7 +15,7 @@ storage = { workspace = true } # External dependencies (sorted) alloy = "2.0" chat-proto = { git = "https://github.com/logos-messaging/chat_proto", branch = "main" } -de-mls = { git = "https://github.com/vacp2p/de-mls", branch = "main" } +de-mls = { git = "https://github.com/vacp2p/de-mls", branch = "develop" } hashgraph-like-consensus = "0.5.1" hex = "0.4.3" openmls = "0.8.1" diff --git a/core/core_client/src/conversation/group_v1.rs b/core/core_client/src/conversation/group_v1.rs index fd4310c..be5f4fc 100644 --- a/core/core_client/src/conversation/group_v1.rs +++ b/core/core_client/src/conversation/group_v1.rs @@ -7,7 +7,7 @@ use std::rc::Rc; use blake2::{Blake2b, Digest, digest::consts::U6}; use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload}; -use de_mls::app::ConversationState; +use de_mls::session::ConversationState; use openmls::prelude::tls_codec::Deserialize; use openmls::prelude::*; diff --git a/core/core_client/src/conversation/group_v2.rs b/core/core_client/src/conversation/group_v2.rs index d69c367..3fee1c0 100644 --- a/core/core_client/src/conversation/group_v2.rs +++ b/core/core_client/src/conversation/group_v2.rs @@ -5,28 +5,29 @@ use alloy::signers::local::PrivateKeySigner; use blake2::{Blake2b, Digest, digest::consts::U6}; use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload}; -use de_mls::app::{ConsensusContext, ConversationConfig, SessionTick, User, UserPlugins}; -use de_mls::core::{ConversationState, ScoringConfig, SessionEvent, StewardListConfig}; +use de_mls::core::{ + ConsensusPlugin, ConsensusServiceFor, ConversationEvent, ConversationPluginsFactory, + ConversationState, ScoringConfig, StewardListConfig, +}; use de_mls::defaults::{ DefaultConsensusPlugin, DefaultConversationPluginsFactory, MemoryDeMlsStorage, }; -use de_mls::ds::{APP_MSG_SUBTOPIC, DeliveryServiceError, InboundPacket, OutboundPacket}; use de_mls::member_id::MemberId; use de_mls::mls_crypto::MlsCredentials; use de_mls::protos::de_mls::messages::v1::{ AppMessage as AppMessageProto, MemberWelcome, app_message, }; +use de_mls::session::{Conversation, ConversationConfig, ConversationDeps}; use hashgraph_like_consensus::signing::EthereumConsensusSigner; use libchat::WakeupService; use prost::Message; use rand::{self, Rng}; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Duration; -use tracing::{debug, info, instrument}; +use tracing::{info, instrument}; use crate::AccountId; use crate::conversation::{ConversationIdRef, ExternalServices, ServiceContext}; -use crate::inbox_v2::MlsIdentityProvider; use crate::{ AddressedEncryptedPayload, ContentData, DeliveryService, RegistrationService, conversation::{BaseConvo, BaseGroupConvo, ChatError, Id}, @@ -55,89 +56,74 @@ impl MemberId for LocalDemlsMember { } } -#[derive(Debug)] -// This Maps a Demls::DeliveryService to a crate::service_traits::DeliveryService -// It works by caching outbound messages to a Vec which is eventually drained when -// The ServiceContext is available. -// -// All methods in Convo must call drain, to ensure that messages go out. -pub struct BufferDs { - queue: Vec, +struct DemlsSetup { + member: LocalDemlsMember, + factory: DefaultConversationPluginsFactory, + consensus_storage: ::ConsensusStorage, + consensus_signer: EthereumConsensusSigner, + app_id: Vec, // random bytes; echo-dedup key + config: ConversationConfig, // the ms-scale test timers, as before } -impl BufferDs { - pub fn new() -> Self { - Self { queue: vec![] } +impl DemlsSetup { + fn new(identity_name: String) -> Result { + let member = LocalDemlsMember::new(identity_name); + let credentials = Arc::new(MlsCredentials::from_member_id(&member)?); + let factory = DefaultConversationPluginsFactory::new( + Arc::new(MemoryDeMlsStorage::new()), + credentials, + ); + // TODO(config): TEST-ONLY millisecond timers. de-mls deadlines are real + // wall-clock, so the default 60s timers never fire under fast virtual + // time. Production needs a real config injected from the caller, not + // these hardcoded values. + let config = ConversationConfig { + commit_inactivity_duration: Duration::from_millis(50), + freeze_duration: Duration::from_millis(20), + voting_delay: Duration::from_millis(30), + election_voting_delay: Duration::from_millis(30), + consensus_timeout: Duration::from_millis(150), + proposal_expiration: Duration::from_millis(2000), + ..ConversationConfig::default() + }; + Ok(DemlsSetup { + member, + factory, + consensus_storage: DefaultConsensusPlugin::new_storage(), + consensus_signer: EthereumConsensusSigner::new(PrivateKeySigner::random()), + app_id: rand_string(5).as_bytes().to_vec(), + config, + }) } - // Warn: Messages are not sent untill drain is called, which is after the return from User. - // If de-mls relies on interactive sends, this will not work. - pub fn drain( - &mut self, - service_ctx: &mut ServiceContext, - ) -> Result<(), ChatError> { - // Swap the Vec out; Own then existing and replace with a new empty vec. - for pkt in self.queue.drain(..) { - debug!( - app = pkt.app_id.as_slice(), - convo = pkt.conversation_id, - topic = pkt.subtopic, - pkt = pkt.payload.as_slice(), - "Draining" - ); - - let hash = Blake2b::::new() - .chain_update("delivery_addr|") - .chain_update(&pkt.conversation_id) - .finalize(); - let delivery_address = hex::encode(hash); - // All Payloads leaving GroupV2 are a GroupV2Frame - let frame = GroupV2Frame { - payload: Some(GroupV2Payload::DeMlsWrapper(pkt.payload.into())), - sender_app_id: pkt.app_id.clone(), // pkt.app_id is the sender's User app_id - }; - - // Wrap in EncryptedPayload - let payload = AddressedEncryptedPayload { - // Note: Likely a mismatch herem as de-mls is expecting a specific topic. - delivery_address, - data: EncryptedPayload { - encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext { - payload: frame.encode_to_vec().into(), - })), - }, - }; - - let env = payload.into_envelope(pkt.conversation_id.clone()); - - service_ctx.ds.publish(env).map_err(ChatError::generic)?; + /// Call exactly once per Conversation construction. + fn deps( + &self, + ) -> ConversationDeps<'_, DefaultConsensusPlugin, DefaultConversationPluginsFactory> { + ConversationDeps { + plugins: &self.factory, + consensus: ConsensusServiceFor::::new_with_components( + self.consensus_storage.clone(), + DefaultConsensusPlugin::new_event_bus(), + self.consensus_signer.clone(), + 10, + ), + identity: &self.member, + app_id: Arc::from(self.app_id.as_slice()), + config: self.config.clone(), + scoring_config: ScoringConfig::default(), + steward_list_config: StewardListConfig::default(), } - - Ok(()) - } -} - -impl de_mls::ds::DeliveryService for BufferDs { - type Error = DeliveryServiceError; - - fn publish(&mut self, packet: de_mls::ds::OutboundPacket) -> Result<(), Self::Error> { - info!(topic = packet.subtopic, "Publish"); - self.queue.push(packet); - Ok(()) - } - - fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> { - todo!() } } pub struct GroupV2Convo { convo_id: String, - user: User, - // DeMLS takes shared ownership over the DS, so its incompatible with the &mut ServiceContext - // Use a wrapper for now, and then look at refactoring. - buffer_ds: Arc>, - app_id: String, + setup: DemlsSetup, + conversation: Option>, + /// Member-ids we proposed via add_member. WelcomeReady now fires on + /// every member; we forward a welcome only to joiners WE invited. + pending_invites: Vec>, } impl std::fmt::Debug for GroupV2Convo { @@ -157,121 +143,58 @@ fn rand_string(n: usize) -> String { } impl GroupV2Convo { - /// Build a de-mls `User` (plugins + BufferDs transport) without starting - /// any conversation. Shared by `new` (creator) and `new_pending` (joiner). - fn build_demls( - identity_name: String, - ) -> Result< - ( - User, - Arc>, - String, - ), - ChatError, - > { - let identity = LocalDemlsMember::new(identity_name); - let credentials = Arc::new(MlsCredentials::from_member_id(&identity)?); - let storage = Arc::new(MemoryDeMlsStorage::new()); - let conversation_plugins = DefaultConversationPluginsFactory::new(storage, credentials); - - let consensus_signer = EthereumConsensusSigner::new(PrivateKeySigner::random()); - let consensus = ConsensusContext::::new(consensus_signer); - - // TODO(config): TEST-ONLY millisecond timers. de-mls deadlines are real - // wall-clock, so the default 60s timers never fire under fast virtual - // time. Production needs a real config injected from the caller, not - // these hardcoded values. - let conversation_config = ConversationConfig { - commit_inactivity_duration: Duration::from_millis(50), - freeze_duration: Duration::from_millis(20), - voting_delay: Duration::from_millis(30), - election_voting_delay: Duration::from_millis(30), - consensus_timeout: Duration::from_millis(150), - proposal_expiration: Duration::from_millis(2000), - ..ConversationConfig::default() - }; - - let plugins = UserPlugins { - conversation_plugins, - consensus, - default_conversation_config: conversation_config, - default_scoring_config: ScoringConfig::default(), - default_steward_list_config: StewardListConfig::default(), - }; - - let transport = Arc::new(Mutex::new(BufferDs::new())); - let user = User::new_with_plugins(Box::new(identity), plugins, transport.clone()); - Ok((user, transport, rand_string(5))) - } - pub fn new( service_ctx: &mut ServiceContext, ) -> Result { + let setup = DemlsSetup::new(service_ctx.identity_provider.friendly_name())?; let convo_id = rand_string(5); - let identity_name = service_ctx.identity_provider.friendly_name(); - let (mut user, transport, app_id) = Self::build_demls(identity_name)?; - - user.start_conversation(convo_id.as_str(), true)?; - - // Ensure that the BufferDs gets drained - transport.lock().unwrap().drain(service_ctx)?; - - Ok(Self { + let conversation = Conversation::create(&convo_id, setup.deps())?; + Ok(GroupV2Convo { convo_id, - user, - buffer_ds: transport, - app_id, + setup, + conversation: Some(conversation), + pending_invites: vec![], }) } - /// Joiner side: build a de-mls `User` and register its key package under - /// the account name, but do NOT start a conversation. `convo_id` stays - /// empty until [`Self::accept_welcome`] fills it. + /// Joiner side: register a fresh key package under the account name, + /// but do NOT start a conversation. `convo_id` stays empty until + /// [`Self::accept_welcome`] fills it. pub fn new_pending( service_ctx: &mut ServiceContext, ) -> Result { let name = service_ctx.identity_provider.friendly_name(); - let (user, transport, app_id) = Self::build_demls(name.clone())?; - - let kp = user.generate_key_package()?; + let setup = DemlsSetup::new(name.clone())?; + let kp = setup.factory.generate_key_package()?; service_ctx .rs .register(&name, kp.as_bytes().to_vec()) .map_err(ChatError::generic)?; - Ok(Self { + Ok(GroupV2Convo { convo_id: String::new(), - user, - buffer_ds: transport, - app_id, + setup, + conversation: None, + pending_invites: vec![], }) } /// Joiner side: ingest a de-mls welcome handed over the InboxV2 1-1 - /// channel. Attaches MLS (filling `convo_id`), replays the bundled - /// `ConversationSync`, then subscribes to the conversation address. + /// channel. `from_welcome` attaches MLS and applies the bundled + /// `ConversationSync` in one call; we then subscribe to the + /// conversation address and flush the join broadcast. pub fn accept_welcome( &mut self, service_ctx: &mut ServiceContext, welcome: &MemberWelcome, ) -> Result<(), ChatError> { - let (convo_id, tick) = self.user.accept_welcome(&welcome.welcome_bytes)?; - self.convo_id = convo_id; - - if !welcome.conversation_sync_bytes.is_empty() { - let pkt = InboundPacket::new( - welcome.conversation_sync_bytes.clone(), - APP_MSG_SUBTOPIC, - &self.convo_id, - self.user.app_id().to_vec(), - 0, - ); - self.user.process_inbound_packet(pkt)?; - } - - let events = self.user.drain_events(&self.convo_id)?; - self.init(service_ctx)?; - self.after_op(service_ctx, tick, &events) + let conv = Conversation::from_welcome(self.setup.deps(), welcome)? + .ok_or_else(|| ChatError::generic("welcome not addressed to this member"))?; + self.convo_id = conv.id().to_string(); + self.conversation = Some(conv); + self.init(service_ctx)?; // subscribe + self.after_op(service_ctx)?; // flush join broadcast + schedule wakeup + Ok(()) } fn delivery_address_from_id(convo_id: &str) -> String { @@ -281,24 +204,6 @@ impl GroupV2Convo { .finalize(); hex::encode(hash) } - - #[allow(unused)] - fn delivery_address(&self) -> String { - Self::delivery_address_from_id(&self.convo_id) - } - - fn ctrl_delivery_address_from_id(convo_id: &str) -> String { - Self::delivery_address_from_id(convo_id) - } - #[allow(unused)] - fn ctrl_delivery_address(&self) -> String { - Self::ctrl_delivery_address_from_id(&self.convo_id) - } - - // Needed by Demls - fn app_id(&self) -> &str { - &self.app_id - } } impl Id for GroupV2Convo { @@ -313,18 +218,10 @@ where { fn init(&self, service_ctx: &mut super::ServiceContext) -> Result<(), ChatError> { // Configure the delivery service to listen for the required delivery addresses. - service_ctx .ds .subscribe(&Self::delivery_address_from_id(&self.convo_id)) .map_err(ChatError::generic)?; - service_ctx - .ds - .subscribe(&Self::ctrl_delivery_address_from_id(&self.convo_id)) - .map_err(ChatError::generic)?; - - // Ensure that the BufferDs gets drained - self.buffer_ds.lock().unwrap().drain(service_ctx)?; Ok(()) } @@ -334,13 +231,12 @@ where service_ctx: &mut super::ServiceContext, content: &[u8], ) -> Result<(), ChatError> { - let _signer = MlsIdentityProvider(&service_ctx.identity_provider); - - let tick = self - .user - .send_app_message(&self.convo_id, content.to_vec())?; - // Ensure that the BufferDs gets drained - done inside after_op - self.after_op(service_ctx, tick, &vec![])?; + let conv = self + .conversation + .as_mut() + .ok_or_else(|| ChatError::generic("conversation not found"))?; + conv.send_message(content.to_vec())?; + self.after_op(service_ctx)?; Ok(()) } @@ -362,29 +258,30 @@ where _ => return Ok(None), }; - // Fake a InboundPacket - let packet = InboundPacket { - payload: inner, - subtopic: APP_MSG_SUBTOPIC.to_string(), // Assume APP TOPIC, Welcome Messages go to InboxV2 - conversation_id: self.convo_id.to_string(), - app_id: frame.sender_app_id, - timestamp: 0, - }; - - info!(len = packet.payload.len(), "Inbound Pkt"); - let tick = self.user.process_inbound_packet(packet)?; - let events = self.user.drain_events(&self.convo_id)?; - let out = self.events_to_content(events.clone()); - self.after_op(service_ctx, tick, &events)?; - Ok(out) + let conv = self + .conversation + .as_mut() + .ok_or_else(|| ChatError::generic("no conversation"))?; + conv.process_inbound(&frame.sender_app_id, &inner)?; + conv.poll(); + let events = self.after_op(service_ctx)?; // route + publish + re-arm, returns events + Ok(self.events_to_content(&events)) } #[instrument(name = "groupv2.wakeup", skip_all, fields(user_id = %ctx.identity_provider.friendly_name()))] fn wakeup(&mut self, ctx: &mut ServiceContext) -> Result<(), ChatError> { - info!(app = self.app_id(), "Wakeup"); - let tick = self.user.poll_session(&self.convo_id)?; - let events = self.user.drain_events(&self.convo_id)?; - self.after_op(ctx, tick, &events) + info!(convo = %self.convo_id, "Wakeup"); + let Some(conv) = self.conversation.as_mut() else { + return Ok(()); // pending joiner: no deadlines exist yet + }; + let outcome = conv.poll(); + if outcome.leave_requested { + // Commit ejected us (or join expired). Real handling - drops + // this convo from its map; + tracing::warn!(convo = %self.convo_id, "conversation requested teardown"); + } + self.after_op(ctx)?; // publish what poll produced + re-arm alarm + Ok(()) } } @@ -398,25 +295,38 @@ where service_ctx: &mut ServiceContext, members: &[&AccountId], ) -> Result<(), ChatError> { - let mut last_tick = SessionTick { - next_wakeup_in: None, - }; + // Record who WE invited before touching the conversation: after_op + // forwards a welcome only to joiners in pending_invites (member-id + // bytes == account name bytes for LocalDemlsMember). + let mut kps = Vec::with_capacity(members.len()); for member in members { let kp_bytes = service_ctx .rs .retrieve(member) .map_err(ChatError::generic)? .ok_or_else(|| ChatError::generic("No key package"))?; - last_tick = self.user.add_member(&self.convo_id, &kp_bytes)?; + self.pending_invites + .push(member.as_str().as_bytes().to_vec()); + kps.push(kp_bytes); } - let events = self.user.drain_events(&self.convo_id)?; - self.after_op(service_ctx, last_tick, &events) + + let conv = self + .conversation + .as_mut() + .ok_or_else(|| ChatError::generic("no conversation"))?; + for kp_bytes in &kps { + conv.add_member(kp_bytes)?; + } + self.after_op(service_ctx)?; + Ok(()) } fn conversation_state(&self) -> Result { - self.user - .get_conversation_state(&self.convo_id) - .map_err(ChatError::DeMlsGeneric) + Ok(self + .conversation + .as_ref() + .map(|c| c.state()) + .unwrap_or(ConversationState::PendingJoin)) } } @@ -424,54 +334,70 @@ impl GroupV2Convo { fn after_op( &mut self, service_ctx: &mut ServiceContext, - tick: SessionTick, - events: &[SessionEvent], - ) -> Result<(), ChatError> { - // Route each welcome to the joiners it names over their InboxV2 1-1 - // channel. The welcome carries `joiner_identities` (member-id bytes = - // account name), so any node that commits an Add can address delivery - // — no local invite tracking, and batch Adds route correctly. - for evt in events { - if let SessionEvent::WelcomeReady(welcome) = evt { + ) -> Result, ChatError> { + let Some(conv) = self.conversation.as_ref() else { + return Ok(Vec::new()); // still pending join — nothing buffered + }; + // Pull everything first (these are &self, take-all): + let events = conv.drain_events(); + let outbound = conv.drain_outbound(); // Vec + let wakeup = conv.next_wakeup_in(); + + // 1. Route welcomes for joiners WE invited (event fires on every member now). + for evt in &events { + if let ConversationEvent::WelcomeReady { welcome, .. } = evt { for joiner in &welcome.joiner_identities { - let name = String::from_utf8(joiner.clone()).map_err(ChatError::generic)?; - let account = AccountId::new(name); - crate::inbox_v2::invite_user_v2(&mut service_ctx.ds, &account, welcome)?; + if let Some(i) = self.pending_invites.iter().position(|p| p == joiner) { + self.pending_invites.remove(i); + let name = String::from_utf8(joiner.clone()).map_err(ChatError::generic)?; + crate::inbox_v2::invite_user_v2( + &mut service_ctx.ds, + &AccountId::new(name), + welcome, + )?; + } } } } - self.buffer_ds.lock().unwrap().drain(service_ctx)?; - if let Some(d) = tick.next_wakeup_in { - // TODO(chat): WakeupService is second-granularity but de-mls - // deadlines are sub-second; `as_secs().max(1)` floors them up to 1s, - // silently over-waiting. Needs a millisecond-capable wakeup. + // 2. Publish + for out in outbound { + let frame = GroupV2Frame { + payload: Some(GroupV2Payload::DeMlsWrapper(out.payload.into())), + sender_app_id: out.sender, // was pkt.app_id + }; + let payload = AddressedEncryptedPayload { + delivery_address: Self::delivery_address_from_id(&out.conversation_id), + data: EncryptedPayload { + encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext { + payload: frame.encode_to_vec().into(), + })), + }, + }; + service_ctx + .ds + .publish(payload.into_envelope(out.conversation_id)) + .map_err(ChatError::generic)?; + } + + // 3. Re-arm the alarm with the conversation's earliest deadline. + if let Some(d) = wakeup { service_ctx.wakeup_service.wakeup_in(d, &self.convo_id); } - Ok(()) + Ok(events) } - fn events_to_content(&mut self, events: Vec) -> Option { - let mut latest: Option = None; - - for evt in events { - match evt { - SessionEvent::AppMessage(AppMessageProto { payload: Some(p) }) => match p { - app_message::Payload::ConversationMessage(cm) => { - latest = Some(ContentData { - conversation_id: self.convo_id.clone().into(), - data: cm.message, - is_new_convo: false, - }); - } - // All other types is an inside group traffic — not chat content. - _ => {} - }, - _ => {} - } - } - - latest + fn events_to_content(&self, events: &[ConversationEvent]) -> Option { + events.iter().find_map(|evt| match evt { + ConversationEvent::AppMessage(AppMessageProto { + payload: Some(app_message::Payload::ConversationMessage(cm)), + }) => Some(ContentData { + conversation_id: self.convo_id.clone().into(), + data: cm.message.clone(), + is_new_convo: false, + }), + _ => None, + }) } } diff --git a/core/core_client/src/errors.rs b/core/core_client/src/errors.rs index cf620d0..1c06d80 100644 --- a/core/core_client/src/errors.rs +++ b/core/core_client/src/errors.rs @@ -1,4 +1,4 @@ -use de_mls::{app::UserError, mls_crypto::MlsError}; +use de_mls::{mls_crypto::MlsError, session::ConversationError}; use openmls::prelude::tls_codec; pub use thiserror::Error; @@ -15,7 +15,7 @@ pub enum ChatError { #[error("Demls: {0}")] DemlsWrapped(#[from] MlsError), #[error("Demls generic: {0}")] - DeMlsGeneric(#[from] UserError), + DeMlsGeneric(#[from] ConversationError), } impl ChatError {