From 4c6286234b82f0224a0e0111ec55d9bc7682c920 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Sun, 10 May 2026 23:08:18 -0700 Subject: [PATCH] Core_client refactor --- Cargo.lock | 21 ++ Cargo.toml | 10 +- core/account/Cargo.toml | 2 + core/account/src/account.rs | 6 +- core/conversations/src/inbox_v2.rs | 8 +- core/conversations/src/lib.rs | 2 +- core/conversations/src/service_traits.rs | 17 +- core/core_client/Cargo.toml | 25 ++ core/core_client/src/conversation.rs | 51 +++ core/core_client/src/conversation/group_v1.rs | 332 ++++++++++++++++++ core/core_client/src/core_client.rs | 278 +++++++++++++++ core/core_client/src/errors.rs | 21 ++ core/core_client/src/inbox_v2.rs | 314 +++++++++++++++++ core/core_client/src/lib.rs | 13 + core/core_client/src/utils.rs | 64 ++++ core/integration_tests_core/Cargo.toml | 2 + .../integration_tests_core/tests/dev_tests.rs | 104 ++++++ .../src/delivery/local_broadcaster.rs | 4 + 18 files changed, 1262 insertions(+), 12 deletions(-) create mode 100644 core/core_client/Cargo.toml create mode 100644 core/core_client/src/conversation.rs create mode 100644 core/core_client/src/conversation/group_v1.rs create mode 100644 core/core_client/src/core_client.rs create mode 100644 core/core_client/src/errors.rs create mode 100644 core/core_client/src/inbox_v2.rs create mode 100644 core/core_client/src/lib.rs create mode 100644 core/core_client/src/utils.rs create mode 100644 core/integration_tests_core/tests/dev_tests.rs diff --git a/Cargo.lock b/Cargo.lock index a927f72..b42b888 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -467,6 +467,25 @@ dependencies = [ "rand 0.9.4", ] +[[package]] +name = "core_client" +version = "0.1.0" +dependencies = [ + "blake2", + "chat-proto", + "chat-sqlite 0.1.0", + "crypto 0.1.0", + "hex", + "libchat 0.1.0", + "openmls", + "openmls_libcrux_crypto 0.3.1", + "openmls_memory_storage 0.5.0", + "openmls_traits 0.5.0", + "prost", + "storage 0.1.0", + "thiserror", +] + [[package]] name = "cpufeatures" version = "0.2.17" @@ -1386,6 +1405,7 @@ version = "0.1.0" dependencies = [ "chat-sqlite 0.1.0", "components", + "core_client", "libchat 0.1.0", "logos-account", "storage 0.1.0", @@ -1828,6 +1848,7 @@ checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" name = "logos-account" version = "0.1.0" dependencies = [ + "core_client", "crypto 0.1.0", "libchat 0.1.0", ] diff --git a/Cargo.toml b/Cargo.toml index bb151df..effacf9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "bin/chat-cli", "core/account", "core/conversations", + "core/core_client", "core/crypto", "core/double-ratchets", "core/integration_tests_core", @@ -13,19 +14,20 @@ members = [ "core/storage", "crates/client-ffi", "crates/client", - "extensions/components", + "extensions/components", ] default-members = [ "core/account", - "core/sqlite", "core/conversations", + "core/core_client", "core/crypto", "core/double-ratchets", - "core/storage", "core/integration_tests_core", - "crates/client", + "core/sqlite", + "core/storage", "crates/client-ffi", + "crates/client", ] [workspace.dependencies] diff --git a/core/account/Cargo.toml b/core/account/Cargo.toml index 5162bf2..6f23fad 100644 --- a/core/account/Cargo.toml +++ b/core/account/Cargo.toml @@ -11,4 +11,6 @@ dev = [] crypto = { workspace = true } libchat = { workspace = true } +core_client = {path = "../core_client"} + # External dependencies (sorted) diff --git a/core/account/src/account.rs b/core/account/src/account.rs index 6705cfd..523896c 100644 --- a/core/account/src/account.rs +++ b/core/account/src/account.rs @@ -1,9 +1,11 @@ -use crypto::{Ed25519SigningKey, Ed25519VerifyingKey}; +use std::fmt::Debug; +use crypto::{Ed25519SigningKey, Ed25519VerifyingKey}; use libchat::{AccountId, IdentityProvider}; /// Logos Account represents a single account across /// multiple installations and services. +#[derive(Debug)] pub struct TestLogosAccount { id: AccountId, signing_key: Ed25519SigningKey, @@ -17,7 +19,7 @@ impl TestLogosAccount { let signing_key = Ed25519SigningKey::generate(); let verifying_key = signing_key.verifying_key(); Self { - id: AccountId::new(explicit_id.into()), + id: AccountId::new(explicit_id), signing_key, verifying_key, } diff --git a/core/conversations/src/inbox_v2.rs b/core/conversations/src/inbox_v2.rs index c667799..3d07263 100644 --- a/core/conversations/src/inbox_v2.rs +++ b/core/conversations/src/inbox_v2.rs @@ -31,13 +31,14 @@ fn conversation_id_for(account_id: &AccountId) -> String { blake2b_hex::(&["InboxV2|", "conversation_id|", account_id.as_str()]) } +#[derive(Debug)] pub struct MlsIdentityProvider(T); impl MlsIdentityProvider { pub fn get_credential(&self) -> CredentialWithKey { CredentialWithKey { - credential: BasicCredential::new(self.friendly_name().into()).into(), - signature_key: self.public_key().as_ref().into(), + credential: BasicCredential::new(self.0.friendly_name().into()).into(), + signature_key: self.0.public_key().as_ref().into(), } } } @@ -202,8 +203,7 @@ where pub fn register(&mut self) -> Result<(), ChatError> { let keypackage_bytes = self.create_keypackage()?.tls_serialize_detached()?; - // TODO: (P3) Each keypackage can only be used once either enable... - // "LastResort" package or publish multiple + // TODO: (P3) Each keypackage can only be used once — enable LastResort or publish multiple self.reg_service .borrow_mut() .register(&self.account.borrow().friendly_name(), keypackage_bytes) diff --git a/core/conversations/src/lib.rs b/core/conversations/src/lib.rs index 2436cf9..ccda27c 100644 --- a/core/conversations/src/lib.rs +++ b/core/conversations/src/lib.rs @@ -15,5 +15,5 @@ pub use context::{Context, ConversationId, ConversationIdOwned, Introduction}; pub use conversation::GroupConvo; pub use errors::ChatError; pub use service_traits::{DeliveryService, IdentityProvider, RegistrationService}; -pub use types::{AccountId, AddressedEnvelope, ContentData}; +pub use types::{AccountId, AddressedEncryptedPayload, AddressedEnvelope, ContentData}; pub use utils::hex_trunc; diff --git a/core/conversations/src/service_traits.rs b/core/conversations/src/service_traits.rs index 441955c..c7acf39 100644 --- a/core/conversations/src/service_traits.rs +++ b/core/conversations/src/service_traits.rs @@ -44,9 +44,24 @@ impl KeyPackageProvider for T { /// Represents an external Identity /// Implement this to provide an Authentication model for users/installations -pub trait IdentityProvider { +pub trait IdentityProvider: Debug { fn account_id(&self) -> &AccountId; fn friendly_name(&self) -> String; fn sign(&self, payload: &[u8]) -> Ed25519Signature; fn public_key(&self) -> &Ed25519VerifyingKey; } + +impl IdentityProvider for &T { + fn account_id(&self) -> &AccountId { + (**self).account_id() + } + fn friendly_name(&self) -> String { + (**self).friendly_name() + } + fn sign(&self, payload: &[u8]) -> Ed25519Signature { + (**self).sign(payload) + } + fn public_key(&self) -> &Ed25519VerifyingKey { + (**self).public_key() + } +} diff --git a/core/core_client/Cargo.toml b/core/core_client/Cargo.toml new file mode 100644 index 0000000..bc8691b --- /dev/null +++ b/core/core_client/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "core_client" +version = "0.1.0" +edition = "2024" + + + + +[dependencies] +# Workspace dependencies (sorted) +blake2 = { workspace = true } +chat-sqlite = { workspace = true } +crypto = { workspace = true } +libchat = { workspace = true } +storage = { workspace = true } + +# External dependencies (sorted) +chat-proto = { git = "https://github.com/logos-messaging/chat_proto" } +thiserror = "2.0.18" +prost = "0.14.3" +hex = "0.4.3" +openmls = "0.8.1" +openmls_libcrux_crypto = "0.3.1" +openmls_memory_storage = "0.5.0" +openmls_traits = "0.5.0" diff --git a/core/core_client/src/conversation.rs b/core/core_client/src/conversation.rs new file mode 100644 index 0000000..58aa4f1 --- /dev/null +++ b/core/core_client/src/conversation.rs @@ -0,0 +1,51 @@ +mod group_v1; + +use crate::{AccountId, ContentData, DeliveryService, RegistrationService}; +use chat_proto::logoschat::encryption::EncryptedPayload; +use libchat::IdentityProvider; + +use std::fmt::Debug; + +pub use crate::ChatError; +pub use group_v1::GroupV1Convo; + +pub type ConversationIdRef<'a> = &'a str; +pub type ConversationId = String; + +pub struct ServiceContext { + pub identity_provider: IP, + pub ds: DS, + pub rs: RS, +} + +pub trait Id: Debug { + fn id(&self) -> ConversationIdRef<'_>; +} + +pub trait BaseConvo: + Id + Debug +{ + fn init(&self, service_ctx: &mut ServiceContext) -> Result<(), ChatError>; + + fn send_content( + &mut self, + service_ctx: &mut ServiceContext, + content: &[u8], + ) -> Result<(), ChatError>; + + fn handle_frame( + &mut self, + service_ctx: &mut ServiceContext, + enc_payload: EncryptedPayload, + ) -> Result, ChatError>; +} + +pub trait BaseGroupConvo: + BaseConvo +{ + fn add_member( + &mut self, + service_ctx: &mut ServiceContext, + members: &[&AccountId], + ) -> Result<(), ChatError>; +} diff --git a/core/core_client/src/conversation/group_v1.rs b/core/core_client/src/conversation/group_v1.rs new file mode 100644 index 0000000..91ee1cf --- /dev/null +++ b/core/core_client/src/conversation/group_v1.rs @@ -0,0 +1,332 @@ +/// GroupV1 is a conversationType which provides effecient handling of multiple participants +/// Properties: +/// - Harvest Now Decrypt Later (HNDL) protection provided by XWING +/// - Multiple +use std::cell::RefCell; +use std::rc::Rc; + +use blake2::{Blake2b, Digest, digest::consts::U6}; +use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload}; +use openmls::prelude::tls_codec::Deserialize; +use openmls::prelude::*; + +use crate::AccountId; +use crate::conversation::{ConversationIdRef, ServiceContext}; +use crate::inbox_v2::{MlsIdentityProvider, MlsProvider}; +use crate::{ + AddressedEncryptedPayload, ContentData, DeliveryService, IdentityProvider, RegistrationService, + conversation::{BaseConvo, BaseGroupConvo, ChatError, Id}, +}; + +pub struct GroupV1Convo { + mls_provider: Rc>, + mls_group: MlsGroup, + convo_id: String, +} + +impl std::fmt::Debug for GroupV1Convo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GroupV1Convo") + .field("convo_id", &self.convo_id) + .field("mls_epoch", &self.mls_group.epoch()) + .finish_non_exhaustive() + } +} + +impl GroupV1Convo { + // Create a new conversation with the creator as the only participant. + pub fn new( + identity_provider: MlsIdentityProvider, + mls_provider: Rc>, + ) -> Result { + let config = Self::mls_create_config(); + let mls_group = { + let credential = identity_provider.get_credential(); + MlsGroup::new( + &*mls_provider.borrow(), + &identity_provider, + &config, + credential, + ) + .unwrap() + }; + let convo_id = hex::encode(mls_group.group_id().as_slice()); + + Ok(Self { + mls_provider, + mls_group, + convo_id, + }) + } + + // Constructs a new conversation upon receiving a MlsWelcome message. + pub fn new_from_welcome( + mls_provider: Rc>, + welcome: Welcome, + ) -> Result { + let mls_group = { + let provider = &*mls_provider.borrow(); + StagedWelcome::build_from_welcome(provider, &Self::mls_join_config(), welcome) + .unwrap() + .build() + .unwrap() + .into_group(provider) + .unwrap() + }; + + let convo_id = hex::encode(mls_group.group_id().as_slice()); + + Ok(Self { + mls_provider, + mls_group, + convo_id, + }) + } + + fn mls_create_config() -> MlsGroupCreateConfig { + MlsGroupCreateConfig::builder() + .ciphersuite(Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519) + .use_ratchet_tree_extension(true) // This is handy for now, until there is central store for this data + .build() + } + + fn mls_join_config() -> MlsGroupJoinConfig { + MlsGroupJoinConfig::builder().build() + } + + fn delivery_address_from_id(convo_id: &str) -> String { + let hash = Blake2b::::new() + .chain_update("delivery_addr|") + .chain_update(convo_id) + .finalize(); + hex::encode(hash) + } + + fn delivery_address(&self) -> String { + Self::delivery_address_from_id(&self.convo_id) + } + + fn ctrl_delivery_address_from_id(convo_id: &str) -> String { + let hash = Blake2b::::new() + .chain_update("ctrl_delivery_addr|") + .chain_update(convo_id) + .finalize(); + hex::encode(hash) + } + + fn ctrl_delivery_address(&self) -> String { + Self::ctrl_delivery_address_from_id(&self.convo_id) + } +} + +impl Id for GroupV1Convo +where + MP: MlsProvider, +{ + fn id(&self) -> ConversationIdRef<'_> { + &self.convo_id + } +} + +impl BaseConvo for GroupV1Convo +where + IP: IdentityProvider, + MP: MlsProvider, + DS: DeliveryService, + RS: RegistrationService, + // KP: RegistrationService, +{ + fn init(&self, service_ctx: &mut super::ServiceContext) -> Result<(), ChatError> { + // Configure the delivery service to listen for the required delivery addresses. + + service_ctx + .ds + .subscribe(&Self::delivery_address_from_id(&self.convo_id)) + .map_err(ChatError::generic)?; + service_ctx + .ds + .subscribe(&Self::ctrl_delivery_address_from_id(&self.convo_id)) + .map_err(ChatError::generic)?; + + Ok(()) + } + + fn send_content( + &mut self, + service_ctx: &mut super::ServiceContext, + content: &[u8], + ) -> Result<(), ChatError> { + let signer = MlsIdentityProvider(&service_ctx.identity_provider); + let mls_message_out = self + .mls_group + .create_message(&*self.mls_provider.borrow(), &signer, content) + .unwrap(); + + let payload = AddressedEncryptedPayload { + delivery_address: self.delivery_address(), + data: EncryptedPayload { + encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext { + payload: mls_message_out.to_bytes().unwrap().into(), + })), + }, + }; + + let env = payload.into_envelope(self.id().into()); + service_ctx + .ds + .publish(env) + .map_err(|e| ChatError::Delivery(e.to_string()))?; + + Ok(()) + } + + fn handle_frame( + &mut self, + _service_ctx: &mut super::ServiceContext, + encoded_payload: EncryptedPayload, + ) -> Result, ChatError> { + let bytes = match encoded_payload.encryption { + Some(encrypted_payload::Encryption::Plaintext(pt)) => pt.payload, + _ => { + return Err(ChatError::generic("Expected plaintext")); + } + }; + + let mls_message = + MlsMessageIn::tls_deserialize_exact_bytes(&bytes).map_err(ChatError::generic)?; + + let protocol_message: ProtocolMessage = mls_message + .try_into_protocol_message() + .map_err(ChatError::generic)?; + + let provider = &*self.mls_provider.borrow(); + + if protocol_message.epoch() < self.mls_group.epoch() { + // TODO: (P1) Add logging for messages arriving from past epoch. + return Ok(None); + } + + let processed = self + .mls_group + .process_message(provider, protocol_message) + .map_err(ChatError::generic)?; + + match processed.into_content() { + ProcessedMessageContent::ApplicationMessage(msg) => Ok(Some(ContentData { + conversation_id: hex::encode(self.mls_group.group_id().as_slice()), + data: msg.into_bytes(), + is_new_convo: false, + })), + ProcessedMessageContent::StagedCommitMessage(commit) => { + self.mls_group + .merge_staged_commit(provider, *commit) + .map_err(ChatError::generic)?; + Ok(None) + } + _ => { + // TODO: (P2) Log unknown message type + Ok(None) + } + } + } +} + +impl BaseGroupConvo for GroupV1Convo +where + IP: IdentityProvider, + MP: MlsProvider, + DS: DeliveryService, + RS: RegistrationService, +{ + // add_members returns: + // commit — the Commit message Alice broadcasts to all members + // welcome — the Welcome message sent privately to each new joiner + // _group_info — used for external joins; ignore for now + fn add_member( + &mut self, + service_ctx: &mut ServiceContext, + members: &[&AccountId], + ) -> Result<(), ChatError> { + let mls_provider = &*self.mls_provider.borrow(); + + if members.len() > 50 { + // This is a temporary limit that originates from the the De-MLS epoch time. + return Err(ChatError::generic( + "Cannot add more than 50 Members at a time", + )); + } + + if members.is_empty() { + return Ok(()); + } + + // Get the Keypacakages and transpose any errors. + // The account_id is kept so invites can be addressed properly + let keypkgs = members + .iter() + .map(|ident| self.key_package_for_account(service_ctx, ident)) + .collect::, ChatError>>()?; + + let signer = MlsIdentityProvider(&service_ctx.identity_provider); + let (commit, welcome, _group_info) = self + .mls_group + .add_members(mls_provider, &signer, keypkgs.iter().as_slice()) + .unwrap(); + + self.mls_group.merge_pending_commit(mls_provider).unwrap(); + + // TODO: (P3) Evaluate privacy/performance implications of an aggregated Welcome for multiple users + for account_id in members { + self.mls_provider + .borrow() + .invite_user(&mut service_ctx.ds, account_id, &welcome)?; + } + + let encrypted_payload = EncryptedPayload { + encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext { + payload: commit.to_bytes().map_err(ChatError::generic)?.into(), + })), + }; + + let addr_enc_payload = AddressedEncryptedPayload { + delivery_address: self.ctrl_delivery_address(), + data: encrypted_payload, + }; + // Prepare commit message + // TODO: (P1) Make GroupConvos agnostic to framing so its less error prone and more + let env = addr_enc_payload.into_envelope(self.convo_id.clone()); + + service_ctx + .ds + .publish(env) + .map_err(|e| ChatError::Generic(format!("Publish: {e}"))) + } +} + +impl GroupV1Convo { + fn key_package_for_account< + IP: IdentityProvider, + DS: DeliveryService, + RS: RegistrationService, + >( + &self, + service_ctx: &mut ServiceContext, + ident: &AccountId, + ) -> Result { + let retrieved_bytes = service_ctx + .rs + .retrieve(ident) + .map_err(|e: RS::Error| ChatError::Generic(e.to_string()))?; + + // dbg!(ctx.contact_registry()); + let Some(keypkg_bytes) = retrieved_bytes else { + return Err(ChatError::generic("Group Contact Not Found")); + }; + + let key_package_in = KeyPackageIn::tls_deserialize(&mut keypkg_bytes.as_slice())?; + let keypkg = key_package_in + .validate(self.mls_provider.borrow().crypto(), ProtocolVersion::Mls10) + .map_err(ChatError::generic)?; //TODO: P3 - Hardcoded Protocol Version + Ok(keypkg) + } +} diff --git a/core/core_client/src/core_client.rs b/core/core_client/src/core_client.rs new file mode 100644 index 0000000..ce0f347 --- /dev/null +++ b/core/core_client/src/core_client.rs @@ -0,0 +1,278 @@ +use std::cell::RefMut; +use std::collections::HashMap; +use std::{cell::RefCell, rc::Rc}; + +use crate::conversation::{BaseGroupConvo, ConversationId, ConversationIdRef, Id, ServiceContext}; + +use crate::inbox_v2::InboxV2; +use crate::{AccountId, errors::ChatError}; +use crate::{DeliveryService, IdentityProvider, RegistrationService}; +use chat_proto::logoschat::encryption::EncryptedPayload; +use chat_proto::logoschat::envelope::EnvelopeV1; +use libchat::ContentData; +use prost::Message; +use storage::ChatStore; + +#[derive(Debug)] +enum ConvoTypeOwned { + // Pairwise(Box>), + Group(Box>), +} + +impl Id for ConvoTypeOwned +where + IP: IdentityProvider, + DS: DeliveryService, + RS: RegistrationService, +{ + fn id(&self) -> crate::conversation::ConversationIdRef<'_> { + match self { + // ConvoTypeOwned::Pairwise(convo) => convo.id(), + ConvoTypeOwned::Group(convo) => convo.id(), + } + } +} + +pub struct GroupConvo< + IP: IdentityProvider, + DS: DeliveryService, + RS: RegistrationService, + CS: ChatStore, +> { + client: Rc>>, + convo_id: ConversationId, +} + +impl GroupConvo +where + IP: IdentityProvider + 'static, + DS: DeliveryService + 'static, + RS: RegistrationService + 'static, + CS: ChatStore + 'static, +{ + pub fn send_content(&self, content: &[u8]) -> Result<(), ChatError> { + let mut client = self.client.borrow_mut(); + client.send_content(self.convo_id.as_str(), content) + } +} + +pub struct CoreClient< + IP: IdentityProvider, + DS: DeliveryService, + RS: RegistrationService, + CS: ChatStore, +> { + inner: Rc>>, +} + +impl CoreClient +where + IP: IdentityProvider + 'static, + DS: DeliveryService + 'static, + RS: RegistrationService + 'static, + CS: ChatStore + 'static, +{ + pub fn new(account: IP, delivery: DS, registration: RS, store: CS) -> Result { + let c = InnerClient::new(account, delivery, registration, store)?; + Ok(Self { + inner: Rc::new(RefCell::new(c)), + }) + } + + pub fn account_id(&self) -> AccountId { + self.inner.borrow().account_id().clone() + } + + pub fn ds(&self) -> RefMut<'_, DS> { + RefMut::map(self.inner.borrow_mut(), |c| c.ds()) + } + + pub fn create_group_convo( + &self, + participants: &[&AccountId], + ) -> Result, ChatError> { + let convo_id = self.inner.borrow_mut().create_group_convo(participants)?; + Ok(GroupConvo { + client: self.inner.clone(), + convo_id, + }) + } + + pub fn list_conversations(&self) -> Result, ChatError> { + self.inner.borrow().list_conversations() + } + + pub fn send_content( + &self, + convo_id: ConversationIdRef, + content: &[u8], + ) -> Result<(), ChatError> { + self.inner.borrow_mut().send_content(convo_id, content) + } + + pub fn handle_payload(&self, payload: &[u8]) -> Result, ChatError> { + self.inner.borrow_mut().handle_payload(payload) + } + + pub fn convo(&self, convo_id: ConversationIdRef) -> Option> { + let client = self.inner.clone(); + + if !client.borrow().has_conversation(convo_id) { + return None; + } + + Some(GroupConvo { + client, + convo_id: convo_id.to_string(), + }) + } +} + +struct InnerClient< + IP: IdentityProvider, + DS: DeliveryService, + RS: RegistrationService, + CS: ChatStore, +> { + service_ctx: ServiceContext, + _store: Rc>, + + pq_inbox: InboxV2, + + // Cache of loaded conversations + cached_convos: HashMap>, +} + +impl InnerClient +where + IP: IdentityProvider + 'static, + DS: DeliveryService + 'static, + RS: RegistrationService + 'static, + CS: ChatStore + 'static, +{ + pub fn new(account: IP, delivery: DS, registration: RS, store: CS) -> Result { + // Services for sharing with Converastions/Inboxes + + let mut service_ctx = ServiceContext { + identity_provider: account, + ds: delivery, + rs: registration, + }; + + // let contact_registry = Rc::new(RefCell::new(registration)); + let _store = Rc::new(RefCell::new(store)); + + let pq_inbox = InboxV2::new(&mut service_ctx, _store.clone()); + pq_inbox.register(&mut service_ctx)?; + + // Subscribe + service_ctx + .ds + .subscribe(&pq_inbox.delivery_address()) + .map_err(ChatError::generic)?; + + Ok(Self { + service_ctx, + _store, + pq_inbox, + cached_convos: HashMap::new(), + }) + } + + pub fn ds(&mut self) -> &mut DS { + &mut self.service_ctx.ds + } + + /// Returns the unique identifier associated with the account + pub fn account_id(&self) -> &AccountId { + self.pq_inbox.account_id() + } + + pub fn create_group_convo(&mut self, participants: &[&AccountId]) -> Result { + let convo = self.pq_inbox.create_group_v1(&mut self.service_ctx)?; + let mut convo: Box> = Box::new(convo); + convo.init(&mut self.service_ctx)?; + convo.add_member(&mut self.service_ctx, participants)?; + + let convo_id = convo.id().to_string(); + + self.register_convo(ConvoTypeOwned::Group(convo))?; + + Ok(convo_id) + } + + pub fn list_conversations(&self) -> Result, ChatError> { + Ok(self.cached_convos.keys().cloned().collect()) + } + + pub fn has_conversation(&self, convo_id: ConversationIdRef) -> bool { + self.cached_convos.contains_key(convo_id) + } + + pub fn send_content( + &mut self, + convo_id: ConversationIdRef, + content: &[u8], + ) -> Result<(), ChatError> { + let Some(convo) = self.cached_convos.get_mut(convo_id) else { + return Err(ChatError::generic("No Convo Found")); + }; + let convo = match convo { + // ConvoTypeOwned::Pairwise(_) => todo!(), + ConvoTypeOwned::Group(c) => c.as_mut(), + }; + convo.send_content(&mut self.service_ctx, content) + } + + // Decode bytes and send to protocol for processing. + pub fn handle_payload(&mut self, payload: &[u8]) -> Result, ChatError> { + let env = EnvelopeV1::decode(payload)?; + + // TODO: Impl Conversation hinting + let convo_id = env.conversation_hint; + match convo_id { + c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload), + c if self.cached_convos.contains_key(c.as_str()) => { + self.dispatch_to_convo(c, &env.payload) + } + _ => Ok(None), + } + } + + // Dispatch encrypted payload to Inbox, and register the created Conversation + fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result, ChatError> { + if let Some(convo) = self.pq_inbox.handle_frame(&mut self.service_ctx, payload)? { + let convo: Box> = Box::new(convo); + self.register_convo(ConvoTypeOwned::Group(convo))?; + } + Ok(None) + } + + // Dispatch encrypted payload to its corresponding conversation + fn dispatch_to_convo( + &mut self, + convo_id: ConversationId, + enc_payload_bytes: &[u8], + ) -> Result, ChatError> { + let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; + + let Some(convo) = self.cached_convos.get_mut(&convo_id) else { + return Err(ChatError::generic("No Convo Found")); + }; + let convo = match convo { + // ConvoTypeOwned::Pairwise(_) => todo!(), + ConvoTypeOwned::Group(c) => c.as_mut(), + }; + + convo.handle_frame(&mut self.service_ctx, enc_payload) + } + + fn register_convo(&mut self, convo: ConvoTypeOwned) -> Result<(), ChatError> { + let res = self.cached_convos.insert(convo.id().to_string(), convo); + + match res { + Some(_) => Err(ChatError::generic("Convo already exists. Cannot save")), + None => Ok(()), + } + } +} diff --git a/core/core_client/src/errors.rs b/core/core_client/src/errors.rs new file mode 100644 index 0000000..b246f0b --- /dev/null +++ b/core/core_client/src/errors.rs @@ -0,0 +1,21 @@ +use openmls::prelude::tls_codec; +pub use thiserror::Error; + +#[derive(Error, Debug)] +pub enum ChatError { + #[error("generic: {0}")] + Generic(String), + #[error("TlsCodec: {0}")] + TlsCodec(#[from] tls_codec::Error), + #[error("Protobuf decode: {0}")] + ProtobufDecodeError(#[from] prost::DecodeError), + #[error("delivery: {0}")] + Delivery(String), +} + +impl ChatError { + // This is a stopgap until there is a proper error system in place + pub fn generic(e: impl ToString) -> Self { + Self::Generic(e.to_string()) + } +} diff --git a/core/core_client/src/inbox_v2.rs b/core/core_client/src/inbox_v2.rs new file mode 100644 index 0000000..2e5d00b --- /dev/null +++ b/core/core_client/src/inbox_v2.rs @@ -0,0 +1,314 @@ +use std::cell::RefCell; +use std::ops::Deref; +use std::rc::Rc; + +use chat_proto::logoschat::envelope::EnvelopeV1; +use openmls::prelude::tls_codec::Serialize; +use openmls::prelude::*; +use openmls_libcrux_crypto::CryptoProvider as LibcruxCryptoProvider; +use openmls_memory_storage::MemoryStorage; +use openmls_traits::signatures::Signer; +use openmls_traits::signatures::SignerError; +use prost::{Message, Oneof}; +use storage::ChatStore; +use storage::ConversationMeta; + +use crate::AccountId; +use crate::AddressedEnvelope; +use crate::ChatError; +use crate::DeliveryService; +use crate::IdentityProvider; +use crate::RegistrationService; +use crate::conversation::BaseConvo; +use crate::conversation::ServiceContext; +use crate::conversation::{GroupV1Convo, Id}; +use crate::utils::{blake2b_hex, hash_size}; + +// Define unique Identifiers derivations used in InboxV2 +fn delivery_address_for(account_id: &AccountId) -> String { + blake2b_hex::(&["InboxV2|", "delivery_address|", account_id.as_str()]) +} + +fn conversation_id_for(account_id: &AccountId) -> String { + blake2b_hex::(&["InboxV2|", "conversation_id|", account_id.as_str()]) +} + +#[derive(Debug)] +pub struct MlsIdentityProvider(pub T); + +impl MlsIdentityProvider { + pub fn get_credential(&self) -> CredentialWithKey { + CredentialWithKey { + credential: BasicCredential::new(self.0.friendly_name().into()).into(), + signature_key: self.0.public_key().as_ref().into(), + } + } +} + +impl Deref for MlsIdentityProvider { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl IdentityProvider for MlsIdentityProvider { + fn account_id(&self) -> &AccountId { + self.0.account_id() + } + + fn friendly_name(&self) -> String { + self.0.friendly_name() + } + + fn sign(&self, payload: &[u8]) -> crypto::Ed25519Signature { + self.0.sign(payload) + } + + fn public_key(&self) -> &crypto::Ed25519VerifyingKey { + self.0.public_key() + } +} + +impl Signer for MlsIdentityProvider { + fn sign(&self, payload: &[u8]) -> Result, SignerError> { + Ok(self.0.sign(payload).as_ref().to_vec()) + } + + fn signature_scheme(&self) -> SignatureScheme { + SignatureScheme::ED25519 + } +} + +/// An Extension trait which extends OpenMlsProvider to add required functionality +/// All MLS based Conversation should use this trait for defining requirements. +pub trait MlsProvider: OpenMlsProvider { + fn invite_user( + &self, + ds: &mut DS, + account_id: &AccountId, + welcome: &MlsMessageOut, + ) -> Result<(), ChatError>; +} + +/// This is a PQ based provider that uses in memory storage. +pub struct MlsEphemeralPqProvider { + crypto: LibcruxCryptoProvider, + storage: MemoryStorage, +} + +impl MlsEphemeralPqProvider { + pub fn new() -> Result { + let crypto = LibcruxCryptoProvider::new()?; + let storage = MemoryStorage::default(); + + Ok(Self { crypto, storage }) + } +} + +impl MlsProvider for MlsEphemeralPqProvider { + fn invite_user( + &self, + ds: &mut DS, + account_id: &AccountId, + welcome: &MlsMessageOut, + ) -> Result<(), ChatError> { + let invite = GroupV1HeavyInvite { + welcome_bytes: welcome.to_bytes().map_err(ChatError::generic)?, + }; + + let frame = InboxV2Frame { + payload: Some(InviteType::GroupV1(invite)), + }; + + let envelope = EnvelopeV1 { + conversation_hint: conversation_id_for(account_id), + salt: 0, + payload: frame.encode_to_vec().into(), + }; + + let outbound_msg = AddressedEnvelope { + delivery_address: delivery_address_for(account_id), + data: envelope.encode_to_vec(), + }; + + ds.publish(outbound_msg).map_err(ChatError::generic)?; + Ok(()) + } +} + +impl OpenMlsProvider for MlsEphemeralPqProvider { + type CryptoProvider = LibcruxCryptoProvider; + type RandProvider = LibcruxCryptoProvider; + type StorageProvider = openmls_memory_storage::MemoryStorage; + + fn storage(&self) -> &Self::StorageProvider { + &self.storage + } + + fn crypto(&self) -> &Self::CryptoProvider { + &self.crypto + } + + fn rand(&self) -> &Self::RandProvider { + &self.crypto + } +} + +/// An PQ focused Conversation initializer. +/// InboxV2 Incorporates an Account based identity system to support PQ based conversation protocols +/// such as MLS. +pub struct InboxV2 { + account_id: AccountId, + _store: Rc>, + mls_provider: Rc>, +} + +impl InboxV2 { + pub fn new( + service_ctx: &mut ServiceContext, + _store: Rc>, + ) -> Self { + // Avoid referencing a temporary value by caching it. + let account_id = service_ctx.identity_provider.account_id().clone(); + let provider = MlsEphemeralPqProvider::new().unwrap(); + Self { + account_id, + _store, + mls_provider: Rc::new(RefCell::new(provider)), + } + } + + pub fn account_id(&self) -> &AccountId { + &self.account_id + } + + pub fn delivery_address(&self) -> String { + delivery_address_for(&self.account_id) + } + + pub fn id(&self) -> String { + conversation_id_for(&self.account_id) + } + + /// Submit MlsKeypackage to registration service + pub fn register( + &self, + service_ctx: &mut ServiceContext, + ) -> Result<(), ChatError> { + let mls_ident = MlsIdentityProvider(&service_ctx.identity_provider); + let keypackage_bytes = self + .create_keypackage(&mls_ident)? + .tls_serialize_detached()?; + + // TODO: (P3) Each keypackage can only be used once either enable... + // "LastResort" package or publish multiple + service_ctx + .rs + .register( + &service_ctx.identity_provider.friendly_name(), + keypackage_bytes, + ) + .map_err(ChatError::generic) + } + + pub fn create_group_v1( + &self, + service_ctx: &mut ServiceContext, + ) -> Result, ChatError> { + let mls_ident = MlsIdentityProvider(&service_ctx.identity_provider); + GroupV1Convo::new(mls_ident, self.mls_provider.clone()) + } + + fn create_keypackage( + &self, + signer: &MlsIdentityProvider, + ) -> Result { + let capabilities = Capabilities::builder() + .ciphersuites(vec![ + Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519, + ]) + .extensions(vec![ExtensionType::ApplicationId]) + .build(); + + let a = KeyPackage::builder() + .leaf_node_capabilities(capabilities) + .build( + Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519, + &*self.mls_provider.borrow(), + signer, + signer.get_credential(), + ) + .expect("Failed to build KeyPackage"); + + Ok(a.key_package().clone()) + } +} + +impl InboxV2 { + pub fn handle_frame( + &self, + service_ctx: &mut ServiceContext, + payload_bytes: &[u8], + ) -> Result>, ChatError> { + let inbox_frame = InboxV2Frame::decode(payload_bytes)?; + + let Some(payload) = inbox_frame.payload else { + return Err(ChatError::Generic("InboxV2Payload missing".into())); + }; + + match payload { + InviteType::GroupV1(group_v1_heavy_invite) => self + .handle_heavy_invite(service_ctx, group_v1_heavy_invite) + .map(Some), + } + } + + fn handle_heavy_invite( + &self, + service_ctx: &mut ServiceContext, + invite: GroupV1HeavyInvite, + ) -> Result, ChatError> { + let (msg_in, _rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?; + + let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else { + return Err(ChatError::Generic("Expected Welcome".into())); + }; + + let convo = GroupV1Convo::new_from_welcome(self.mls_provider.clone(), welcome)?; + convo.init(service_ctx)?; + self.persist_convo(convo.id())?; + Ok(convo) + } + + fn persist_convo(&self, local_convo_id: &str) -> Result<(), ChatError> { + let meta = ConversationMeta { + local_convo_id: local_convo_id.to_string(), + remote_convo_id: "0".into(), + kind: storage::ConversationKind::GroupV1, + }; + self._store + .borrow_mut() + .save_conversation(&meta) + .map_err(ChatError::generic) + } +} + +#[derive(Clone, PartialEq, Message)] +pub struct InboxV2Frame { + #[prost(oneof = "InviteType", tags = "1")] + pub payload: Option, +} + +#[derive(Clone, PartialEq, Oneof)] +pub enum InviteType { + #[prost(message, tag = "1")] + GroupV1(GroupV1HeavyInvite), +} + +#[derive(Clone, PartialEq, Message)] +pub struct GroupV1HeavyInvite { + #[prost(bytes, tag = "1")] + pub welcome_bytes: Vec, +} diff --git a/core/core_client/src/lib.rs b/core/core_client/src/lib.rs new file mode 100644 index 0000000..c63baab --- /dev/null +++ b/core/core_client/src/lib.rs @@ -0,0 +1,13 @@ +mod conversation; +mod core_client; +mod errors; +mod inbox_v2; +mod utils; + +pub use libchat::{ + AccountId, AddressedEncryptedPayload, AddressedEnvelope, ContentData, DeliveryService, + IdentityProvider, RegistrationService, +}; + +pub use core_client::{CoreClient, GroupConvo}; +pub use errors::ChatError; diff --git a/core/core_client/src/utils.rs b/core/core_client/src/utils.rs new file mode 100644 index 0000000..5afd21c --- /dev/null +++ b/core/core_client/src/utils.rs @@ -0,0 +1,64 @@ +use blake2::{Blake2b, Digest}; + +/// Track hash sizes in use across the crate. +pub mod hash_size { + use blake2::digest::{ + consts::U64, + generic_array::ArrayLength, + typenum::{IsLessOrEqual, NonZero}, + }; + + pub trait HashLen + where + >::Output: NonZero, + { + type Size: ArrayLength + IsLessOrEqual; + } + + /// This macro generates HashLen for the given typenum::length + macro_rules! hash_sizes { + ($($(#[$attr:meta])* $name:ident => $size:ty),* $(,)?) => { + $( + $(#[$attr])* + pub struct $name; + impl HashLen for $name { type Size = $size; } + )* + }; + } + + use blake2::digest::consts::{U6, U8}; + hash_sizes! { + /// Account ID hash length + AccountId => U8, + /// Conversation ID hash length + ConvoId => U6, + } +} + +/// This establishes an easy to use wrapper for hashes in this crate. +/// The output is formatted string of hex characters +pub fn blake2b_hex(components: &[impl AsRef<[u8]>]) -> String { + //A + let mut hash = Blake2b::::new(); + + for c in components { + hash.update(c); + } + + let output = hash.finalize(); + hex::encode(output) +} + +/// Shorten byte slices for testing and logging +#[allow(unused)] +pub fn hex_trunc(data: &[u8]) -> String { + if data.len() <= 8 { + hex::encode(data) + } else { + format!( + "{}..{}", + hex::encode(&data[..4]), + hex::encode(&data[data.len() - 4..]) + ) + } +} diff --git a/core/integration_tests_core/Cargo.toml b/core/integration_tests_core/Cargo.toml index c9ade4b..3fd5ee8 100644 --- a/core/integration_tests_core/Cargo.toml +++ b/core/integration_tests_core/Cargo.toml @@ -14,5 +14,7 @@ libchat = { workspace = true } logos-account = { workspace = true, features = ["dev"]} storage = { workspace = true } +core_client = {path = "../core_client"} + # External dependencies (sorted) tempfile = "3" diff --git a/core/integration_tests_core/tests/dev_tests.rs b/core/integration_tests_core/tests/dev_tests.rs new file mode 100644 index 0000000..353efd1 --- /dev/null +++ b/core/integration_tests_core/tests/dev_tests.rs @@ -0,0 +1,104 @@ +use std::ops::{Deref, DerefMut}; + +use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; + +use core_client::{ChatError, CoreClient}; +use libchat::{ContentData, hex_trunc}; +use logos_account::TestLogosAccount; + +struct PollableClient { + inner: CoreClient, + on_content: Option>, +} + +impl PollableClient { + fn init( + ctx: CoreClient, + cb: Option, + ) -> Self { + Self { + inner: ctx, + on_content: cb.map(|f| Box::new(f) as Box), + } + } + + fn process_messages(&mut self) { + let messages = self.inner.ds().poll_all(); + for data in messages { + let res = self.handle_payload(&data).unwrap(); + if let Some(cb) = &self.on_content + && let Some(content_data) = res + { + cb(content_data); + } + } + } +} + +impl Deref for PollableClient { + type Target = CoreClient; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for PollableClient { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +fn process(clients: &mut Vec) { + for client in clients { + client.process_messages(); + } +} + +// Higher order function to handle printing +fn pretty_print(prefix: impl Into) -> Box { + let prefix = prefix.into(); + Box::new(move |c: ContentData| { + let cid = hex_trunc(c.conversation_id.as_bytes()); + let content = String::from_utf8(c.data).unwrap(); + println!("{} ({:?}) {}", prefix, cid, content) + }) +} + +#[test] +fn core_client() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + + let saro_account = TestLogosAccount::new("saro"); + + let raya_account = TestLogosAccount::new("raya"); + + let saro = CoreClient::new(saro_account, ds.clone(), rs.clone(), MemStore::new()).unwrap(); + let raya = CoreClient::new(raya_account, ds, rs, MemStore::new()).unwrap(); + + let mut clients = vec![ + PollableClient::init(saro, Some(pretty_print(" Saro "))), + PollableClient::init(raya, Some(pretty_print(" Raya "))), + ]; + + const SARO: usize = 0; + const RAYA: usize = 1; + + let s_convo = clients[SARO] + .create_group_convo(&[&clients[RAYA].account_id()]) + .unwrap(); + + process(&mut clients); + + s_convo.send_content(b"HI").unwrap(); + let convo_id = clients[RAYA].list_conversations().unwrap().pop().unwrap(); + let r_convo = clients[RAYA].convo(&convo_id).expect("Convo exists"); + process(&mut clients); + r_convo.send_content(b"PEW").unwrap(); + process(&mut clients); + + s_convo.send_content(b"SARO again").unwrap(); + process(&mut clients); + println!("Hello"); +} diff --git a/extensions/components/src/delivery/local_broadcaster.rs b/extensions/components/src/delivery/local_broadcaster.rs index 5889def..39a6c2b 100644 --- a/extensions/components/src/delivery/local_broadcaster.rs +++ b/extensions/components/src/delivery/local_broadcaster.rs @@ -85,6 +85,10 @@ impl LocalBroadcaster { } } + pub fn poll_all(&mut self) -> Vec> { + std::iter::from_fn(|| self.poll()).collect() + } + fn msg_id(msg: &AddressedEnvelope) -> u64 { let mut hasher = DefaultHasher::new(); msg.data.as_slice().hash(&mut hasher);