Add scaffold for de-mls integration

This commit is contained in:
Jazz Turner-Baggs 2026-05-21 14:20:12 -07:00
parent 00de6d8e75
commit 5b6ee7d746
No known key found for this signature in database
14 changed files with 3952 additions and 307 deletions

3500
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -15,14 +15,14 @@ storage = { workspace = true }
# External dependencies (sorted)
base64 = "0.22"
chat-proto = { git = "https://github.com/logos-messaging/chat_proto" }
chat-proto = { git = "https://github.com/logos-messaging/chat_proto", branch ="version_rollback" }
double-ratchets = { path = "../double-ratchets" }
hex = "0.4.3"
openmls = { version = "0.8.1", features = ["libcrux-provider"] }
openmls_libcrux_crypto = "0.3.1"
openmls_memory_storage = "0.5.0"
openmls_traits = "0.5.0"
prost = "0.14.1"
prost = "0.13.5"
rand_core = { version = "0.6" }
safer-ffi = "0.1.13"
thiserror = "2.0.17"

View File

@ -14,6 +14,6 @@ pub use chat_sqlite::StorageConfig;
pub use context::{Context, ConversationId, ConversationIdOwned, Introduction};
pub use conversation::GroupConvo;
pub use errors::ChatError;
pub use service_traits::{DeliveryService, IdentityProvider, RegistrationService};
pub use service_traits::{DeliveryService, IdentityProvider, RegistrationService, WakeupService};
pub use types::{AccountId, AddressedEncryptedPayload, AddressedEnvelope, ContentData};
pub use utils::hex_trunc;

View File

@ -5,7 +5,10 @@ use std::{fmt::Debug, fmt::Display};
use crypto::{Ed25519Signature, Ed25519VerifyingKey};
use crate::types::{AccountId, AddressedEnvelope};
use crate::{
ConversationId,
types::{AccountId, AddressedEnvelope},
};
/// A Delivery service is responsible for payload transport.
/// This interface allows Conversations to send payloads on the wire as well as
@ -65,3 +68,7 @@ impl<T: IdentityProvider> IdentityProvider for &T {
(**self).public_key()
}
}
pub trait WakeupService: Debug {
fn wakeup_in(&mut self, secs: u32, convo_id: ConversationId);
}

View File

@ -15,11 +15,18 @@ libchat = { workspace = true }
storage = { workspace = true }
# External dependencies (sorted)
chat-proto = { git = "https://github.com/logos-messaging/chat_proto" }
thiserror = "2.0.18"
prost = "0.14.3"
alloy = "1.8.3"
chat-proto = { git = "https://github.com/logos-messaging/chat_proto", branch ="version_rollback" }
de-mls = { git = "https://github.com/vacp2p/de-mls" }
hashgraph-like-consensus = "0.4.0"
hex = "0.4.3"
openmls = "0.8.1"
openmls_libcrux_crypto = "0.3.1"
openmls_memory_storage = "0.5.0"
openmls_rust_crypto = "0.5.0"
openmls_traits = "0.5.0"
prost = "0.13.5"
rand = "0.9"
thiserror = "2.0.18"
tokio = "1.52.3"
tracing = "0.1.44"

View File

@ -3,13 +3,13 @@ mod group_v2;
use crate::{AccountId, ContentData, DeliveryService, RegistrationService};
use chat_proto::logoschat::encryption::EncryptedPayload;
use libchat::IdentityProvider;
use libchat::{IdentityProvider, WakeupService};
use std::fmt::Debug;
pub use crate::ChatError;
pub use group_v1::GroupV1Convo;
pub use group_v2::GroupV2Convo;
pub type ConversationIdRef<'a> = &'a str;
pub type ConversationId = String;
@ -20,6 +20,7 @@ pub trait ExternalServices: Debug {
type IP: IdentityProvider;
type DS: DeliveryService;
type RS: RegistrationService;
type WS: WakeupService;
}
#[derive(Debug)]
@ -27,14 +28,16 @@ pub struct ServiceContext<S: ExternalServices> {
pub(crate) identity_provider: S::IP,
pub(crate) ds: S::DS,
pub(crate) rs: S::RS,
pub(crate) wakeup_service: S::WS,
}
impl<S: ExternalServices> ServiceContext<S> {
pub fn new(identity_provider: S::IP, ds: S::DS, rs: S::RS) -> Self {
pub fn new(identity_provider: S::IP, ds: S::DS, rs: S::RS, wakeup_service: S::WS) -> Self {
ServiceContext {
identity_provider,
ds,
rs,
wakeup_service,
}
}
}
@ -57,6 +60,8 @@ pub trait BaseConvo<S: ExternalServices>: Id + Debug {
service_ctx: &mut ServiceContext<S>,
enc_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError>;
fn wakeup(&mut self, service_ctx: &mut ServiceContext<S>) -> Result<(), ChatError>;
}
pub trait BaseGroupConvo<S: ExternalServices>: BaseConvo<S> {

View File

@ -226,6 +226,10 @@ where
}
}
}
fn wakeup(&mut self, _service_ctx: &mut ServiceContext<S>) -> Result<(), ChatError> {
Ok(())
}
}
impl<S, MP> BaseGroupConvo<S> for GroupV1Convo<MP>

