Update de-mls dependency source and version; refactor GroupV2 to use MemberId and SessionEvent, and enhance BufferDs for welcome event handling.

This commit is contained in:
seemenkina 2026-05-25 17:00:06 +07:00
parent 5b6ee7d746
commit af84ce31b1
No known key found for this signature in database
3 changed files with 139 additions and 47 deletions

15
Cargo.lock generated
View File

@ -1881,9 +1881,10 @@ dependencies = [
[[package]]
name = "de-mls"
version = "3.0.0"
source = "git+https://github.com/vacp2p/de-mls#8be7a9ef1748fe0216bf7272ec4d034f26047122"
source = "git+https://github.com/vacp2p/de-mls?branch=main#deafb714d2bfc0c98af3f661161b92f5d621c306"
dependencies = [
"hashgraph-like-consensus",
"indexmap 2.14.0",
"openmls",
"openmls_basic_credential",
"openmls_rust_crypto 0.5.1",
@ -2187,7 +2188,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
dependencies = [
"libc",
"windows-sys 0.59.0",
"windows-sys 0.61.2",
]
[[package]]
@ -3850,7 +3851,7 @@ version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"windows-sys 0.59.0",
"windows-sys 0.61.2",
]
[[package]]
@ -5168,7 +5169,7 @@ dependencies = [
"errno",
"libc",
"linux-raw-sys 0.12.1",
"windows-sys 0.59.0",
"windows-sys 0.61.2",
]
[[package]]
@ -5225,7 +5226,7 @@ dependencies = [
"security-framework",
"security-framework-sys",
"webpki-root-certs",
"windows-sys 0.59.0",
"windows-sys 0.61.2",
]
[[package]]
@ -5887,7 +5888,7 @@ dependencies = [
"getrandom 0.4.2",
"once_cell",
"rustix 1.1.4",
"windows-sys 0.59.0",
"windows-sys 0.61.2",
]
[[package]]
@ -6613,7 +6614,7 @@ version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys 0.59.0",
"windows-sys 0.61.2",
]
[[package]]

View File

@ -17,7 +17,7 @@ storage = { workspace = true }
# External dependencies (sorted)
alloy = "1.8.3"
chat-proto = { git = "https://github.com/logos-messaging/chat_proto", branch ="version_rollback" }
de-mls = { git = "https://github.com/vacp2p/de-mls" }
de-mls = { git = "https://github.com/vacp2p/de-mls", branch = "main"}
hashgraph-like-consensus = "0.4.0"
hex = "0.4.3"
openmls = "0.8.1"

View File

