remove de-mls user and ds instance

This commit is contained in:
seemenkina 2026-06-11 17:16:16 +03:00
parent 010153d55d
commit 520fb46f48
No known key found for this signature in database
5 changed files with 216 additions and 365 deletions

113
Cargo.lock generated
View File

@ -774,7 +774,7 @@ dependencies = [
"objc2-foundation",
"parking_lot",
"percent-encoding",
"windows-sys 0.60.2",
"windows-sys 0.59.0",
"x11rb",
]
@ -1890,7 +1890,7 @@ dependencies = [
[[package]]
name = "de-mls"
version = "3.0.0"
source = "git+https://github.com/vacp2p/de-mls?branch=main#a797191ca187fe0b057cb2035ef29f488b678764"
source = "git+https://github.com/vacp2p/de-mls?branch=develop#d838e832994fd1d14f624783741bc60b31510fa0"
dependencies = [
"hashgraph-like-consensus",
"indexmap 2.14.0",
@ -1900,7 +1900,6 @@ dependencies = [
"openmls_traits 0.5.0",
"prost",
"prost-build",
"serde_json",
"sha2 0.11.0",
"thiserror",
"tracing",
@ -2198,7 +2197,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
dependencies = [
"libc",
"windows-sys 0.61.2",
"windows-sys 0.59.0",
]
[[package]]
@ -3843,7 +3842,7 @@ version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"windows-sys 0.61.2",
"windows-sys 0.59.0",
]
[[package]]
@ -4684,7 +4683,7 @@ dependencies = [
"once_cell",
"socket2",
"tracing",
"windows-sys 0.60.2",
"windows-sys 0.59.0",
]
[[package]]
@ -5138,7 +5137,7 @@ dependencies = [
"errno",
"libc",
"linux-raw-sys 0.12.1",
"windows-sys 0.61.2",
"windows-sys 0.59.0",
]
[[package]]
@ -5195,7 +5194,7 @@ dependencies = [
"security-framework",
"security-framework-sys",
"webpki-root-certs",
"windows-sys 0.61.2",
"windows-sys 0.59.0",
]
[[package]]
@ -5868,7 +5867,7 @@ dependencies = [
"getrandom 0.4.2",
"once_cell",
"rustix 1.1.4",
"windows-sys 0.61.2",
"windows-sys 0.59.0",
]
[[package]]
@ -6572,7 +6571,7 @@ version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys 0.61.2",
"windows-sys 0.59.0",
]
[[package]]
@ -6646,7 +6645,7 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets 0.52.6",
"windows-targets",
]
[[package]]
@ -6655,16 +6654,7 @@ version = "0.59.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b"
dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.60.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb"
dependencies = [
"windows-targets 0.53.5",
"windows-targets",
]
[[package]]
@ -6682,31 +6672,14 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm 0.52.6",
"windows_aarch64_msvc 0.52.6",
"windows_i686_gnu 0.52.6",
"windows_i686_gnullvm 0.52.6",
"windows_i686_msvc 0.52.6",
"windows_x86_64_gnu 0.52.6",
"windows_x86_64_gnullvm 0.52.6",
"windows_x86_64_msvc 0.52.6",
]
[[package]]
name = "windows-targets"
version = "0.53.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3"
dependencies = [
"windows-link",
"windows_aarch64_gnullvm 0.53.1",
"windows_aarch64_msvc 0.53.1",
"windows_i686_gnu 0.53.1",
"windows_i686_gnullvm 0.53.1",
"windows_i686_msvc 0.53.1",
"windows_x86_64_gnu 0.53.1",
"windows_x86_64_gnullvm 0.53.1",
"windows_x86_64_msvc 0.53.1",
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_gnullvm",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
@ -6715,96 +6688,48 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_aarch64_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006"
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnu"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c"
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_i686_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnu"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "windows_x86_64_msvc"
version = "0.53.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
[[package]]
name = "winnow"
version = "1.0.3"

View File

@ -15,7 +15,7 @@ storage = { workspace = true }
# External dependencies (sorted)
alloy = "2.0"
chat-proto = { git = "https://github.com/logos-messaging/chat_proto", branch = "main" }
de-mls = { git = "https://github.com/vacp2p/de-mls", branch = "main" }
de-mls = { git = "https://github.com/vacp2p/de-mls", branch = "develop" }
hashgraph-like-consensus = "0.5.1"
hex = "0.4.3"
openmls = "0.8.1"

View File

@ -7,7 +7,7 @@ use std::rc::Rc;
use blake2::{Blake2b, Digest, digest::consts::U6};
use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload};
use de_mls::app::ConversationState;
use de_mls::session::ConversationState;
use openmls::prelude::tls_codec::Deserialize;
use openmls::prelude::*;