View File

@ -1,29 +1,138 @@
/// GroupV2 is a conversationType which provides effecient handling of multiple participants
/// Properties:
/// - Harvest Now Decrypt Later (HNDL) protection provided by XWING
/// - Multiple
use std::cell::RefCell;
use std::rc::Rc;
// This Implementation is a Quick and Dirty Integration of DeMLS into libchat.
// DeMLS and Libchat have different execution models, trait definitions and ownership/lifetimes of objects.
// The easies path is to do a Spike to see what it would take, gather the friction points and then iterate.
//
// Since de-mls::user contains the state-machine and is Async the easiest path is to generate async runtimes
// for each call. This is inefficient but requres the lease amount of effort.
// Expect this branch to not be merged.
macro_rules! run_async {
($expr:expr) => {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { $expr })
};
}
use alloy::signers::local::PrivateKeySigner;
use blake2::{Blake2b, Digest, digest::consts::U6};
use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload};
use openmls::prelude::tls_codec::Deserialize;
use openmls::prelude::*;
use de_mls::app::{ConsensusContext, ConversationConfig, User, UserPlugins};
use de_mls::core::{ScoringConfig, StewardListConfig};
use de_mls::defaults::{
DefaultConsensusPlugin, DefaultConversationPluginsFactory, MemoryDeMlsStorage,
};
use de_mls::ds::{APP_MSG_SUBTOPIC, DeliveryServiceError, InboundPacket, OutboundPacket};
use de_mls::identity::Identity;
use de_mls::mls_crypto::MlsCredentials;
use hashgraph_like_consensus::signing::EthereumConsensusSigner;
use prost::Message;
use rand::{self, Rng};
use std::sync::{Arc, Mutex};
use crate::AccountId;
use crate::conversation::{ConversationIdRef, ServiceContext};
use crate::inbox_v2::{MlsIdentityProvider, MlsProvider};
use crate::conversation::{ConversationIdRef, ExternalServices, ServiceContext};
use crate::inbox_v2::MlsIdentityProvider;
use crate::{
AddressedEncryptedPayload, ContentData, DeliveryService, IdentityProvider, RegistrationService,
AddressedEncryptedPayload, ContentData, DeliveryService, RegistrationService,
conversation::{BaseConvo, BaseGroupConvo, ChatError, Id},
};
pub struct GroupV2Convo<MP: MlsProvider> {
mls_provider: Rc<RefCell<MP>>,
convo_id: String,
const APP_NAME: &str = "sdkchat";
/// This is a Test Wrapper of Demls Identitity Trait
/// Linchat has its own trait that will need to be intergrated at somepoint.
pub struct LocalDemlsIdent {
name: String,
}
impl<MP: MlsProvider> std::fmt::Debug for GroupV2Convo<MP> {
impl LocalDemlsIdent {
pub fn new(name: impl Into<String>) -> Self {
Self { name: name.into() }
}
}
impl Identity for LocalDemlsIdent {
fn identity_bytes(&self) -> &[u8] {
self.name.as_bytes()
}
fn identity_display(&self) -> &str {
&self.name
}
}
#[derive(Debug)]
// This Maps a Demls::DeliveryService to a crate::service_traits::DeliveryService
// It works by caching outbound messages to a Vec which is eventually drained when
// The ServiceContext is available.
//
// All methods in Convo must call drain, to ensure that messages go out.
pub struct BufferDs {
queue: Vec<OutboundPacket>,
}
impl BufferDs {
pub fn new() -> Self {
Self { queue: vec![] }
}
// Warn: Messages are not sent untill drain is called, which is after the return from User.
// If de-mls relies on interactive sends, this will not work.
pub fn drain<S: ExternalServices>(
&mut self,
service_ctx: &mut ServiceContext<S>,
) -> Result<(), ChatError> {
// Swap the Vec out; Own then existing and replace with a new empty vec.
for pkt in self.queue.drain(..) {
let delivery_address = pkt.delivery_address().to_string();
// All Payloads leaving GroupV2 are a GroupV2Frame
let frame = GroupV2Frame {
payload: Some(GroupV2Payload::DeMlsWrapper(pkt.payload.into())),
};
// Wrap in EncryptedPayload
let payload = AddressedEncryptedPayload {
// Note: Likely a mismatch herem as de-mls is expecting a specific topic.
delivery_address,
data: EncryptedPayload {
encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext {
payload: frame.encode_to_vec().into(),
})),
},
};
let env = payload.into_envelope(pkt.conversation_id.clone());
service_ctx.ds.publish(env).map_err(ChatError::generic)?;
}
Ok(())
}
}
impl de_mls::ds::DeliveryService for BufferDs {
type Error = DeliveryServiceError;
fn publish(&mut self, packet: de_mls::ds::OutboundPacket) -> Result<(), Self::Error> {
self.queue.push(packet);
Ok(())
}
fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> {
todo!()
}
}
pub struct GroupV2Convo {
convo_id: String,
user: User<DefaultConsensusPlugin, DefaultConversationPluginsFactory>,
// DeMLS takes shared ownership over the DS, so its incompatible with the &mut ServiceContext
// Use a wrapper for now, and then look at refactoring.
buffer_ds: Arc<Mutex<BufferDs>>,
}
impl std::fmt::Debug for GroupV2Convo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GroupV2Convo")
.field("convo_id", &self.convo_id)
@ -31,35 +140,60 @@ impl<MP: MlsProvider> std::fmt::Debug for GroupV2Convo<MP> {
}
}
impl<MP: MlsProvider> GroupV2Convo<MP> {
// Create a new conversation with the creator as the only participant.
pub fn new<IP: IdentityProvider>(
identity_provider: MlsIdentityProvider<IP>,
mls_provider: Rc<RefCell<MP>>,
fn rand_string(n: usize) -> String {
rand::rng()
.sample_iter(rand::distr::Alphanumeric)
.take(n)
.map(char::from)
.collect()
}
impl GroupV2Convo {
pub fn new<S: ExternalServices>(
service_ctx: &mut ServiceContext<S>,
) -> Result<Self, ChatError> {
// let config = Self::mls_create_config();
// let credential = identity_provider.get_credential();
// Create new instances of all the dependencies that User needs.
// Once working, these can be moved to be shared across different convo instances.
let convo_id = rand_string(5);
let signer = PrivateKeySigner::random();
// let identity = WalletIdentity::from_wallet(signer.address());
let identity = LocalDemlsIdent::new(signer.address().to_string());
todo!();
}
let credentials =
Arc::new(MlsCredentials::from_identity(&identity).map_err(ChatError::generic)?);
let storage = Arc::new(MemoryDeMlsStorage::new());
let conversation_plugins = DefaultConversationPluginsFactory::new(storage, credentials);
// Constructs a new conversation upon receiving a MlsWelcome message.
pub fn new_from_welcome(
mls_provider: Rc<RefCell<MP>>,
welcome: Welcome,
) -> Result<Self, ChatError> {
todo!()
}
let consensus_signer = EthereumConsensusSigner::new(signer);
let consensus = ConsensusContext::<DefaultConsensusPlugin>::new(consensus_signer);
fn mls_create_config() -> MlsGroupCreateConfig {
MlsGroupCreateConfig::builder()
.ciphersuite(Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519)
.use_ratchet_tree_extension(true) // This is handy for now, until there is central store for this data
.build()
}
let plugins = UserPlugins {
conversation_plugins,
consensus,
default_conversation_config: ConversationConfig::default(),
default_scoring_config: ScoringConfig::default(),
default_steward_list_config: StewardListConfig::default(),
};
fn mls_join_config() -> MlsGroupJoinConfig {
MlsGroupJoinConfig::builder().build()
let ds = BufferDs::new();
let transport = Arc::new(Mutex::new(ds));
let mut user = User::new_with_plugins(Box::new(identity), plugins, transport.clone());
run_async!(
user.start_conversation(convo_id.as_str(), true)
.await
.unwrap()
);
// Ensure that the BufferDs gets drained
transport.lock().unwrap().drain(service_ctx)?;
Ok(Self {
convo_id,
user,
buffer_ds: transport,
})
}
fn delivery_address_from_id(convo_id: &str) -> String {
@ -70,6 +204,7 @@ impl<MP: MlsProvider> GroupV2Convo<MP> {
hex::encode(hash)
}
#[allow(unused)]
fn delivery_address(&self) -> String {
Self::delivery_address_from_id(&self.convo_id)
}
@ -77,30 +212,28 @@ impl<MP: MlsProvider> GroupV2Convo<MP> {
fn ctrl_delivery_address_from_id(convo_id: &str) -> String {
Self::delivery_address_from_id(convo_id)
}
#[allow(unused)]
fn ctrl_delivery_address(&self) -> String {
Self::ctrl_delivery_address_from_id(&self.convo_id)
}
// Needed by Demls
fn app_id(&self) -> &str {
APP_NAME
}
}
impl<MP> Id for GroupV2Convo<MP>
where
MP: MlsProvider,
{
impl Id for GroupV2Convo {
fn id(&self) -> ConversationIdRef<'_> {
&self.convo_id
}
}
impl<IP, MP, DS, RS> BaseConvo<IP, DS, RS> for GroupV2Convo<MP>
impl<S> BaseConvo<S> for GroupV2Convo
where
IP: IdentityProvider,
MP: MlsProvider,
DS: DeliveryService,
RS: RegistrationService,
// KP: RegistrationService,
S: ExternalServices,
{
fn init(&self, service_ctx: &mut super::ServiceContext<IP, DS, RS>) -> Result<(), ChatError> {
fn init(&self, service_ctx: &mut super::ServiceContext<S>) -> Result<(), ChatError> {
// Configure the delivery service to listen for the required delivery addresses.
service_ctx
@ -112,22 +245,27 @@ where
.subscribe(&Self::ctrl_delivery_address_from_id(&self.convo_id))
.map_err(ChatError::generic)?;
// Ensure that the BufferDs gets drained
self.buffer_ds.lock().unwrap().drain(service_ctx)?;
Ok(())
}
fn send_content(
&mut self,
service_ctx: &mut super::ServiceContext<IP, DS, RS>,
content: &[u8],
service_ctx: &mut super::ServiceContext<S>,
_content: &[u8],
) -> Result<(), ChatError> {
let signer = MlsIdentityProvider(&service_ctx.identity_provider);
let _signer = MlsIdentityProvider(&service_ctx.identity_provider);
todo!();
// TODO: Send content
// Ensure that the BufferDs gets drained
self.buffer_ds.lock().unwrap().drain(service_ctx)?;
Ok(())
}
fn handle_frame(
&mut self,
_service_ctx: &mut super::ServiceContext<IP, DS, RS>,
service_ctx: &mut super::ServiceContext<S>,
encoded_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError> {
let bytes = match encoded_payload.encryption {
@ -137,67 +275,71 @@ where
}
};
todo!()
}
}
impl<IP, MP, DS, RS> BaseGroupConvo<IP, DS, RS> for GroupV2Convo<MP>
where
IP: IdentityProvider,
MP: MlsProvider,
DS: DeliveryService,
RS: RegistrationService,
{
// add_members returns:
// commit — the Commit message Alice broadcasts to all members
// welcome — the Welcome message sent privately to each new joiner
// _group_info — used for external joins; ignore for now
fn add_member(
&mut self,
service_ctx: &mut ServiceContext<IP, DS, RS>,
members: &[&AccountId],
) -> Result<(), ChatError> {
let mls_provider = &*self.mls_provider.borrow();
if members.len() > 50 {
// This is a temporary limit that originates from the the De-MLS epoch time.
return Err(ChatError::generic(
"Cannot add more than 50 Members at a time",
));
}
if members.is_empty() {
return Ok(());
}
todo!();
}
}
impl<MP: MlsProvider> GroupV2Convo<MP> {
fn key_package_for_account<
IP: IdentityProvider,
DS: DeliveryService,
RS: RegistrationService,
>(
&self,
service_ctx: &mut ServiceContext<IP, DS, RS>,
ident: &AccountId,
) -> Result<KeyPackage, ChatError> {
let retrieved_bytes = service_ctx
.rs
.retrieve(ident)
.map_err(|e: RS::Error| ChatError::Generic(e.to_string()))?;
// dbg!(ctx.contact_registry());
let Some(keypkg_bytes) = retrieved_bytes else {
return Err(ChatError::generic("Group Contact Not Found"));
// Fake a InboundPacket
let packet = InboundPacket {
payload: bytes.to_vec(),
subtopic: APP_MSG_SUBTOPIC.to_string(), // Assume APP TOPIC, Welcome Messages go to InboxV2
conversation_id: self.convo_id.to_string(),
app_id: self.app_id().as_bytes().to_vec(),
timestamp: 0,
};
let key_package_in = KeyPackageIn::tls_deserialize(&mut keypkg_bytes.as_slice())?;
let keypkg = key_package_in
.validate(self.mls_provider.borrow().crypto(), ProtocolVersion::Mls10)
.map_err(ChatError::generic)?; //TODO: P3 - Hardcoded Protocol Version
Ok(keypkg)
run_async!(self.user.process_inbound_packet(packet).await.unwrap());
// TODO: Return Content types; This is moving towards an event system soon, so ignore if getting
// the concrete Content is difficult
// Ensure that the BufferDs gets drained
self.buffer_ds.lock().unwrap().drain(service_ctx)?;
Ok(None)
}
fn wakeup(&mut self, _service_ctx: &mut ServiceContext<S>) -> Result<(), ChatError> {
// todo!()
Ok(())
}
}
impl<S> BaseGroupConvo<S> for GroupV2Convo
where
S: ExternalServices,
{
fn add_member(
&mut self,
service_ctx: &mut ServiceContext<S>,
members: &[&AccountId],
) -> Result<(), ChatError> {
for member in members {
let key_package_opt = service_ctx
.rs
.retrieve(member)
.map_err(ChatError::generic)?;
let Some(_key_package_bytes) = key_package_opt else {
return Err(ChatError::generic("No Keypackage"));
};
// todo!("Implement function which adds member via Keypackage");
}
// Ensure that the BufferDs gets drained
self.buffer_ds.lock().unwrap().drain(service_ctx)?;
Ok(())
}
}
use prost::{Oneof, bytes::Bytes};
#[derive(Clone, PartialEq, Message)]
pub struct GroupV2Frame {
#[prost(oneof = "GroupV2Payload", tags = "1")]
pub payload: Option<GroupV2Payload>,
}
#[derive(Clone, PartialEq, Oneof)]
pub enum GroupV2Payload {
#[prost(message, tag = "2")]
DeMlsWrapper(Bytes),
}

View File

@ -12,7 +12,7 @@ use crate::{AccountId, errors::ChatError};
use crate::{DeliveryService, IdentityProvider, RegistrationService};
use chat_proto::logoschat::encryption::EncryptedPayload;
use chat_proto::logoschat::envelope::EnvelopeV1;
use libchat::ContentData;
use libchat::{ContentData, WakeupService};
use prost::Message;
use storage::ChatStore;
@ -53,35 +53,45 @@ where
// This allows the ExternalServices trait to be converted from a tuple.
// This is used in CoreClient to convert from the explicit impls to a
// ExternalServices bundle, which means it does not have to be exposed externally.
impl<IP, DS, RS> ExternalServices for (IP, DS, RS)
impl<IP, DS, RS, WS> ExternalServices for (IP, DS, RS, WS)
where
IP: IdentityProvider + Debug,
DS: DeliveryService + Debug,
RS: RegistrationService + Debug,
WS: WakeupService + Debug,
{
type IP = IP;
type DS = DS;
type RS = RS;
type WS = WS;
}
pub struct CoreClient<
IP: IdentityProvider,
DS: DeliveryService,
RS: RegistrationService,
WS: WakeupService,
CS: ChatStore,
> {
inner: Rc<RefCell<InnerClient<(IP, DS, RS), CS>>>,
inner: Rc<RefCell<InnerClient<(IP, DS, RS, WS), CS>>>,
}
impl<IP, DS, RS, CS> CoreClient<IP, DS, RS, CS>
impl<IP, DS, RS, WS, CS> CoreClient<IP, DS, RS, WS, CS>
where
IP: IdentityProvider,
DS: DeliveryService,
RS: RegistrationService,
WS: WakeupService,
CS: ChatStore + 'static,
{
pub fn new(account: IP, delivery: DS, registration: RS, store: CS) -> Result<Self, ChatError> {
let c = InnerClient::new(account, delivery, registration, store)?;
pub fn new(
account: IP,
delivery: DS,
registration: RS,
wakeup: WS,
store: CS,
) -> Result<Self, ChatError> {
let c = InnerClient::new(account, delivery, registration, wakeup, store)?;
Ok(Self {
inner: Rc::new(RefCell::new(c)),
})
@ -98,7 +108,7 @@ where
pub fn create_group_convo(
&self,
participants: &[&AccountId],
) -> Result<GroupConvo<(IP, DS, RS), CS>, ChatError> {
) -> Result<GroupConvo<(IP, DS, RS, WS), CS>, ChatError> {
let convo_id = self.inner.borrow_mut().create_group_convo(participants)?;
Ok(GroupConvo {
client: self.inner.clone(),
@ -122,7 +132,7 @@ where
self.inner.borrow_mut().handle_payload(payload)
}
pub fn convo(&self, convo_id: ConversationIdRef) -> Option<GroupConvo<(IP, DS, RS), CS>> {
pub fn convo(&self, convo_id: ConversationIdRef) -> Option<GroupConvo<(IP, DS, RS, WS), CS>> {
let client = self.inner.clone();
if !client.borrow().has_conversation(convo_id) {
@ -134,6 +144,29 @@ where
convo_id: convo_id.to_string(),
})
}
pub fn on_wakeup(&self, convo_id: &str) -> Result<(), ChatError> {
self.inner.borrow_mut().wakeup(convo_id)
}
pub fn ws(&self) -> RefMut<'_, WS> {
RefMut::map(self.inner.borrow_mut(), |c| c.ws())
}
}
impl<IP, DS, RS, WS, CS> Clone for CoreClient<IP, DS, RS, WS, CS>
where
IP: IdentityProvider,
DS: DeliveryService,
RS: RegistrationService,
WS: WakeupService,
CS: ChatStore,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
struct InnerClient<S: ExternalServices, CS: ChatStore> {
@ -155,13 +188,14 @@ where
account: S::IP,
delivery: S::DS,
registration: S::RS,
wakeup: S::WS,
store: CS,
) -> Result<Self, ChatError> {
// Services for sharing with Converastions/Inboxes
// let mut service_ctx: ServiceContext<S> = ServiceContext::new(account, delivery, registration);
let mut service_ctx: ServiceContext<S> =
ServiceContext::new(account, delivery, registration);
ServiceContext::new(account, delivery, registration, wakeup);
// let contact_registry = Rc::new(RefCell::new(registration));
let _store = Rc::new(RefCell::new(store));
@ -187,13 +221,17 @@ where
&mut self.service_ctx.ds
}
pub fn ws(&mut self) -> &mut S::WS {
&mut self.service_ctx.wakeup_service
}
/// Returns the unique identifier associated with the account
pub fn account_id(&self) -> &AccountId {
self.pq_inbox.account_id()
}
pub fn create_group_convo(&mut self, participants: &[&AccountId]) -> Result<String, ChatError> {
let convo = self.pq_inbox.create_group_v1(&mut self.service_ctx)?;
let convo = self.pq_inbox.create_group_v2(&mut self.service_ctx)?;
let mut convo: Box<dyn BaseGroupConvo<S>> = Box::new(convo);
convo.init(&mut self.service_ctx)?;
convo.add_member(&mut self.service_ctx, participants)?;
@ -279,4 +317,25 @@ where
None => Ok(()),
}
}
pub fn wakeup(&mut self, convo_id: ConversationIdRef) -> Result<(), ChatError> {
match convo_id {
c if c == self.pq_inbox.id() => todo!(),
c if self.cached_convos.contains_key(c) => self.wakeup_convo(c),
_ => Ok(()),
}
}
// Dispatch encrypted payload to its corresponding conversation
fn wakeup_convo(&mut self, convo_id: ConversationIdRef) -> Result<(), ChatError> {
let Some(convo) = self.cached_convos.get_mut(convo_id) else {
return Err(ChatError::generic("No Convo Found"));
};
let convo = match convo {
// ConvoTypeOwned::Pairwise(_) => todo!(),
ConvoTypeOwned::Group(c) => c.as_mut(),
};
convo.wakeup(&mut self.service_ctx)
}
}

View File

@ -1,3 +1,4 @@
use de_mls::mls_crypto::MlsError;
use openmls::prelude::tls_codec;
pub use thiserror::Error;
@ -11,6 +12,8 @@ pub enum ChatError {
ProtobufDecodeError(#[from] prost::DecodeError),
#[error("delivery: {0}")]
Delivery(String),
#[error("Demls: {0}")]
DemlsWrapped(#[from] MlsError),
}
impl ChatError {

View File

@ -21,6 +21,7 @@ use crate::IdentityProvider;
use crate::RegistrationService;
use crate::conversation::BaseConvo;
use crate::conversation::ExternalServices;
use crate::conversation::GroupV2Convo;
use crate::conversation::ServiceContext;
use crate::conversation::{GroupV1Convo, Id};
use crate::utils::{blake2b_hex, hash_size};
@ -214,6 +215,7 @@ impl<CS: ChatStore> InboxV2<CS> {
.map_err(ChatError::generic)
}
#[allow(unused)]
pub fn create_group_v1<S: ExternalServices>(
&self,
service_ctx: &mut ServiceContext<S>,
@ -222,6 +224,13 @@ impl<CS: ChatStore> InboxV2<CS> {
GroupV1Convo::new(mls_ident, self.mls_provider.clone())
}
pub fn create_group_v2<S: ExternalServices>(
&self,
service_ctx: &mut ServiceContext<S>,
) -> Result<GroupV2Convo, ChatError> {
GroupV2Convo::new(service_ctx)
}
fn create_keypackage<IP: IdentityProvider>(
&self,
signer: &MlsIdentityProvider<IP>,

View File

View File

@ -18,3 +18,7 @@ core_client = {path = "../core_client"}
# External dependencies (sorted)
tempfile = "3"
[dependencies]
tracing = "0.1"
tracing-subscriber = "0.3.23"

View File

@ -1,19 +1,34 @@
use std::cell::RefCell;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use tracing::info;
use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
use core_client::{ChatError, CoreClient};
use libchat::{ContentData, hex_trunc};
use core_client::CoreClient;
use libchat::{ContentData, WakeupService, hex_trunc};
use logos_account::TestLogosAccount;
struct PollableClient {
inner: CoreClient<TestLogosAccount, LocalBroadcaster, EphemeralRegistry, MemStore>,
inner: CoreClient<
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
ManualWakeupService,
MemStore,
>,
on_content: Option<Box<dyn Fn(ContentData)>>,
}
impl PollableClient {
fn init(
ctx: CoreClient<TestLogosAccount, LocalBroadcaster, EphemeralRegistry, MemStore>,
ctx: CoreClient<
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
ManualWakeupService,
MemStore,
>,
cb: Option<impl Fn(ContentData) + 'static>,
) -> Self {
Self {
@ -36,7 +51,13 @@ impl PollableClient {
}
impl Deref for PollableClient {
type Target = CoreClient<TestLogosAccount, LocalBroadcaster, EphemeralRegistry, MemStore>;
type Target = CoreClient<
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
ManualWakeupService,
MemStore,
>;
fn deref(&self) -> &Self::Target {
&self.inner
@ -49,9 +70,80 @@ impl DerefMut for PollableClient {
}
}
fn process(clients: &mut Vec<PollableClient>) {
for client in clients {
client.process_messages();
fn process(clients: &mut Vec<PollableClient>, wakeups: &mut Vec<WakeupProvider>, secs: u32) {
for _ in 0..secs {
for w in wakeups.iter().as_ref() {
w.advance_time(1);
}
for client in clients.as_mut_slice() {
client.process_messages();
}
}
}
use std::cmp::Reverse;
use std::collections::BinaryHeap;
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
struct WakeupRecord {
expiry: u32,
convo_id: String,
}
struct ManualWakeupService {
now: u32,
pending: BinaryHeap<Reverse<WakeupRecord>>,
on_wakeup: Box<dyn Fn(String)>,
}
impl std::fmt::Debug for ManualWakeupService {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ManualWakeupService")
.field("now", &self.now)
.field("pending", &self.pending)
.finish()
}
}
impl ManualWakeupService {
pub fn new(on_wakeup: impl Fn(String) + 'static) -> Self {
Self {
now: 0,
pending: BinaryHeap::new(),
on_wakeup: Box::new(on_wakeup),
}
}
pub fn tick(&mut self, secs: u32) -> Vec<String> {
self.now += secs;
let mut fired = vec![];
while self
.pending
.peek()
.is_some_and(|Reverse(w)| w.expiry <= self.now)
{
let Reverse(w) = self.pending.pop().unwrap();
info!(now = self.now, w.convo_id, "Popping");
fired.push(w.convo_id);
}
fired
}
pub fn advance_time(&mut self, secs: u32) {
for convo_id in self.tick(secs) {
(self.on_wakeup)(convo_id);
}
}
}
impl WakeupService for ManualWakeupService {
fn wakeup_in(&mut self, secs: u32, convo_id: libchat::ConversationId) {
info!(now = self.now, convo_id, "Pushing");
self.pending.push(Reverse(WakeupRecord {
expiry: self.now + secs,
convo_id: convo_id.to_string(),
}));
}
}
@ -65,8 +157,100 @@ fn pretty_print(prefix: impl Into<String>) -> Box<dyn Fn(ContentData)> {
})
}
struct WakeupProvider {
client_slot: Rc<
RefCell<
Option<
CoreClient<
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
ManualWakeupService,
MemStore,
>,
>,
>,
>,
}
impl WakeupProvider {
pub fn new() -> Self {
Self {
client_slot: Rc::new(RefCell::new(None)),
}
}
pub fn create_wakeup_service(&self) -> ManualWakeupService {
let slot = self.client_slot.clone();
ManualWakeupService::new(move |convo_id| {
if let Some(client) = slot.borrow().as_ref() {
client.on_wakeup(&convo_id);
}
})
}
pub fn advance_time(&self, secs: u32) {
// borrow_mut must be released before on_wakeup fires — it re-borrows client_slot
let fired = {
let mut slot = self.client_slot.borrow_mut();
slot.as_mut()
.map_or(vec![], |client| client.ws().tick(secs))
};
for convo_id in fired {
if let Some(client) = self.client_slot.borrow().as_ref() {
client.on_wakeup(&convo_id);
}
}
}
pub fn fill_slot(
&self,
saro: &CoreClient<
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
ManualWakeupService,
MemStore,
>,
) {
*self.client_slot.borrow_mut() = Some(saro.clone());
}
}
#[test]
fn wakup() {
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
let mut w = ManualWakeupService::new(|c| println!("Wakeup: {}. ", c));
println!("STARTing");
w.wakeup_in(5, "5");
w.wakeup_in(1, "1");
w.wakeup_in(2, "2");
println!("GO");
w.advance_time(1);
w.advance_time(1);
w.advance_time(1);
w.wakeup_in(3, "3");
w.advance_time(1);
w.advance_time(1);
w.advance_time(1);
w.advance_time(1);
w.advance_time(1);
println!("DONE");
}
#[test]
fn core_client() {
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
let swp = WakeupProvider::new();
let rwp = WakeupProvider::new();
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
@ -74,14 +258,31 @@ fn core_client() {
let raya_account = TestLogosAccount::new("raya");
let saro = CoreClient::new(saro_account, ds.clone(), rs.clone(), MemStore::new()).unwrap();
let raya = CoreClient::new(raya_account, ds, rs, MemStore::new()).unwrap();
let saro = CoreClient::new(
saro_account,
ds.clone(),
rs.clone(),
swp.create_wakeup_service(),
MemStore::new(),
)
.unwrap();
swp.fill_slot(&saro);
let raya = CoreClient::new(
raya_account,
ds,
rs,
rwp.create_wakeup_service(),
MemStore::new(),
)
.unwrap();
rwp.fill_slot(&raya);
let mut clients = vec![
PollableClient::init(saro, Some(pretty_print(" Saro "))),
PollableClient::init(raya, Some(pretty_print(" Raya "))),
];
let mut wakeups = vec![swp, rwp];
const SARO: usize = 0;
const RAYA: usize = 1;
@ -89,16 +290,22 @@ fn core_client() {
.create_group_convo(&[&clients[RAYA].account_id()])
.unwrap();
process(&mut clients);
// Manaully process the DS
process(&mut clients, &mut wakeups, 10);
s_convo.send_content(b"HI").unwrap();
let convo_id = clients[RAYA].list_conversations().unwrap().pop().unwrap();
let r_convo = clients[RAYA].convo(&convo_id).expect("Convo exists");
process(&mut clients);
r_convo.send_content(b"PEW").unwrap();
process(&mut clients);
s_convo.send_content(b"SARO again").unwrap();
process(&mut clients);
println!("Hello");
// Manaully process the DS
process(&mut clients, &mut wakeups, 10);
// TODO: Needs Invite path working first
// let convo_id = clients[RAYA].list_conversations().unwrap().pop().unwrap();
// let r_convo = clients[RAYA].convo(&convo_id).expect("Convo exists");
// process(&mut clients);
// r_convo.send_content(b"PEW").unwrap();
// process(&mut clients);
// s_convo.send_content(b"SARO again").unwrap();
// process(&mut clients);
// println!("Hello");
}