@ -17,15 +17,19 @@ macro_rules! run_async {
use alloy::signers::local::PrivateKeySigner;
use blake2::{Blake2b, Digest, digest::consts::U6};
use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload};
use de_mls::app::{ConsensusContext, ConversationConfig, User, UserPlugins};
use de_mls::core::{ScoringConfig, StewardListConfig};
use de_mls::app::{ConsensusContext, ConversationConfig, SessionTick, User, UserPlugins};
use de_mls::core::{ScoringConfig, SessionEvent, StewardListConfig};
use de_mls::defaults::{
DefaultConsensusPlugin, DefaultConversationPluginsFactory, MemoryDeMlsStorage,
};
use de_mls::ds::{APP_MSG_SUBTOPIC, DeliveryServiceError, InboundPacket, OutboundPacket};
use de_mls::identity::Identity;
use de_mls::member_id::MemberId;
use de_mls::mls_crypto::MlsCredentials;
use de_mls::protos::de_mls::messages::v1::{
AppMessage as AppMessageProto, MemberWelcome, app_message,
};
use hashgraph_like_consensus::signing::EthereumConsensusSigner;
use libchat::WakeupService;
use prost::Message;
use rand::{self, Rng};
use std::sync::{Arc, Mutex};
@ -40,24 +44,24 @@ use crate::{
const APP_NAME: &str = "sdkchat";
/// This is a Test Wrapper of Demls Identitity Trait
/// Linchat has its own trait that will need to be intergrated at somepoint.
pub struct LocalDemlsIdent {
/// This is a Test Wrapper of Demls MemberId Trait
/// Libchat has its own trait that will need to be intergrated at somepoint.
pub struct LocalDemlsMember {
name: String,
}
impl LocalDemlsIdent {
impl LocalDemlsMember {
pub fn new(name: impl Into<String>) -> Self {
Self { name: name.into() }
}
}
impl Identity for LocalDemlsIdent {
fn identity_bytes(&self) -> &[u8] {
impl MemberId for LocalDemlsMember {
fn member_id_bytes(&self) -> &[u8] {
self.name.as_bytes()
}
fn identity_display(&self) -> &str {
fn member_id_display(&self) -> &str {
&self.name
}
}
@ -70,11 +74,25 @@ impl Identity for LocalDemlsIdent {
// All methods in Convo must call drain, to ensure that messages go out.
pub struct BufferDs {
queue: Vec<OutboundPacket>,
welcomes: Vec<MemberWelcome>,
}
impl BufferDs {
pub fn new() -> Self {
Self { queue: vec![] }
Self {
queue: vec![],
welcomes: vec![],
}
}
/// Lift welcomes out of session events into the welcome queue.
/// Other event variants are ignored — they're not "things to send."
fn retrive_welcome_event(&mut self, events: &[SessionEvent]) {
for evt in events {
if let SessionEvent::WelcomeReady(w) = evt {
self.welcomes.push(w.clone());
}
}
}
// Warn: Messages are not sent untill drain is called, which is after the return from User.
@ -85,7 +103,11 @@ impl BufferDs {
) -> Result<(), ChatError> {
// Swap the Vec out; Own then existing and replace with a new empty vec.
for pkt in self.queue.drain(..) {
let delivery_address = pkt.delivery_address().to_string();
let hash = Blake2b::<U6>::new()
.chain_update("delivery_addr|")
.chain_update(&pkt.conversation_id)
.finalize();
let delivery_address = hex::encode(hash);
// All Payloads leaving GroupV2 are a GroupV2Frame
let frame = GroupV2Frame {
payload: Some(GroupV2Payload::DeMlsWrapper(pkt.payload.into())),
@ -102,11 +124,26 @@ impl BufferDs {
},
};
// TODO(libchat: "Verify payloads routing"):
// GroupV2Frame currently drops sender app_id. de-mls's
// process_inbound_packet uses app_id to filter self-messages — without
// round-tripping the sender's id through the frame, every inbound packet
// looks like a self-echo and gets dropped.
let env = payload.into_envelope(pkt.conversation_id.clone());
service_ctx.ds.publish(env).map_err(ChatError::generic)?;
}
// TODO: build proper convertion ao welcome bundle
// for w in self.welcomes.drain(..) {
// let envelope = build_inbox_welcome_envelope(w);
// service_ctx
// .ds
// .publish(envelope)
// .map_err(ChatError::generic)?;
// }
Ok(())
}
}
@ -157,10 +194,10 @@ impl GroupV2Convo {
let convo_id = rand_string(5);
let signer = PrivateKeySigner::random();
// let identity = WalletIdentity::from_wallet(signer.address());
let identity = LocalDemlsIdent::new(signer.address().to_string());
let identity = LocalDemlsMember::new(signer.address().to_string());
let credentials =
Arc::new(MlsCredentials::from_identity(&identity).map_err(ChatError::generic)?);
Arc::new(MlsCredentials::from_member_id(&identity).map_err(ChatError::generic)?);
let storage = Arc::new(MemoryDeMlsStorage::new());
let conversation_plugins = DefaultConversationPluginsFactory::new(storage, credentials);
@ -253,13 +290,18 @@ where
fn send_content(
&mut self,
service_ctx: &mut super::ServiceContext<S>,
_content: &[u8],
content: &[u8],
) -> Result<(), ChatError> {
let _signer = MlsIdentityProvider(&service_ctx.identity_provider);
// TODO: Send content
// Ensure that the BufferDs gets drained
self.buffer_ds.lock().unwrap().drain(service_ctx)?;
let tick = run_async!(
self.user
.send_app_message(&self.convo_id, content.to_vec())
.await
.unwrap()
);
// Ensure that the BufferDs gets drained - done inside after_op
self.after_op(service_ctx, tick, &vec![])?;
Ok(())
}
@ -284,20 +326,23 @@ where
timestamp: 0,
};
run_async!(self.user.process_inbound_packet(packet).await.unwrap());
// TODO: Return Content types; This is moving towards an event system soon, so ignore if getting
// the concrete Content is difficult
// Ensure that the BufferDs gets drained
self.buffer_ds.lock().unwrap().drain(service_ctx)?;
Ok(None)
let tick = run_async!(self.user.process_inbound_packet(packet).await.unwrap());
let events = self
.user
.drain_events(&self.convo_id)
.map_err(ChatError::generic)?;
let out = self.events_to_content(events.clone());
self.after_op(service_ctx, tick, &events)?;
Ok(out)
}
fn wakeup(&mut self, _service_ctx: &mut ServiceContext<S>) -> Result<(), ChatError> {
// todo!()
Ok(())
fn wakeup(&mut self, ctx: &mut ServiceContext<S>) -> Result<(), ChatError> {
let tick = run_async!(self.user.poll_session(&self.convo_id).await.unwrap());
let events = self
.user
.drain_events(&self.convo_id)
.map_err(ChatError::generic)?;
self.after_op(ctx, tick, &events)
}
}
@ -310,23 +355,69 @@ where
service_ctx: &mut ServiceContext<S>,
members: &[&AccountId],
) -> Result<(), ChatError> {
let mut last_tick = SessionTick {
next_wakeup_in: None,
};
for member in members {
let key_package_opt = service_ctx
let kp_bytes = service_ctx
.rs
.retrieve(member)
.map_err(ChatError::generic)?
.ok_or_else(|| ChatError::generic("No key package"))?;
last_tick = run_async!(self.user.add_member(&self.convo_id, &kp_bytes).await)
.map_err(ChatError::generic)?;
// TODO(libchat: "Parse welcomes and create GroupV2"):
// remember `member` so we can route the eventual WelcomeReady
// event to its delivery_address. Needs API decision with libchat.
}
let events = self
.user
.drain_events(&self.convo_id)
.map_err(ChatError::generic)?;
self.after_op(service_ctx, last_tick, &events)
}
}
let Some(_key_package_bytes) = key_package_opt else {
return Err(ChatError::generic("No Keypackage"));
};
impl GroupV2Convo {
fn after_op<S: ExternalServices>(
&self,
service_ctx: &mut ServiceContext<S>,
tick: SessionTick,
events: &[SessionEvent],
) -> Result<(), ChatError> {
let mut buf = self.buffer_ds.lock().unwrap();
buf.retrive_welcome_event(events);
buf.drain(service_ctx)?;
drop(buf);
if let Some(d) = tick.next_wakeup_in {
service_ctx
.wakeup_service
.wakeup_in(d.as_secs().max(1) as u32, &self.convo_id);
}
Ok(())
}
// todo!("Implement function which adds member via Keypackage");
fn events_to_content(&mut self, events: Vec<SessionEvent>) -> Option<ContentData> {
let mut latest: Option<ContentData> = None;
for evt in events {
match evt {
SessionEvent::AppMessage(AppMessageProto { payload: Some(p) }) => match p {
app_message::Payload::ConversationMessage(cm) => {
latest = Some(ContentData {
conversation_id: self.convo_id.clone().into(),
data: cm.message,
is_new_convo: false,
});
}
// All other types is an inside group traffic — not chat content.
_ => {}
},
_ => {}
}
}
// Ensure that the BufferDs gets drained
self.buffer_ds.lock().unwrap().drain(service_ctx)?;
Ok(())
latest
}
}