View File

@ -5,28 +5,29 @@
use alloy::signers::local::PrivateKeySigner;
use blake2::{Blake2b, Digest, digest::consts::U6};
use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload};
use de_mls::app::{ConsensusContext, ConversationConfig, SessionTick, User, UserPlugins};
use de_mls::core::{ConversationState, ScoringConfig, SessionEvent, StewardListConfig};
use de_mls::core::{
ConsensusPlugin, ConsensusServiceFor, ConversationEvent, ConversationPluginsFactory,
ConversationState, ScoringConfig, StewardListConfig,
};
use de_mls::defaults::{
DefaultConsensusPlugin, DefaultConversationPluginsFactory, MemoryDeMlsStorage,
};
use de_mls::ds::{APP_MSG_SUBTOPIC, DeliveryServiceError, InboundPacket, OutboundPacket};
use de_mls::member_id::MemberId;
use de_mls::mls_crypto::MlsCredentials;
use de_mls::protos::de_mls::messages::v1::{
AppMessage as AppMessageProto, MemberWelcome, app_message,
};
use de_mls::session::{Conversation, ConversationConfig, ConversationDeps};
use hashgraph_like_consensus::signing::EthereumConsensusSigner;
use libchat::WakeupService;
use prost::Message;
use rand::{self, Rng};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, info, instrument};
use tracing::{info, instrument};
use crate::AccountId;
use crate::conversation::{ConversationIdRef, ExternalServices, ServiceContext};
use crate::inbox_v2::MlsIdentityProvider;
use crate::{
AddressedEncryptedPayload, ContentData, DeliveryService, RegistrationService,
conversation::{BaseConvo, BaseGroupConvo, ChatError, Id},
@ -55,89 +56,74 @@ impl MemberId for LocalDemlsMember {
}
}
#[derive(Debug)]
// This Maps a Demls::DeliveryService to a crate::service_traits::DeliveryService
// It works by caching outbound messages to a Vec which is eventually drained when
// The ServiceContext is available.
//
// All methods in Convo must call drain, to ensure that messages go out.
pub struct BufferDs {
queue: Vec<OutboundPacket>,
struct DemlsSetup {
member: LocalDemlsMember,
factory: DefaultConversationPluginsFactory,
consensus_storage: <DefaultConsensusPlugin as ConsensusPlugin>::ConsensusStorage,
consensus_signer: EthereumConsensusSigner,
app_id: Vec<u8>, // random bytes; echo-dedup key
config: ConversationConfig, // the ms-scale test timers, as before
}
impl BufferDs {
pub fn new() -> Self {
Self { queue: vec![] }
impl DemlsSetup {
fn new(identity_name: String) -> Result<Self, ChatError> {
let member = LocalDemlsMember::new(identity_name);
let credentials = Arc::new(MlsCredentials::from_member_id(&member)?);
let factory = DefaultConversationPluginsFactory::new(
Arc::new(MemoryDeMlsStorage::new()),
credentials,
);
// TODO(config): TEST-ONLY millisecond timers. de-mls deadlines are real
// wall-clock, so the default 60s timers never fire under fast virtual
// time. Production needs a real config injected from the caller, not
// these hardcoded values.
let config = ConversationConfig {
commit_inactivity_duration: Duration::from_millis(50),
freeze_duration: Duration::from_millis(20),
voting_delay: Duration::from_millis(30),
election_voting_delay: Duration::from_millis(30),
consensus_timeout: Duration::from_millis(150),
proposal_expiration: Duration::from_millis(2000),
..ConversationConfig::default()
};
Ok(DemlsSetup {
member,
factory,
consensus_storage: DefaultConsensusPlugin::new_storage(),
consensus_signer: EthereumConsensusSigner::new(PrivateKeySigner::random()),
app_id: rand_string(5).as_bytes().to_vec(),
config,
})
}
// Warn: Messages are not sent untill drain is called, which is after the return from User.
// If de-mls relies on interactive sends, this will not work.
pub fn drain<S: ExternalServices>(
&mut self,
service_ctx: &mut ServiceContext<S>,
) -> Result<(), ChatError> {
// Swap the Vec out; Own then existing and replace with a new empty vec.
for pkt in self.queue.drain(..) {
debug!(
app = pkt.app_id.as_slice(),
convo = pkt.conversation_id,
topic = pkt.subtopic,
pkt = pkt.payload.as_slice(),
"Draining"
);
let hash = Blake2b::<U6>::new()
.chain_update("delivery_addr|")
.chain_update(&pkt.conversation_id)
.finalize();
let delivery_address = hex::encode(hash);
// All Payloads leaving GroupV2 are a GroupV2Frame
let frame = GroupV2Frame {
payload: Some(GroupV2Payload::DeMlsWrapper(pkt.payload.into())),
sender_app_id: pkt.app_id.clone(), // pkt.app_id is the sender's User app_id
};
// Wrap in EncryptedPayload
let payload = AddressedEncryptedPayload {
// Note: Likely a mismatch herem as de-mls is expecting a specific topic.
delivery_address,
data: EncryptedPayload {
encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext {
payload: frame.encode_to_vec().into(),
})),
},
};
let env = payload.into_envelope(pkt.conversation_id.clone());
service_ctx.ds.publish(env).map_err(ChatError::generic)?;
/// Call exactly once per Conversation construction.
fn deps(
&self,
) -> ConversationDeps<'_, DefaultConsensusPlugin, DefaultConversationPluginsFactory> {
ConversationDeps {
plugins: &self.factory,
consensus: ConsensusServiceFor::<DefaultConsensusPlugin>::new_with_components(
self.consensus_storage.clone(),
DefaultConsensusPlugin::new_event_bus(),
self.consensus_signer.clone(),
10,
),
identity: &self.member,
app_id: Arc::from(self.app_id.as_slice()),
config: self.config.clone(),
scoring_config: ScoringConfig::default(),
steward_list_config: StewardListConfig::default(),
}
Ok(())
}
}
impl de_mls::ds::DeliveryService for BufferDs {
type Error = DeliveryServiceError;
fn publish(&mut self, packet: de_mls::ds::OutboundPacket) -> Result<(), Self::Error> {
info!(topic = packet.subtopic, "Publish");
self.queue.push(packet);
Ok(())
}
fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> {
todo!()
}
}
pub struct GroupV2Convo {
convo_id: String,
user: User<DefaultConsensusPlugin, DefaultConversationPluginsFactory>,
// DeMLS takes shared ownership over the DS, so its incompatible with the &mut ServiceContext
// Use a wrapper for now, and then look at refactoring.
buffer_ds: Arc<Mutex<BufferDs>>,
app_id: String,
setup: DemlsSetup,
conversation: Option<Conversation<DefaultConsensusPlugin, DefaultConversationPluginsFactory>>,
/// Member-ids we proposed via add_member. WelcomeReady now fires on
/// every member; we forward a welcome only to joiners WE invited.
pending_invites: Vec<Vec<u8>>,
}
impl std::fmt::Debug for GroupV2Convo {
@ -157,121 +143,58 @@ fn rand_string(n: usize) -> String {
}
impl GroupV2Convo {
/// Build a de-mls `User` (plugins + BufferDs transport) without starting
/// any conversation. Shared by `new` (creator) and `new_pending` (joiner).
fn build_demls(
identity_name: String,
) -> Result<
(
User<DefaultConsensusPlugin, DefaultConversationPluginsFactory>,
Arc<Mutex<BufferDs>>,
String,
),
ChatError,
> {
let identity = LocalDemlsMember::new(identity_name);
let credentials = Arc::new(MlsCredentials::from_member_id(&identity)?);
let storage = Arc::new(MemoryDeMlsStorage::new());
let conversation_plugins = DefaultConversationPluginsFactory::new(storage, credentials);
let consensus_signer = EthereumConsensusSigner::new(PrivateKeySigner::random());
let consensus = ConsensusContext::<DefaultConsensusPlugin>::new(consensus_signer);
// TODO(config): TEST-ONLY millisecond timers. de-mls deadlines are real
// wall-clock, so the default 60s timers never fire under fast virtual
// time. Production needs a real config injected from the caller, not
// these hardcoded values.
let conversation_config = ConversationConfig {
commit_inactivity_duration: Duration::from_millis(50),
freeze_duration: Duration::from_millis(20),
voting_delay: Duration::from_millis(30),
election_voting_delay: Duration::from_millis(30),
consensus_timeout: Duration::from_millis(150),
proposal_expiration: Duration::from_millis(2000),
..ConversationConfig::default()
};
let plugins = UserPlugins {
conversation_plugins,
consensus,
default_conversation_config: conversation_config,
default_scoring_config: ScoringConfig::default(),
default_steward_list_config: StewardListConfig::default(),
};
let transport = Arc::new(Mutex::new(BufferDs::new()));
let user = User::new_with_plugins(Box::new(identity), plugins, transport.clone());
Ok((user, transport, rand_string(5)))
}
pub fn new<S: ExternalServices>(
service_ctx: &mut ServiceContext<S>,
) -> Result<Self, ChatError> {
let setup = DemlsSetup::new(service_ctx.identity_provider.friendly_name())?;
let convo_id = rand_string(5);
let identity_name = service_ctx.identity_provider.friendly_name();
let (mut user, transport, app_id) = Self::build_demls(identity_name)?;
user.start_conversation(convo_id.as_str(), true)?;
// Ensure that the BufferDs gets drained
transport.lock().unwrap().drain(service_ctx)?;
Ok(Self {
let conversation = Conversation::create(&convo_id, setup.deps())?;
Ok(GroupV2Convo {
convo_id,
user,
buffer_ds: transport,
app_id,
setup,
conversation: Some(conversation),
pending_invites: vec![],
})
}
/// Joiner side: build a de-mls `User` and register its key package under
/// the account name, but do NOT start a conversation. `convo_id` stays
/// empty until [`Self::accept_welcome`] fills it.
/// Joiner side: register a fresh key package under the account name,
/// but do NOT start a conversation. `convo_id` stays empty until
/// [`Self::accept_welcome`] fills it.
pub fn new_pending<S: ExternalServices>(
service_ctx: &mut ServiceContext<S>,
) -> Result<Self, ChatError> {
let name = service_ctx.identity_provider.friendly_name();
let (user, transport, app_id) = Self::build_demls(name.clone())?;
let kp = user.generate_key_package()?;
let setup = DemlsSetup::new(name.clone())?;
let kp = setup.factory.generate_key_package()?;
service_ctx
.rs
.register(&name, kp.as_bytes().to_vec())
.map_err(ChatError::generic)?;
Ok(Self {
Ok(GroupV2Convo {
convo_id: String::new(),
user,
buffer_ds: transport,
app_id,
setup,
conversation: None,
pending_invites: vec![],
})
}
/// Joiner side: ingest a de-mls welcome handed over the InboxV2 1-1
/// channel. Attaches MLS (filling `convo_id`), replays the bundled
/// `ConversationSync`, then subscribes to the conversation address.
/// channel. `from_welcome` attaches MLS and applies the bundled
/// `ConversationSync` in one call; we then subscribe to the
/// conversation address and flush the join broadcast.
pub fn accept_welcome<S: ExternalServices>(
&mut self,
service_ctx: &mut ServiceContext<S>,
welcome: &MemberWelcome,
) -> Result<(), ChatError> {
let (convo_id, tick) = self.user.accept_welcome(&welcome.welcome_bytes)?;
self.convo_id = convo_id;
if !welcome.conversation_sync_bytes.is_empty() {
let pkt = InboundPacket::new(
welcome.conversation_sync_bytes.clone(),
APP_MSG_SUBTOPIC,
&self.convo_id,
self.user.app_id().to_vec(),
0,
);
self.user.process_inbound_packet(pkt)?;
}
let events = self.user.drain_events(&self.convo_id)?;
self.init(service_ctx)?;
self.after_op(service_ctx, tick, &events)
let conv = Conversation::from_welcome(self.setup.deps(), welcome)?
.ok_or_else(|| ChatError::generic("welcome not addressed to this member"))?;
self.convo_id = conv.id().to_string();
self.conversation = Some(conv);
self.init(service_ctx)?; // subscribe
self.after_op(service_ctx)?; // flush join broadcast + schedule wakeup
Ok(())
}
fn delivery_address_from_id(convo_id: &str) -> String {
@ -281,24 +204,6 @@ impl GroupV2Convo {
.finalize();
hex::encode(hash)
}
#[allow(unused)]
fn delivery_address(&self) -> String {
Self::delivery_address_from_id(&self.convo_id)
}
fn ctrl_delivery_address_from_id(convo_id: &str) -> String {
Self::delivery_address_from_id(convo_id)
}
#[allow(unused)]
fn ctrl_delivery_address(&self) -> String {
Self::ctrl_delivery_address_from_id(&self.convo_id)
}
// Needed by Demls
fn app_id(&self) -> &str {
&self.app_id
}
}
impl Id for GroupV2Convo {
@ -313,18 +218,10 @@ where
{
fn init(&self, service_ctx: &mut super::ServiceContext<S>) -> Result<(), ChatError> {
// Configure the delivery service to listen for the required delivery addresses.
service_ctx
.ds
.subscribe(&Self::delivery_address_from_id(&self.convo_id))
.map_err(ChatError::generic)?;
service_ctx
.ds
.subscribe(&Self::ctrl_delivery_address_from_id(&self.convo_id))
.map_err(ChatError::generic)?;
// Ensure that the BufferDs gets drained
self.buffer_ds.lock().unwrap().drain(service_ctx)?;
Ok(())
}
@ -334,13 +231,12 @@ where
service_ctx: &mut super::ServiceContext<S>,
content: &[u8],
) -> Result<(), ChatError> {
let _signer = MlsIdentityProvider(&service_ctx.identity_provider);
let tick = self
.user
.send_app_message(&self.convo_id, content.to_vec())?;
// Ensure that the BufferDs gets drained - done inside after_op
self.after_op(service_ctx, tick, &vec![])?;
let conv = self
.conversation
.as_mut()
.ok_or_else(|| ChatError::generic("conversation not found"))?;
conv.send_message(content.to_vec())?;
self.after_op(service_ctx)?;
Ok(())
}
@ -362,29 +258,30 @@ where
_ => return Ok(None),
};
// Fake a InboundPacket
let packet = InboundPacket {
payload: inner,
subtopic: APP_MSG_SUBTOPIC.to_string(), // Assume APP TOPIC, Welcome Messages go to InboxV2
conversation_id: self.convo_id.to_string(),
app_id: frame.sender_app_id,
timestamp: 0,
};
info!(len = packet.payload.len(), "Inbound Pkt");
let tick = self.user.process_inbound_packet(packet)?;
let events = self.user.drain_events(&self.convo_id)?;
let out = self.events_to_content(events.clone());
self.after_op(service_ctx, tick, &events)?;
Ok(out)
let conv = self
.conversation
.as_mut()
.ok_or_else(|| ChatError::generic("no conversation"))?;
conv.process_inbound(&frame.sender_app_id, &inner)?;
conv.poll();
let events = self.after_op(service_ctx)?; // route + publish + re-arm, returns events
Ok(self.events_to_content(&events))
}
#[instrument(name = "groupv2.wakeup", skip_all, fields(user_id = %ctx.identity_provider.friendly_name()))]
fn wakeup(&mut self, ctx: &mut ServiceContext<S>) -> Result<(), ChatError> {
info!(app = self.app_id(), "Wakeup");
let tick = self.user.poll_session(&self.convo_id)?;
let events = self.user.drain_events(&self.convo_id)?;
self.after_op(ctx, tick, &events)
info!(convo = %self.convo_id, "Wakeup");
let Some(conv) = self.conversation.as_mut() else {
return Ok(()); // pending joiner: no deadlines exist yet
};
let outcome = conv.poll();
if outcome.leave_requested {
// Commit ejected us (or join expired). Real handling - drops
// this convo from its map;
tracing::warn!(convo = %self.convo_id, "conversation requested teardown");
}
self.after_op(ctx)?; // publish what poll produced + re-arm alarm
Ok(())
}
}
@ -398,25 +295,38 @@ where
service_ctx: &mut ServiceContext<S>,
members: &[&AccountId],
) -> Result<(), ChatError> {
let mut last_tick = SessionTick {
next_wakeup_in: None,
};
// Record who WE invited before touching the conversation: after_op
// forwards a welcome only to joiners in pending_invites (member-id
// bytes == account name bytes for LocalDemlsMember).
let mut kps = Vec::with_capacity(members.len());
for member in members {
let kp_bytes = service_ctx
.rs
.retrieve(member)
.map_err(ChatError::generic)?
.ok_or_else(|| ChatError::generic("No key package"))?;
last_tick = self.user.add_member(&self.convo_id, &kp_bytes)?;
self.pending_invites
.push(member.as_str().as_bytes().to_vec());
kps.push(kp_bytes);
}
let events = self.user.drain_events(&self.convo_id)?;
self.after_op(service_ctx, last_tick, &events)
let conv = self
.conversation
.as_mut()
.ok_or_else(|| ChatError::generic("no conversation"))?;
for kp_bytes in &kps {
conv.add_member(kp_bytes)?;
}
self.after_op(service_ctx)?;
Ok(())
}
fn conversation_state(&self) -> Result<ConversationState, ChatError> {
self.user
.get_conversation_state(&self.convo_id)
.map_err(ChatError::DeMlsGeneric)
Ok(self
.conversation
.as_ref()
.map(|c| c.state())
.unwrap_or(ConversationState::PendingJoin))
}
}
@ -424,54 +334,70 @@ impl GroupV2Convo {
fn after_op<S: ExternalServices>(
&mut self,
service_ctx: &mut ServiceContext<S>,
tick: SessionTick,
events: &[SessionEvent],
) -> Result<(), ChatError> {
// Route each welcome to the joiners it names over their InboxV2 1-1
// channel. The welcome carries `joiner_identities` (member-id bytes =
// account name), so any node that commits an Add can address delivery
// — no local invite tracking, and batch Adds route correctly.
for evt in events {
if let SessionEvent::WelcomeReady(welcome) = evt {
) -> Result<Vec<ConversationEvent>, ChatError> {
let Some(conv) = self.conversation.as_ref() else {
return Ok(Vec::new()); // still pending join — nothing buffered
};
// Pull everything first (these are &self, take-all):
let events = conv.drain_events();
let outbound = conv.drain_outbound(); // Vec<de_mls::session::Outbound>
let wakeup = conv.next_wakeup_in();
// 1. Route welcomes for joiners WE invited (event fires on every member now).
for evt in &events {
if let ConversationEvent::WelcomeReady { welcome, .. } = evt {
for joiner in &welcome.joiner_identities {
let name = String::from_utf8(joiner.clone()).map_err(ChatError::generic)?;
let account = AccountId::new(name);
crate::inbox_v2::invite_user_v2(&mut service_ctx.ds, &account, welcome)?;
if let Some(i) = self.pending_invites.iter().position(|p| p == joiner) {
self.pending_invites.remove(i);
let name = String::from_utf8(joiner.clone()).map_err(ChatError::generic)?;
crate::inbox_v2::invite_user_v2(
&mut service_ctx.ds,
&AccountId::new(name),
welcome,
)?;
}
}
}
}
self.buffer_ds.lock().unwrap().drain(service_ctx)?;
if let Some(d) = tick.next_wakeup_in {
// TODO(chat): WakeupService is second-granularity but de-mls
// deadlines are sub-second; `as_secs().max(1)` floors them up to 1s,
// silently over-waiting. Needs a millisecond-capable wakeup.
// 2. Publish
for out in outbound {
let frame = GroupV2Frame {
payload: Some(GroupV2Payload::DeMlsWrapper(out.payload.into())),
sender_app_id: out.sender, // was pkt.app_id
};
let payload = AddressedEncryptedPayload {
delivery_address: Self::delivery_address_from_id(&out.conversation_id),
data: EncryptedPayload {
encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext {
payload: frame.encode_to_vec().into(),
})),
},
};
service_ctx
.ds
.publish(payload.into_envelope(out.conversation_id))
.map_err(ChatError::generic)?;
}
// 3. Re-arm the alarm with the conversation's earliest deadline.
if let Some(d) = wakeup {
service_ctx.wakeup_service.wakeup_in(d, &self.convo_id);
}
Ok(())
Ok(events)
}
fn events_to_content(&mut self, events: Vec<SessionEvent>) -> Option<ContentData> {
let mut latest: Option<ContentData> = None;
for evt in events {
match evt {
SessionEvent::AppMessage(AppMessageProto { payload: Some(p) }) => match p {
app_message::Payload::ConversationMessage(cm) => {
latest = Some(ContentData {
conversation_id: self.convo_id.clone().into(),
data: cm.message,
is_new_convo: false,
});
}
// All other types is an inside group traffic — not chat content.
_ => {}
},
_ => {}
}
}
latest
fn events_to_content(&self, events: &[ConversationEvent]) -> Option<ContentData> {
events.iter().find_map(|evt| match evt {
ConversationEvent::AppMessage(AppMessageProto {
payload: Some(app_message::Payload::ConversationMessage(cm)),
}) => Some(ContentData {
conversation_id: self.convo_id.clone().into(),
data: cm.message.clone(),
is_new_convo: false,
}),
_ => None,
})
}
}

View File

@ -1,4 +1,4 @@
use de_mls::{app::UserError, mls_crypto::MlsError};
use de_mls::{mls_crypto::MlsError, session::ConversationError};
use openmls::prelude::tls_codec;
pub use thiserror::Error;
@ -15,7 +15,7 @@ pub enum ChatError {
#[error("Demls: {0}")]
DemlsWrapped(#[from] MlsError),
#[error("Demls generic: {0}")]
DeMlsGeneric(#[from] UserError),
DeMlsGeneric(#[from] ConversationError),
}
impl ChatError {