From af3ff3c6a2901510d8b3814d4b33b9cd9fcae81f Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Thu, 23 Apr 2026 18:04:45 -0700 Subject: [PATCH] Add GroupV1 --- Cargo.lock | 4 + Cargo.toml | 5 +- core/conversations/src/context.rs | 433 +++++++++++++++--- core/conversations/src/conversation.rs | 33 +- .../src/conversation/group_v1.rs | 433 ++++++++++++++++++ core/conversations/src/ctx.rs | 36 ++ core/conversations/src/errors.rs | 18 + core/conversations/src/external_traits.rs | 35 ++ core/conversations/src/inbox_v2.rs | 383 ++++++++++++++++ core/conversations/src/lib.rs | 7 + core/conversations/src/test_utils.rs | 310 +++++++++++++ core/conversations/src/types.rs | 54 ++- core/conversations/src/utils.rs | 63 +++ core/crypto/src/lib.rs | 2 + core/sqlite/src/lib.rs | 14 +- core/storage/src/store.rs | 5 + crates/client/src/client.rs | 4 +- crates/client/src/delivery.rs | 6 - crates/client/src/delivery_in_process.rs | 2 +- crates/client/src/lib.rs | 6 +- extensions/delivery/Cargo.toml | 6 + extensions/delivery/src/lib.rs | 5 + extensions/delivery/src/local_bcast.rs | 58 +++ 23 files changed, 1840 insertions(+), 82 deletions(-) create mode 100644 core/conversations/src/conversation/group_v1.rs create mode 100644 core/conversations/src/ctx.rs create mode 100644 core/conversations/src/external_traits.rs create mode 100644 core/conversations/src/inbox_v2.rs create mode 100644 core/conversations/src/test_utils.rs delete mode 100644 crates/client/src/delivery.rs create mode 100644 extensions/delivery/Cargo.toml create mode 100644 extensions/delivery/src/lib.rs create mode 100644 extensions/delivery/src/local_bcast.rs diff --git a/Cargo.lock b/Cargo.lock index 274c8bb..1e397a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -633,6 +633,10 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "delivery" +version = "0.1.0" + [[package]] name = "der" version = "0.7.10" diff --git a/Cargo.toml b/Cargo.toml index db5f220..448b133 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,9 +10,12 @@ members = [ "core/storage", "crates/client", "crates/client-ffi", - "bin/chat-cli", + "bin/chat-cli", "extensions/delivery", ] +# default-members = [ "core/*", "crates/*"] +default-members = [ "core/*"] + [workspace.dependencies] blake2 = "0.10" libchat = { path = "core/conversations" } diff --git a/core/conversations/src/context.rs b/core/conversations/src/context.rs index 9a48de7..0266aa1 100644 --- a/core/conversations/src/context.rs +++ b/core/conversations/src/context.rs @@ -1,39 +1,50 @@ use std::sync::Arc; use std::{cell::RefCell, rc::Rc}; -use crypto::{Identity, PublicKey}; -use storage::{ChatStore, ConversationKind}; +use crate::conversation::{Convo, GroupConvo, GroupV1Convo, IdentityProvider}; +use crate::ctx::{self, ClientCtx}; use crate::account::LogosAccount; +use crate::{DeliveryService, RegistrationService}; use crate::{ - conversation::{Conversation, ConversationId, Convo, Id, PrivateV1Convo}, + conversation::{Conversation, ConversationId, Id, PrivateV1Convo}, errors::ChatError, inbox::Inbox, + inbox_v2::{GroupInitializer, InboxV2}, proto::{EncryptedPayload, EnvelopeV1, Message}, types::{AddressedEnvelope, ContentData}, }; +use crypto::{Identity, PublicKey}; +use storage::{ChatStore, ConversationKind}; pub use crate::conversation::ConversationIdOwned; pub use crate::inbox::Introduction; // This is the main entry point to the conversations api. // Ctx manages lifetimes of objects to process and generate payloads. -pub struct Context { +pub struct Context { _identity: Rc, - inbox: Inbox, - store: Rc>, - #[allow(unused)] // TODO: (P2) Remove once Account integrated in future PR. - account: LogosAccount, + client_ctx: ClientCtx, + inbox: Inbox, + pq_inbox: InboxV2, + store: Rc>, } -impl Context { +impl Context { /// Opens or creates a Context with the given storage configuration. /// /// If an identity exists in storage, it will be restored. /// Otherwise, a new identity will be created with the given name and saved. - pub fn new_from_store(name: impl Into, store: S) -> Result { + pub fn new_from_store( + name: impl Into, + delivery: DS, + contact_reg: RS, + store: CS, + ) -> Result { let name = name.into(); + let store = Rc::new(RefCell::new(store)); + let mut ctx = ClientCtx::new(delivery, contact_reg, store.clone()); // Load or create identity let identity = if let Some(identity) = store.borrow().load_identity()? { @@ -47,9 +58,18 @@ impl Context { let identity = Rc::new(identity); let inbox = Inbox::new(Rc::clone(&store), Rc::clone(&identity)); + let pq_inbox = InboxV2::new(); + + // Subscribe + ctx.ds() + .subscribe(pq_inbox.delivery_address()) + .map_err(ChatError::generic)?; + Ok(Self { _identity: identity, + client_ctx: ctx, inbox, + pq_inbox, store, account: LogosAccount::new_test(name.as_str()), }) @@ -58,10 +78,17 @@ impl Context { /// Creates a new in-memory Context (for testing). /// /// Uses in-memory SQLite database. Each call creates a new isolated database. - pub fn new_with_name(name: impl Into, chat_store: S) -> Self { + pub fn new_with_name( + name: impl Into, + delivery: DS, + contact_reg: RS, + chat_store: CS, + ) -> Result { let name = name.into(); let identity = Identity::new(&name); + let chat_store = Rc::new(RefCell::new(chat_store)); + let mut ctx = ClientCtx::new(delivery, contact_reg, chat_store.clone()); chat_store .borrow_mut() .save_identity(&identity) @@ -69,13 +96,27 @@ impl Context { let identity = Rc::new(identity); let inbox = Inbox::new(Rc::clone(&chat_store), Rc::clone(&identity)); + let mut pq_inbox = InboxV2::new(); + pq_inbox.register(&mut ctx)?; - Self { + ctx.ds() + .subscribe(pq_inbox.delivery_address()) + .map_err(ChatError::generic)?; + + Ok(Self { _identity: identity, + client_ctx: ctx, + pq_inbox, inbox, + store: chat_store, account: LogosAccount::new_test(name.as_str()), - } + }) + } + + /// Returns the unique identifier associated with the account + pub fn account_id(&self) -> String { + self.pq_inbox.account.friendly_name() } pub fn installation_name(&self) -> &str { @@ -96,7 +137,7 @@ impl Context { .invite_to_private_convo(remote_bundle, content, Rc::clone(&self.store)) .unwrap_or_else(|_| todo!("Log/Surface Error")); - let remote_id = Inbox::::inbox_identifier_for_key(*remote_bundle.installation_key()); + let remote_id = Inbox::::inbox_identifier_for_key(*remote_bundle.installation_key()); let payload_bytes = payloads .into_iter() .map(|p| p.into_envelope(remote_id.clone())) @@ -106,6 +147,23 @@ impl Context { Ok((convo_id, payload_bytes)) } + pub fn create_group_convo( + &mut self, + participants: &[&str], + ) -> Result>, ChatError> { + let mut convo = self.pq_inbox.create_group_v1(&mut self.client_ctx)?; + self.client_ctx + .store() + .save_conversation(&storage::ConversationMeta { + local_convo_id: convo.id().to_string(), + remote_convo_id: "0".into(), + kind: ConversationKind::GroupV1, + })?; + convo.add_member(&mut self.client_ctx, participants)?; + + Ok(Box::new(convo)) + } + pub fn list_conversations(&self) -> Result, ChatError> { let records = self.store.borrow().load_conversations()?; Ok(records @@ -119,41 +177,47 @@ impl Context { convo_id: ConversationId, content: &[u8], ) -> Result, ChatError> { - let convo = self.load_convo(convo_id)?; - - match convo { - Conversation::Private(mut convo) => { - let payloads = convo.send_message(content)?; - let remote_id = convo.remote_id(); - - Ok(payloads - .into_iter() - .map(|p| p.into_envelope(remote_id.clone())) - .collect()) - } - } + let mut convo = self.load_convo(convo_id)?; + let payloads = convo.send_message(content)?; + let remote_id = convo.remote_id(); + Ok(payloads + .into_iter() + .map(|p| p.into_envelope(remote_id.clone())) + .collect()) } // Decode bytes and send to protocol for processing. pub fn handle_payload(&mut self, payload: &[u8]) -> Result, ChatError> { let env = EnvelopeV1::decode(payload)?; + let e2 = env.clone(); // TODO: Impl Conversation hinting let convo_id = env.conversation_hint; - let enc = EncryptedPayload::decode(env.payload)?; + + let a = self.pq_inbox.id(); match convo_id { - c if c == self.inbox.id() => self.dispatch_to_inbox(enc), - c if self.store.borrow().has_conversation(&c)? => self.dispatch_to_convo(&c, enc), - _ => Ok(None), + c if c == self.inbox.id() => self.dispatch_to_inbox(&env.payload), + c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload), + c if self.store.borrow().has_conversation(&c)? => { + self.dispatch_to_convo(&c, &env.payload) + } + _ => Ok(Some(ContentData { + conversation_id: "".into(), + data: vec![], + is_new_convo: false, + })), } } // Dispatch encrypted payload to Inbox, and register the created Conversation fn dispatch_to_inbox( &mut self, - enc_payload: EncryptedPayload, + enc_payload_bytes: &[u8], ) -> Result, ChatError> { - let public_key_hex = Inbox::::extract_ephemeral_key_hex(&enc_payload)?; + // EncryptedPayloads are not used by GroupConvos at this time, else this can be performed in `handle_payload` + // TODO: (P1) reconcile envelope parsing between Covno and GroupConvo + let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; + let public_key_hex = Inbox::::extract_ephemeral_key_hex(&enc_payload)?; let (convo, content) = self.inbox .handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store))?; @@ -168,20 +232,22 @@ impl Context { Ok(content) } + // Dispatch encrypted payload to Inbox, and register the created Conversation + fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result, ChatError> { + self.pq_inbox.handle_frame(&mut self.client_ctx, payload)?; + + Ok(None) + } + // Dispatch encrypted payload to its corresponding conversation fn dispatch_to_convo( &mut self, convo_id: ConversationId, - enc_payload: EncryptedPayload, + enc_payload_bytes: &[u8], ) -> Result, ChatError> { - let convo = self.load_convo(convo_id)?; - - match convo { - Conversation::Private(mut convo) => { - let result = convo.handle_frame(enc_payload)?; - Ok(result) - } - } + let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; + let mut convo = self.load_convo(convo_id)?; + convo.handle_frame(enc_payload) } pub fn create_intro_bundle(&mut self) -> Result, ChatError> { @@ -190,7 +256,7 @@ impl Context { } /// Loads a conversation from DB by constructing it from metadata. - fn load_convo(&self, convo_id: ConversationId) -> Result, ChatError> { + fn load_convo(&mut self, convo_id: ConversationId) -> Result, ChatError> { let record = self .store .borrow() @@ -204,8 +270,37 @@ impl Context { record.local_convo_id, record.remote_convo_id, )?; - Ok(Conversation::Private(private_convo)) + Ok(Box::new(private_convo)) } + ConversationKind::GroupV1 => Ok(Box::new( + self.pq_inbox + .load_mls_convo(&mut self.client_ctx, record.local_convo_id)?, + )), + ConversationKind::Unknown(_) => Err(ChatError::BadBundleValue(format!( + "unsupported conversation type: {}", + record.kind.as_str() + ))), + } + } + + fn load_group_convo( + &mut self, + convo_id: ConversationId, + ) -> Result>, ChatError> { + let record = self + .store + .borrow() + .load_conversation(convo_id)? + .ok_or_else(|| ChatError::NoConvo(convo_id.into()))?; + + match record.kind { + ConversationKind::PrivateV1 => { + Err(ChatError::NoConvo("This is not a group convo".into())) + } + ConversationKind::GroupV1 => Ok(Box::new( + self.pq_inbox + .load_mls_convo(&mut self.client_ctx, record.local_convo_id)?, + )), ConversationKind::Unknown(_) => Err(ChatError::BadBundleValue(format!( "unsupported conversation type: {}", record.kind.as_str() @@ -214,17 +309,40 @@ impl Context { } } +impl GroupInitializer + for Context +{ + fn on_new_group_convo( + &self, + convo: impl crate::conversation::GroupConvo, + ) -> Result<(), ChatError> { + todo!() + } +} + #[cfg(test)] mod tests { + use std::{ + any::Any, + ops::{Deref, DerefMut}, + }; + use sqlite::{ChatStorage, StorageConfig}; use storage::{ConversationStore, IdentityStore}; use tempfile::tempdir; + use crate::{ + test_utils::{EphemeralRegistry, LocalBroadcaster, MemStore}, + utils::hex_trunc, + }; + use super::*; + type TestContext = Context; + fn send_and_verify( - sender: &mut Context, - receiver: &mut Context, + sender: &mut TestContext, + receiver: &mut TestContext, convo_id: ConversationId, content: &[u8], ) { @@ -238,10 +356,171 @@ mod tests { assert!(!received.is_new_convo); // Check that `is_new_convo` is FALSE } + // Simple client Functionality for testing + struct Client { + inner: Context, + on_content: Option>, + } + + impl Client { + fn init( + ctx: Context, + cb: Option, + ) -> Self { + Client { + inner: ctx, + on_content: cb.map(|f| Box::new(f) as Box), + } + } + + fn process_messages(&mut self) { + while let Some(data) = self.client_ctx.ds().poll() { + let res = self.handle_payload(&data).unwrap(); + if let Some(cb) = &self.on_content { + match res { + Some(content_data) => cb(content_data), + None => continue, + } + } + } + } + + fn convo( + &mut self, + convo_id: &str, + ) -> Box> { + // TODO: (P1) Convos are being copied somewhere, which means hanging on to a reference causes state desync + self.load_group_convo(convo_id).unwrap() + } + } + + impl Deref for Client { + type Target = Context; + + fn deref(&self) -> &Self::Target { + &self.inner + } + } + + impl DerefMut for Client { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } + } + + // Higher order function to handle printing + fn pretty_print(prefix: impl Into) -> Box { + let prefix = prefix.into(); + return Box::new(move |c: ContentData| { + let cid = hex_trunc(c.conversation_id.as_bytes()); + let content = String::from_utf8(c.data).unwrap(); + println!("{} ({}) {}", prefix, cid, content) + }); + } + + fn process(clients: &mut Vec) { + for client in clients { + client.process_messages(); + } + } + + #[test] + fn create_group() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + + let saro_ctx = + Context::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap(); + let raya_ctx = + Context::new_with_name("raya", ds.clone(), rs.clone(), MemStore::new()).unwrap(); + + let mut clients = vec![ + Client::init(saro_ctx, Some(pretty_print(" Saro "))), + Client::init(raya_ctx, Some(pretty_print(" Raya "))), + ]; + + const SARO: usize = 0; + const RAYA: usize = 1; + + let raya_id = clients[RAYA].account_id(); + let s_convo = clients[SARO] + .create_group_convo(&[raya_id.as_ref()]) + .unwrap(); + + let CONVO_ID = s_convo.id(); + + // Raya can read this message because + // 1) It was sent after add_members was committed, and + // 2) LocalBroadcaster provides historical messages. + + clients[SARO] + .convo(CONVO_ID) + .send_content( + &mut clients[SARO].client_ctx, + b"ok who broke the group chat again", + ) + .unwrap(); + + // clients[SARO].process_messages(); + process(&mut clients); + + clients[RAYA] + .convo(CONVO_ID) + .send_content( + &mut clients[RAYA].client_ctx, + b"it was literally working five minutes ago", + ) + .unwrap(); + + // clients[SARO].process_messages(); + process(&mut clients); + + let pax_ctx = Context::new_with_name("pax", ds, rs, MemStore::new()).unwrap(); + clients.push(Client::init(pax_ctx, Some(pretty_print(" Pax")))); + const PAX: usize = 2; + + let pax_id = clients[PAX].account_id(); + clients[SARO] + .convo(CONVO_ID) + .add_member(&mut clients[SARO].client_ctx, &[pax_id.as_ref()]) + .unwrap(); + + // clients[SARO].process_messages(); + process(&mut clients); + + clients[PAX] + .convo(CONVO_ID) + .send_content( + &mut clients[PAX].client_ctx, + b"ngl the key rotation is cooked", + ) + .unwrap(); + + // clients[SARO].process_messages(); + + process(&mut clients); + + clients[SARO] + .convo(CONVO_ID) + .send_content( + &mut clients[SARO].client_ctx, + b"bro we literally just added you to the group ", + ) + .unwrap(); + + process(&mut clients); + // process(&mut clients); + } + #[test] fn ctx_integration() { - let mut saro = Context::new_with_name("saro", ChatStorage::in_memory()); - let mut raya = Context::new_with_name("raya", ChatStorage::in_memory()); + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + + let mut saro = + Context::new_with_name("saro", ds.clone(), rs.clone(), ChatStorage::in_memory()) + .unwrap(); + let mut raya = Context::new_with_name("raya", ds, rs, ChatStorage::in_memory()).unwrap(); // Raya creates intro bundle and sends to Saro let bundle = raya.create_intro_bundle().unwrap(); @@ -274,8 +553,10 @@ mod tests { #[test] fn identity_persistence() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); let store1 = ChatStorage::new(StorageConfig::InMemory).unwrap(); - let ctx1 = Context::new_with_name("alice", store1); + let ctx1 = Context::new_with_name("alice", ds, rs, store1).unwrap(); let pubkey1 = ctx1._identity.public_key(); let name1 = ctx1.installation_name().to_string(); @@ -291,8 +572,10 @@ mod tests { let db_path = dir.path().join("chat.sqlite"); let db_path = db_path.to_string_lossy().into_owned(); + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); let store = ChatStorage::new(StorageConfig::File(db_path.clone())).unwrap(); - let ctx = Context::new_from_store("alice", store).unwrap(); + let ctx = Context::new_from_store("alice", ds, rs, store).unwrap(); let pubkey = ctx._identity.public_key(); drop(ctx); @@ -305,8 +588,12 @@ mod tests { #[test] fn conversation_metadata_persistence() { - let mut alice = Context::new_with_name("alice", ChatStorage::in_memory()); - let mut bob = Context::new_with_name("bob", ChatStorage::in_memory()); + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + let mut alice = + Context::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory()) + .unwrap(); + let mut bob = Context::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap(); let bundle = alice.create_intro_bundle().unwrap(); let intro = Introduction::try_from(bundle.as_slice()).unwrap(); @@ -323,8 +610,12 @@ mod tests { #[test] fn conversation_full_flow() { - let mut alice = Context::new_with_name("alice", ChatStorage::in_memory()); - let mut bob = Context::new_with_name("bob", ChatStorage::in_memory()); + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + let mut alice = + Context::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory()) + .unwrap(); + let mut bob = Context::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap(); let bundle = alice.create_intro_bundle().unwrap(); let intro = Introduction::try_from(bundle.as_slice()).unwrap(); @@ -364,4 +655,38 @@ mod tests { .expect("bob should receive"); assert_eq!(content.data, b"alice reply"); } + + #[test] + fn bcast_test() { + let mut a = LocalBroadcaster::new(); + let mut b = a.new_consumer(); + + a.subscribe("a".into()).unwrap(); + b.subscribe("b".into()).unwrap(); + + { + let e = AddressedEnvelope { + delivery_address: "a".into(), + data: (1..4).collect(), + }; + a.publish(e.clone()).unwrap(); + + let result = a.poll(); + assert!(result.unwrap() == e.data); + assert!(a.poll().is_none()); + } + + { + let e = AddressedEnvelope { + delivery_address: "b".into(), + data: (4..10).collect(), + }; + a.publish(e.clone()).unwrap(); + + dbg!(&b); + let result = b.poll(); + assert!(result.unwrap() == e.data); + assert!(b.poll().is_none()); + } + } } diff --git a/core/conversations/src/conversation.rs b/core/conversations/src/conversation.rs index 1580d78..c172e97 100644 --- a/core/conversations/src/conversation.rs +++ b/core/conversations/src/conversation.rs @@ -1,12 +1,18 @@ +pub mod group_v1; mod privatev1; -use crate::types::{AddressedEncryptedPayload, ContentData}; +use crate::{ + DeliveryService, RegistrationService, + ctx::ClientCtx, + types::{AddressedEncryptedPayload, ContentData}, +}; use chat_proto::logoschat::encryption::EncryptedPayload; use std::fmt::Debug; use std::sync::Arc; -use storage::{ConversationKind, ConversationStore, RatchetStore}; +use storage::{ChatStore, ConversationKind, ConversationStore, RatchetStore}; pub use crate::errors::ChatError; +pub use group_v1::{GroupV1Convo, IdentityProvider, LogosMlsProvider}; pub use privatev1::PrivateV1Convo; pub type ConversationId<'a> = &'a str; @@ -36,6 +42,29 @@ pub trait Convo: Id + Debug { fn convo_type(&self) -> ConversationKind; } +pub trait GroupConvo: Convo { + fn add_member( + &mut self, + ctx: &mut ClientCtx, + members: &[&str], + ) -> Result<(), ChatError>; + + // Default implementation which dispatches envelopes to the DeliveryService + fn send_content( + &mut self, + ctx: &mut ClientCtx, + content: &[u8], + ) -> Result<(), ChatError> { + let payloads = self.send_message(content)?; + for payload in payloads { + ctx.ds() + .publish(payload.into_envelope(self.id().into())) + .map_err(|e| ChatError::Delivery(e.to_string()))?; + } + Ok(()) + } +} + pub enum Conversation { Private(PrivateV1Convo), } diff --git a/core/conversations/src/conversation/group_v1.rs b/core/conversations/src/conversation/group_v1.rs new file mode 100644 index 0000000..9bd35a6 --- /dev/null +++ b/core/conversations/src/conversation/group_v1.rs @@ -0,0 +1,433 @@ +use std::cell::{Ref, RefCell}; +use std::rc::Rc; + +use blake2::{Blake2b, Digest, digest::consts::U6}; +use crypto::Ed25519VerifyingKey; +use openmls::prelude::*; +use openmls::{prelude::tls_codec::Deserialize, treesync::RatchetTree}; +use openmls_libcrux_crypto::Provider as LibcruxProvider; + +use openmls::prelude::MlsMessageBodyIn; +use openmls_traits::signatures::Signer as OpenMlsSigner; +use openmls_traits::storage::StorageProvider; +use prost::Message; + +use crate::{ + AddressedEnvelope, DeliveryService, RegistrationService, + conversation::{ChatError, ConversationId, Convo, GroupConvo, Id}, + ctx::ClientCtx, + types::{AddressedEncryptedPayload, ContentData}, +}; +use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload}; +use storage::{ChatStore, ConversationKind}; + +pub trait IdentityProvider: OpenMlsSigner { + fn friendly_name(&self) -> String; + fn public_key(&self) -> Ed25519VerifyingKey; + // fn installation_key() -> u8; +} + +pub trait MlsInitializer { + fn invite_to_group_v1( + &self, + ctx: &mut ClientCtx, + account_id: &str, + welcome: &MlsMessageOut, + // ratchet_tree: RatchetTree, // Embedded + ) -> Result<(), ChatError>; +} + +pub trait MlsCtx { + type IDENT: IdentityProvider; + type INIT: MlsInitializer; + + fn ident(&self) -> &Self::IDENT; + fn provider(&self) -> Ref<'_, LibcruxProvider>; + fn init(&self) -> &Self::INIT; + + // Build an MLS Credential from the supplied IdentityProvider + fn get_credential(&self) -> CredentialWithKey; +} + +pub trait LogosMlsProvider: OpenMlsProvider {} + +pub trait GroupMlsStorageV1 { + fn save_state(&self, state: &[u8]); + fn load_state(&self) -> Vec; +} + +pub struct GroupV1Convo { + ctx: Rc>, + pub(crate) mls_group: MlsGroup, // TODO: (!) Fix Visibility + convo_id: String, +} + +impl std::fmt::Debug for GroupV1Convo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GroupV1Convo") + .field("name", &self.ctx.borrow().ident().friendly_name()) + .field("convo_id", &self.convo_id) + .field("mls_epoch", &self.mls_group.epoch()) + .finish_non_exhaustive() + } +} + +impl GroupV1Convo { + pub fn new(ctx: Ctx, ds: &mut DS) -> Self { + let config = Self::mls_create_config(); + let ctx = Rc::new(RefCell::new(ctx)); + let mls_group = { + let ctx_ref = ctx.borrow(); + MlsGroup::new( + &*ctx_ref.provider(), + ctx_ref.ident(), + &config, + ctx_ref.get_credential(), + ) + .unwrap() + }; + let convo_id = hex::encode(mls_group.group_id().as_slice()); + Self::subscribe(ds, &convo_id); + + println!( + "@ Create Convo: {}. {}. d:{} dc:{}", + ctx.borrow().ident().friendly_name(), + convo_id, + Self::delivery_address_from_id(&convo_id), + Self::ctrl_delivery_address_from_id(&convo_id) + ); + Self { + ctx, + mls_group, + convo_id, + } + } + + pub fn new_from_welcome( + ctx: Rc>, + ds: &mut DS, + welcome: Welcome, + ) -> Self { + let mls_group = { + let ctx_borrow = ctx.borrow(); + let provider = ctx_borrow.provider(); + + StagedWelcome::build_from_welcome(&*provider, &Self::mls_join_config(), welcome) + .unwrap() + .build() + .unwrap() + .into_group(&*provider) + .unwrap() + }; + + let convo_id = hex::encode(mls_group.group_id().as_slice()); + Self::subscribe(ds, &convo_id); + + println!( + "@ Welcome Convo: I:{}. {}. d:{} dc:{}", + ctx.borrow().ident().friendly_name(), + convo_id, + Self::delivery_address_from_id(&convo_id), + Self::ctrl_delivery_address_from_id(&convo_id) + ); + + GroupV1Convo { + ctx, + mls_group, + convo_id, + } + } + + pub fn load( + ctx: Rc>, + ds: &mut DS, + convo_id: String, + group_id: GroupId, + ) -> Result { + let Some(mls_group) = MlsGroup::load(ctx.borrow().provider().storage(), &group_id) + .map_err(ChatError::generic)? + else { + return Err(ChatError::NoConvo("mls group not found".into())); + }; + + // println!( + // "\n>>> {}. {:?}", + // ctx.borrow().ident().friendly_name(), + // mls_group + // ); + Self::subscribe(ds, &convo_id); + + Ok(GroupV1Convo { + ctx, + mls_group, + convo_id, + }) + } + + fn subscribe(ds: &mut DS, convo_id: &str) -> Result<(), ChatError> { + ds.subscribe(Self::delivery_address_from_id(&convo_id)) + .map_err(ChatError::generic)?; + ds.subscribe(Self::ctrl_delivery_address_from_id(&convo_id)) + .map_err(ChatError::generic)?; + + Ok(()) + } + + pub fn ratchet_tree(&self) -> RatchetTree { + self.mls_group.export_ratchet_tree() + } + + fn mls_create_config() -> MlsGroupCreateConfig { + MlsGroupCreateConfig::builder() + .ciphersuite(Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519) + .use_ratchet_tree_extension(true) // This is handy for now, until there is central store for this data + .build() + } + + fn mls_join_config() -> MlsGroupJoinConfig { + MlsGroupJoinConfig::builder().build() + } + + fn delivery_address_from_id(convo_id: &str) -> String { + let hash = Blake2b::::new() + .chain_update("delivery_addr|") + .chain_update(convo_id) + .finalize(); + hex::encode(hash) + } + + fn delivery_address(&self) -> String { + Self::delivery_address_from_id(&self.convo_id) + } + + fn ctrl_delivery_address_from_id(convo_id: &str) -> String { + let hash = Blake2b::::new() + .chain_update("ctrl_delivery_addr|") + .chain_update(convo_id) + .finalize(); + hex::encode(hash) + } + + fn ctrl_delivery_address(&self) -> String { + Self::ctrl_delivery_address_from_id(&self.convo_id) + } + + fn key_package_for_account( + &self, + ctx: &mut ClientCtx, + ident: &str, + ) -> Result { + let retrieved_bytes = ctx + .contact_registry() + .retreive(ident) + .map_err(|e| ChatError::Generic(e.to_string()))?; + + // dbg!(ctx.contact_registry()); + let Some(keypkg_bytes) = retrieved_bytes else { + return Err(ChatError::Protocol("Contact Not Found".into())); + }; + + let key_package_in = KeyPackageIn::tls_deserialize(&mut keypkg_bytes.as_slice())?; + let keypkg = key_package_in.validate( + self.ctx.borrow().provider().crypto(), + ProtocolVersion::Mls10, + )?; //TODO: P3 - Hardcoded Protocol Version + Ok(keypkg) + } + + fn save_state(&self, store: &CS) {} +} + +impl Id for GroupV1Convo { + fn id(&self) -> ConversationId<'_> { + &self.convo_id + } +} + +impl Convo for GroupV1Convo { + fn send_message( + &mut self, + content: &[u8], + ) -> Result, ChatError> { + let ctx_ref = self.ctx.borrow(); + let provider = ctx_ref.provider(); + let mls_message_out = self + .mls_group + .create_message(&*provider, ctx_ref.ident(), content) + .unwrap(); + + let a = AddressedEncryptedPayload { + delivery_address: self.delivery_address(), + data: EncryptedPayload { + encryption: Some( + chat_proto::logoschat::encryption::encrypted_payload::Encryption::Plaintext( + Plaintext { + payload: mls_message_out.to_bytes().unwrap().into(), + }, + ), + ), + }, + }; + + Ok(vec![a]) + } + + fn handle_frame( + &mut self, + encoded_payload: EncryptedPayload, + ) -> Result, ChatError> { + use chat_proto::logoschat::encryption::encrypted_payload::Encryption; + + let bytes = match encoded_payload.encryption { + Some(Encryption::Plaintext(pt)) => pt.payload, + _ => { + return Err(ChatError::ProtocolExpectation( + "None", + "Some(Encryption::Plaintext)".into(), + )); + } + }; + + let mls_message = + MlsMessageIn::tls_deserialize_exact_bytes(&bytes).map_err(ChatError::generic)?; + + let protocol_message: ProtocolMessage = mls_message + .try_into_protocol_message() + .map_err(ChatError::generic)?; + + let ctx_borrow = self.ctx.borrow(); + let provider = ctx_borrow.provider(); + + if protocol_message.epoch() < self.mls_group.epoch() { + // TODO: (!) Determine how to handle messages for old epochs. Minimally log this. + return Ok(None); + } + + let processed = self + .mls_group + .process_message(&*provider, protocol_message) + .map_err(ChatError::generic)?; + + match processed.into_content() { + ProcessedMessageContent::ApplicationMessage(msg) => Ok(Some(ContentData { + conversation_id: hex::encode(self.mls_group.group_id().as_slice()), + data: msg.into_bytes(), + is_new_convo: false, + })), + ProcessedMessageContent::StagedCommitMessage(commit) => { + self.mls_group + .merge_staged_commit(&*provider, *commit) + .map_err(ChatError::generic)?; + Ok(None) + } + x => { + println!("Unhabled Message ttype {:?}", x); + Ok(None) + } + } + } + + fn remote_id(&self) -> String { + // "group_remote_id".into() + todo!() + } + + fn convo_type(&self) -> storage::ConversationKind { + ConversationKind::GroupV1 + } +} + +impl + GroupConvo for GroupV1Convo +{ + fn add_member( + &mut self, + ctx: &mut ClientCtx, + members: &[&str], + ) -> Result<(), ChatError> { + // add_members returns: + // commit — the Commit message Alice broadcasts to all members + // welcome — the Welcome message sent privately to each new joiner + // _group_info — used for external joins; ignore for now + let ctx_ref = self.ctx.borrow(); + let provider = ctx_ref.provider(); + + if members.len() > 50 { + // This is a temporary limit that originates from the the De-MLS epoch time. + return Err(ChatError::Protocol( + "Cannot add more than 50 Members at a time".into(), + )); + } + + // Get the Keypacakages and transpose any errors. + // The account_id is kept so invites can be addressed properly + let keypkgs = members + .iter() + // .map(|ident| self.key_package_for_account(ctx, ident)) + .map(|ident| self.key_package_for_account(ctx, ident)) + .collect::, ChatError>>()?; + + let (commit, welcome, _group_info) = self + .mls_group + .add_members(&*provider, ctx_ref.ident(), keypkgs.iter().as_slice()) + .unwrap(); + + self.mls_group.merge_pending_commit(&*provider).unwrap(); + + // TODO: (P3) Evaluate privacy/performance implications of an aggregated Welcome for multiple users + for account_id in members { + ctx_ref + .init() + .invite_to_group_v1(ctx, account_id, &welcome)?; + } + + let encrypted_payload = EncryptedPayload { + encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext { + payload: commit.to_bytes()?.into(), + })), + }; + + let addr_enc_payload = AddressedEncryptedPayload { + delivery_address: self.ctrl_delivery_address(), + data: encrypted_payload, + }; + // Prepare commit message + // TODO: (P1) Make GroupConvos agnostic to framing so its less error prone and more + let env = addr_enc_payload.into_envelope(self.convo_id.clone()); + + ctx.ds() + .publish(env) + .map_err(|e| ChatError::Generic(format!("Publish: {e}"))) + } +} + +use prost::Oneof; + +#[derive(Clone, PartialEq, Message)] +pub struct GroupV1Frame { + #[prost(string, tag = "1")] + pub sender: String, + + #[prost(uint64, tag = "2")] + pub timestamp: u64, + + // oneof field — optional, holds one variant + #[prost(oneof = "FrameType", tags = "3, 4, 5")] + pub payload: Option, +} + +#[derive(Clone, PartialEq, Oneof)] +pub enum FrameType { + #[prost(bytes, tag = "3")] + Welcome(Vec), +} + +#[cfg(test)] +mod tests { + use crypto::PrivateKey; + + use super::*; + + #[test] + fn test_mls() {} +} diff --git a/core/conversations/src/ctx.rs b/core/conversations/src/ctx.rs new file mode 100644 index 0000000..f7e3d10 --- /dev/null +++ b/core/conversations/src/ctx.rs @@ -0,0 +1,36 @@ +use std::{ + cell::{Ref, RefCell, RefMut}, + rc::Rc, +}; + +use storage::ChatStore; + +use crate::{DeliveryService, RegistrationService}; + +pub struct ClientCtx { + ds: DS, + contact_registry: RS, + convo_store: Rc>, // TODO: (P2) Remove Rc/Refcell +} + +impl<'a, DS: DeliveryService, RS: RegistrationService, CS: ChatStore> ClientCtx { + pub fn new(ds: DS, contact_registry: RS, convo_store: Rc>) -> Self { + Self { + ds, + contact_registry, + convo_store, + } + } + + pub fn ds(&'a mut self) -> &'a mut DS { + &mut self.ds + } + + pub fn contact_registry(&'a mut self) -> &'a mut RS { + &mut self.contact_registry + } + + pub fn store(&'a self) -> RefMut { + self.convo_store.borrow_mut() + } +} diff --git a/core/conversations/src/errors.rs b/core/conversations/src/errors.rs index 664cdd3..5b2bfe4 100644 --- a/core/conversations/src/errors.rs +++ b/core/conversations/src/errors.rs @@ -1,3 +1,4 @@ +use openmls::{framing::errors::MlsMessageError, prelude::tls_codec}; pub use thiserror::Error; use storage::StorageError; @@ -26,6 +27,23 @@ pub enum ChatError { UnsupportedConvoType(String), #[error("storage error: {0}")] Storage(#[from] StorageError), + #[error("mls error: {0}")] + MlsMessageError(#[from] MlsMessageError), + #[error("TlsCodec: {0}")] + TlsCodec(#[from] tls_codec::Error), + #[error("generic: {0}")] + Generic(String), + #[error("KeyPackage: {0}")] + KeyPackage(#[from] openmls::prelude::KeyPackageVerifyError), + #[error("Delivery: {0}")] + Delivery(String), +} + +impl ChatError { + // This is a stopgap until there is a proper error system in place + pub fn generic(e: impl ToString) -> Self { + Self::Generic(e.to_string()) + } } #[derive(Error, Debug)] diff --git a/core/conversations/src/external_traits.rs b/core/conversations/src/external_traits.rs new file mode 100644 index 0000000..8f6f3fa --- /dev/null +++ b/core/conversations/src/external_traits.rs @@ -0,0 +1,35 @@ +use std::{cell::RefCell, fmt::Debug, fmt::Display, rc::Rc}; + +use crate::types::AddressedEnvelope; + +pub struct Service { + inner: Rc>, +} + +impl Service { + pub fn new(t: T) -> Self { + Self { + inner: Rc::new(RefCell::new(t)), + } + } + + fn with(&self, f: F) -> R + where + F: FnOnce(&T) -> R, + { + let inner = self.inner.borrow(); + f(&inner) + } +} + +pub trait DeliveryService { + type Error: Display; + fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error>; + fn subscribe(&mut self, delivery_address: String) -> Result<(), Self::Error>; +} + +pub trait RegistrationService: Debug { + type Error: Display; + fn register(&mut self, identity: String, key_bundle: Vec) -> Result<(), Self::Error>; + fn retreive(&self, identity: &str) -> Result>, Self::Error>; +} diff --git a/core/conversations/src/inbox_v2.rs b/core/conversations/src/inbox_v2.rs new file mode 100644 index 0000000..b94bffa --- /dev/null +++ b/core/conversations/src/inbox_v2.rs @@ -0,0 +1,383 @@ +use std::any::Any; +use std::cell::{Ref, RefCell}; +use std::collections::HashMap; +use std::rc::Rc; + +use chat_proto::logoschat::envelope::EnvelopeV1; +use crypto::Ed25519SigningKey; +use crypto::Ed25519VerifyingKey; +use crypto::PublicKey; +use openmls::prelude::tls_codec::Serialize; +use openmls::{prelude::*, treesync::RatchetTree}; +use openmls_libcrux_crypto::Provider as LibcruxProvider; +use openmls_traits::signatures::Signer; +use openmls_traits::storage::StorageProvider; +use prost::{Message, Oneof}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use storage::ChatStore; +use storage::ConversationMeta; +use storage::ConversationStore; + +use crate::AddressedEnvelope; +use crate::ChatError; +use crate::DeliveryService; +use crate::RegistrationService; +use crate::conversation::GroupConvo; +use crate::conversation::group_v1::{MlsCtx, MlsInitializer}; +use crate::conversation::{GroupV1Convo, IdentityProvider}; +use crate::ctx::ClientCtx; +use crate::types::AddressedEncryptedPayload; +use crate::utils::hash_size::Testing; +use crate::utils::{blake2b_hex, hash_size, hex_trunc}; + +static ACCOUNT_COUNTER: AtomicUsize = AtomicUsize::new(0); + +const ACCOUNT_NAMES: &[&str] = &["Saro", "Raya", "Pax"]; + +#[derive(Clone)] +pub struct LogosAccount { + id: String, + signing_key: Ed25519SigningKey, + // x25519_key: crypto::PrivateKey, +} + +impl LogosAccount { + pub fn new() -> Self { + let idx = ACCOUNT_COUNTER.fetch_add(1, Ordering::Relaxed); + + let id = if idx < ACCOUNT_NAMES.len() { + ACCOUNT_NAMES[idx % ACCOUNT_NAMES.len()].to_string() + } else { + use rand_core::{OsRng, RngCore}; + const CHARSET: &[u8] = + b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + let i: String = (0..8) + .map(|_| { + let idx = (OsRng.next_u32() as usize) % CHARSET.len(); + CHARSET[idx] as char + }) + .collect(); + i + }; + Self { + id, + signing_key: Ed25519SigningKey::generate(), + // x25519_key: crypto::PrivateKey::random(), + } + } +} + +impl Signer for LogosAccount { + fn sign(&self, payload: &[u8]) -> Result, openmls_traits::signatures::SignerError> { + Ok(self.signing_key.sign(payload).as_ref().to_vec()) + } + + fn signature_scheme(&self) -> SignatureScheme { + SignatureScheme::ED25519 + } +} + +impl IdentityProvider for LogosAccount { + fn friendly_name(&self) -> String { + self.id.clone() + } + + fn public_key(&self) -> Ed25519VerifyingKey { + self.signing_key.verifying_key() + } +} + +#[derive(Clone)] +pub struct MlsContext { + pub ident_provider: LogosAccount, + pub initializer: Init, + provider: Rc>, +} + +impl MlsCtx for MlsContext { + type IDENT = LogosAccount; + type INIT = Init; + + fn ident(&self) -> &LogosAccount { + &self.ident_provider + } + + fn provider(&self) -> Ref<'_, LibcruxProvider> { + self.provider.borrow() + } + + fn init(&self) -> &Init { + &self.initializer + } + + // Build an MLS Credential from the supplied IdentityProvider + fn get_credential(&self) -> CredentialWithKey { + CredentialWithKey { + credential: BasicCredential::new(self.ident_provider.friendly_name().into()).into(), + signature_key: self.ident_provider.public_key().as_ref().into(), + } + } +} + +pub trait GroupInitializer { + fn on_new_group_convo(&self, convo: impl GroupConvo) -> Result<(), ChatError>; +} + +#[derive(Clone)] +pub struct InboxV2 { + pub account: LogosAccount, // TODO: (!) don't expose account + mls_provider: Rc>, + convo_map: HashMap>, +} + +impl<'a> InboxV2 { + pub fn new() -> Self { + let account = LogosAccount::new(); + let mls_provider = Rc::new(RefCell::new(LibcruxProvider::new().unwrap())); + Self { + account, + mls_provider, + convo_map: HashMap::new(), + } + } + + pub fn register( + &mut self, + ctx: &mut ClientCtx, + ) -> Result<(), ChatError> { + let keypackage = self.create_keypackage()?; + + let bytes = keypackage.tls_serialize_detached()?; + + ctx.contact_registry() + .register(self.account.friendly_name(), bytes) + .map_err(ChatError::generic)?; //TODO: (P1) create an address scheme instead of using names + Ok(()) + } + + pub fn delivery_address(&self) -> String { + Self::delivery_address_for_account_id(&self.account.id) + } + + pub fn id(&self) -> String { + Self::conversation_id_for_account_id(&self.account.id) + } + + pub fn create_group_v1( + &self, + ctx: &mut ClientCtx, + ) -> Result>, ChatError> { + let convo = GroupV1Convo::new(self.assemble_ctx(), ctx.ds()); + Ok(convo) + } + + pub fn handle_frame( + &self, + ctx: &mut ClientCtx, + payload_bytes: &[u8], + ) -> Result<(), ChatError> { + let inbox_frame = InboxV2Frame::decode(payload_bytes)?; + + let Some(payload) = inbox_frame.payload else { + return Err(ChatError::BadParsing("InboxV2Payload missing")); + }; + + match payload { + InviteType::GroupV1(group_v1_heavy_invite) => { + self.handle_heavy_invite(ctx, group_v1_heavy_invite) + } + } + } + + fn assemble_ctx(&self) -> MlsContext { + MlsContext { + ident_provider: self.account.clone(), + initializer: self.clone(), + provider: self.mls_provider.clone(), + } + } + + fn persist_convo( + &self, + ctx: &'a ClientCtx, + convo: impl GroupConvo, + ) -> Result<(), ChatError> { + // TODO: (P2) Remove remote_convo_id this is an implementation detail specific to PrivateV1 + // TODO: (P3) Implement From for ConversationMeta + let meta = ConversationMeta { + local_convo_id: convo.id().to_string(), + remote_convo_id: "0".into(), + kind: storage::ConversationKind::GroupV1, + }; + ctx.store().save_conversation(&meta)?; + // TODO: (P1) Persist state + Ok(()) + } + + fn handle_heavy_invite( + &self, + ctx: &mut ClientCtx, + invite: GroupV1HeavyInvite, + ) -> Result<(), ChatError> { + let (msg_in, rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?; + + let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else { + return Err(ChatError::ProtocolExpectation( + "something else", + "Welcome".into(), + )); + }; + + let mls_ctx = Rc::new(RefCell::new(self.assemble_ctx())); + + let convo = GroupV1Convo::new_from_welcome(mls_ctx, ctx.ds(), welcome); + self.persist_convo(ctx, convo) + } + + fn create_keypackage(&self) -> Result { + let mls_ctx = self.assemble_ctx(); + + let capabilities = Capabilities::builder() + .ciphersuites(vec![ + Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519, + ]) + .extensions(vec![ExtensionType::ApplicationId]) + .build(); + let a = KeyPackage::builder() + .leaf_node_capabilities(capabilities) + .build( + Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519, + &*mls_ctx.provider(), + &self.account, + mls_ctx.get_credential(), + ) + .expect("Failed to build KeyPackage"); + + Ok(a.key_package().clone()) + } + + fn delivery_address_for_account_id(account_id: &str) -> String { + blake2b_hex::(&["InboxV2|", "delivery_address|", account_id]) + } + + fn conversation_id_for_account_id(account_id: &str) -> String { + blake2b_hex::(&["InboxV2|", "conversation_id|", account_id]) + } + + fn dbg_mls_store(ctx: &MlsContext, prefix: impl AsRef) { + let pa = ctx.provider.borrow(); + let data = &*pa.storage().values.read().unwrap(); + + println!(":::MlsProviderStore::: -- {}", prefix.as_ref()); + for key in data.keys() { + let val = match data.get(key) { + Some(x) => format!("{} ({})", hex_trunc(x), blake2b_hex::(&[x])), + None => "None".into(), + }; + + println!(". {:?}: {:?}", hex_trunc(key), val) + } + } + + pub fn load_mls_convo( + &self, + ctx: &mut ClientCtx, + convo_id: String, + ) -> Result>, ChatError> { + let mls_ctx = self.assemble_ctx(); + + let group_id_bytes = hex::decode(&convo_id).map_err(ChatError::generic)?; + let group_id = GroupId::from_slice(&group_id_bytes); + let convo = + GroupV1Convo::load(Rc::new(RefCell::new(mls_ctx)), ctx.ds(), convo_id, group_id)?; + + Ok(convo) + } +} + +impl MlsInitializer for InboxV2 { + fn invite_to_group_v1( + &self, + ctx: &mut ClientCtx, + account_id: &str, + welcome: &MlsMessageOut, + ) -> Result<(), ChatError> { + let invite = GroupV1HeavyInvite { + welcome_bytes: welcome.to_bytes()?, + }; + + let frame = InboxV2Frame { + payload: Some(InviteType::GroupV1(invite)), + }; + + let envelope = EnvelopeV1 { + conversation_hint: Self::conversation_id_for_account_id(account_id), + salt: 0, + payload: frame.encode_to_vec().into(), + }; + + let outbound_msg = AddressedEnvelope { + delivery_address: Self::delivery_address_for_account_id(account_id), + data: envelope.encode_to_vec(), + }; + + ctx.ds().publish(outbound_msg).map_err(ChatError::generic)?; + Ok(()) + } +} + +#[derive(Clone, PartialEq, Message)] +pub struct InboxV2Frame { + #[prost(oneof = "InviteType", tags = "1")] + pub payload: Option, +} + +#[derive(Clone, PartialEq, Oneof)] +pub enum InviteType { + #[prost(message, tag = "1")] + GroupV1(GroupV1HeavyInvite), +} + +#[derive(Clone, PartialEq, Message)] +pub struct GroupV1HeavyInvite { + #[prost(bytes, tag = "1")] + pub welcome_bytes: Vec, +} + +#[cfg(test)] +mod tests { + use super::*; + use openmls_traits::signatures::Signer; + + struct Account { + name: String, + signing_key: crypto::Ed25519SigningKey, + } + + impl Signer for Account { + fn sign(&self, payload: &[u8]) -> Result, openmls_traits::signatures::SignerError> { + Ok(self.signing_key.sign(payload).as_ref().to_vec()) + } + + fn signature_scheme(&self) -> SignatureScheme { + SignatureScheme::ED25519 + } + } + + impl IdentityProvider for Account { + fn friendly_name(&self) -> String { + self.name.clone() + } + + fn public_key(&self) -> Ed25519VerifyingKey { + todo!() + } + } + + #[test] + fn dev() { + // let inbox = InboxV2::new(...); + // let group = inbox.create_group_v1().unwrap(); + // let bytes = group.send("hello".as_bytes()); + } +} diff --git a/core/conversations/src/lib.rs b/core/conversations/src/lib.rs index 8de610b..a979083 100644 --- a/core/conversations/src/lib.rs +++ b/core/conversations/src/lib.rs @@ -2,15 +2,22 @@ mod account; mod context; mod conversation; mod crypto; +mod ctx; mod errors; +mod external_traits; mod inbox; +mod inbox_v2; mod proto; mod types; mod utils; pub use account::LogosAccount; +#[cfg(test)] +mod test_utils; + pub use context::{Context, ConversationIdOwned, Introduction}; pub use errors::ChatError; +pub use external_traits::{DeliveryService, RegistrationService}; pub use sqlite::ChatStorage; pub use sqlite::StorageConfig; pub use types::{AddressedEnvelope, ContentData}; diff --git a/core/conversations/src/test_utils.rs b/core/conversations/src/test_utils.rs new file mode 100644 index 0000000..7384230 --- /dev/null +++ b/core/conversations/src/test_utils.rs @@ -0,0 +1,310 @@ +use std::{ + cell::RefCell, + collections::{HashMap, HashSet, VecDeque}, + fmt::Debug, + io::Cursor, + rc::Rc, + sync::{Arc, Mutex}, +}; + +use storage::{ChatStore, ConversationMeta, ConversationStore, IdentityStore}; +use storage::{EphemeralKeyStore, RatchetStore}; + +use crate::{ + AddressedEnvelope, DeliveryService, RegistrationService, + utils::{blake2b_hex, hash_size::Testing, hex_trunc}, +}; + +type Callback = Rc)>; + +type Filter = Box) -> bool>; + +#[derive(Debug)] +struct BroadcasterShared { + /// Per-address message queue; all published messages are appended here. + messages: VecDeque, + base_index: usize, +} + +impl BroadcasterShared { + pub fn read(&self, cursor: usize) -> Option<&T> { + self.messages.get(cursor + self.base_index) + } + + pub fn tail(&self) -> usize { + self.messages.len() + self.base_index + } +} + +#[derive(Clone, Debug)] +pub struct LocalBroadcaster { + shared: Rc>>, + cursor: usize, + subscriptions: HashSet, + outbound_msgs: Vec, +} + +impl LocalBroadcaster { + pub fn new() -> Self { + let shared = Rc::new(RefCell::new(BroadcasterShared { + messages: VecDeque::new(), + base_index: 0, + })); + + let cursor = shared.borrow().tail(); + Self { + shared, + cursor, + subscriptions: HashSet::new(), + outbound_msgs: Vec::new(), + } + } + + /// Returns a new consumer that shares the same message store but has its + /// own independent cursor — it starts from the beginning of each address + /// queue regardless of what any other consumer has already processed. + pub fn new_consumer(&self) -> Self { + let mut inner = self.shared.clone(); + let cursor = inner.borrow().tail(); + Self { + shared: inner, + cursor, + subscriptions: HashSet::new(), + outbound_msgs: Vec::new(), + } + } + + /// Pulls all messages this consumer has not yet seen on `address`, + /// applying any registered filter. Advances the cursor so the same + /// messages are not returned again. + pub fn poll(&mut self) -> Option> { + loop { + let next = self.cursor; + match self.shared.borrow().read(next) { + None => return None, + Some(ae) => { + self.cursor = next + 1; + if self.subscriptions.contains(ae.delivery_address.as_str()) + && self.is_inbound(ae) + { + return Some(ae.data.clone()); + } + } + } + } + } + + pub fn clear(&mut self) { + self.cursor = self.shared.borrow().tail(); + } + + fn msg_id(msg: &AddressedEnvelope) -> String { + blake2b_hex::(&[msg.data.as_slice()]) + } + + fn is_inbound(&self, msg: &AddressedEnvelope) -> bool { + let mid = Self::msg_id(msg); + !self.outbound_msgs.contains(&mid) + } +} + +impl DeliveryService for LocalBroadcaster { + type Error = String; + + fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error> { + self.outbound_msgs.push(Self::msg_id(&envelope)); + self.shared.borrow_mut().messages.push_back(envelope); + + Ok(()) + } + + fn subscribe(&mut self, delivery_address: String) -> Result<(), Self::Error> { + // Strict temporal ordering of subscriptions is not enforced. + // Subscruptions are evaluated on polling, not when the message is published + self.subscriptions.insert(delivery_address); + Ok(()) + } +} + +/// A Contact Registry used for Tests. +/// This implementation stores bundle bytes and then returns them when +/// retreived +/// + +#[derive(Clone)] +pub struct EphemeralRegistry { + registry: Arc>>>, +} + +impl EphemeralRegistry { + pub fn new() -> Self { + Self { + registry: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +impl Debug for EphemeralRegistry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let registry = self.registry.lock().unwrap(); + let truncated: Vec<(&String, String)> = registry + .iter() + .map(|(k, v)| { + let hex = if v.len() <= 8 { + hex::encode(v) + } else { + format!( + "{}..{}", + hex::encode(&v[..4]), + hex::encode(&v[v.len() - 4..]) + ) + }; + (k, hex) + }) + .collect(); + f.debug_struct("EphemeralRegistry") + .field("registry", &truncated) + .finish() + } +} + +impl RegistrationService for EphemeralRegistry { + type Error = String; + + fn register(&mut self, identity: String, key_bundle: Vec) -> Result<(), Self::Error> { + self.registry.lock().unwrap().insert(identity, key_bundle); + Ok(()) + } + + fn retreive(&self, identity: &str) -> Result>, Self::Error> { + Ok(self.registry.lock().unwrap().get(identity).cloned()) + } +} + +pub struct MemStore { + convos: HashMap, + state: HashMap>, +} + +impl MemStore { + pub fn new() -> Self { + Self { + convos: HashMap::new(), + state: HashMap::new(), + } + } +} + +impl ConversationStore for MemStore { + fn save_conversation( + &mut self, + meta: &storage::ConversationMeta, + ) -> Result<(), storage::StorageError> { + self.convos + .insert(meta.local_convo_id.clone(), meta.clone()); + Ok(()) + } + + fn load_conversation( + &self, + local_convo_id: &str, + ) -> Result, storage::StorageError> { + let a = self.convos.get(local_convo_id).cloned(); + Ok(a) + } + + fn remove_conversation(&mut self, local_convo_id: &str) -> Result<(), storage::StorageError> { + todo!() + } + + fn load_conversations(&self) -> Result, storage::StorageError> { + Ok(self.convos.values().cloned().collect()) + } + + fn has_conversation(&self, local_convo_id: &str) -> Result { + Ok(self.convos.contains_key(local_convo_id)) + } +} + +impl IdentityStore for MemStore { + fn load_identity(&self) -> Result, storage::StorageError> { + // todo!() + Ok(None) + } + + fn save_identity(&mut self, identity: &crypto::Identity) -> Result<(), storage::StorageError> { + // todo!() + Ok(()) + } +} + +impl EphemeralKeyStore for MemStore { + fn save_ephemeral_key( + &mut self, + public_key_hex: &str, + private_key: &crypto::PrivateKey, + ) -> Result<(), storage::StorageError> { + todo!() + } + + fn load_ephemeral_key( + &self, + public_key_hex: &str, + ) -> Result, storage::StorageError> { + todo!() + } + + fn remove_ephemeral_key(&mut self, public_key_hex: &str) -> Result<(), storage::StorageError> { + todo!() + } +} + +impl RatchetStore for MemStore { + fn save_ratchet_state( + &mut self, + conversation_id: &str, + state: &storage::RatchetStateRecord, + skipped_keys: &[storage::SkippedKeyRecord], + ) -> Result<(), storage::StorageError> { + todo!() + } + + fn load_ratchet_state( + &self, + conversation_id: &str, + ) -> Result { + todo!() + } + + fn load_skipped_keys( + &self, + conversation_id: &str, + ) -> Result, storage::StorageError> { + todo!() + } + + fn has_ratchet_state(&self, conversation_id: &str) -> Result { + todo!() + } + + fn delete_ratchet_state(&mut self, conversation_id: &str) -> Result<(), storage::StorageError> { + todo!() + } + + fn cleanup_old_skipped_keys( + &mut self, + max_age_secs: i64, + ) -> Result { + todo!() + } +} + +// impl GroupMlsStorageV1 for MemStore { +// fn save_state(&self, convo_id: &str, state: &[u8]) { +// self.state.insert(convo_id, state) +// } + +// fn load_state(&self, convo_id: &str) -> Vec { +// self.state.get(convo_id).unwrap().clone() +// } +// } diff --git a/core/conversations/src/types.rs b/core/conversations/src/types.rs index e8ecc70..b60ab84 100644 --- a/core/conversations/src/types.rs +++ b/core/conversations/src/types.rs @@ -6,13 +6,51 @@ use crate::proto::{self, Message}; // This struct represents Outbound data. // It wraps an encoded payload with a delivery address, so it can be handled by the delivery service. +#[derive(Clone)] pub struct AddressedEnvelope { pub delivery_address: String, pub data: Vec, } +impl AddressedEnvelope { + pub fn new(delivery_address: String, convo_id: String, data: &[u8]) -> Self { + let envelope = proto::EnvelopeV1 { + // TODO: conversation_id should be obscured + conversation_hint: convo_id, + salt: 0, + payload: proto::Bytes::copy_from_slice(data), + }; + + AddressedEnvelope { + delivery_address, + data: envelope.encode_to_vec(), + } + } +} + +impl Debug for AddressedEnvelope { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let data = &self.data; + let hex = if data.len() <= 8 { + hex::encode(data) + } else { + format!( + "{}..{}", + hex::encode(&data[..4]), + hex::encode(&data[data.len() - 4..]) + ) + }; + + f.debug_struct("AddressedEnvelope") + .field("addr", &self.delivery_address) + .field("data", &hex) + .finish() + } +} + // This struct represents the result of processed inbound data. // It wraps content payload with a conversation_id +#[derive(Debug)] pub struct ContentData { pub conversation_id: String, pub data: Vec, @@ -30,17 +68,11 @@ pub(crate) struct AddressedEncryptedPayload { impl AddressedEncryptedPayload { // Wrap in an envelope and prepare for transmission pub fn into_envelope(self, convo_id: String) -> AddressedEnvelope { - let envelope = proto::EnvelopeV1 { - // TODO: conversation_id should be obscured - conversation_hint: convo_id, - salt: 0, - payload: proto::Bytes::copy_from_slice(self.data.encode_to_vec().as_slice()), - }; - - AddressedEnvelope { - delivery_address: self.delivery_address, - data: envelope.encode_to_vec(), - } + AddressedEnvelope::new( + self.delivery_address, + convo_id, + self.data.encode_to_vec().as_slice(), + ) } } diff --git a/core/conversations/src/utils.rs b/core/conversations/src/utils.rs index 306e898..4649c3d 100644 --- a/core/conversations/src/utils.rs +++ b/core/conversations/src/utils.rs @@ -6,3 +6,66 @@ pub fn timestamp_millis() -> i64 { .unwrap() .as_millis() as i64 } + +/// Track hash sizes in use across the crate. +pub mod hash_size { + use blake2::digest::{ + consts::U64, + generic_array::ArrayLength, + typenum::{IsLessOrEqual, NonZero}, + }; + + pub trait HashLen + where + >::Output: NonZero, + { + type Size: ArrayLength + IsLessOrEqual; + } + + /// This macro generates HashLen for the given typenum::length + macro_rules! hash_sizes { + ($($(#[$attr:meta])* $name:ident => $size:ty),* $(,)?) => { + $( + $(#[$attr])* + pub struct $name; + impl HashLen for $name { type Size = $size; } + )* + }; + } + + use blake2::digest::consts::{U4, U8, U18}; + hash_sizes! { + /// Generic hash size for tests and debug + Testing => U4, + /// Account ID hash length + AccountId => U8, + ConversationId => U18, + } +} + +use blake2::{Blake2b, Digest}; +/// This establishes an easy to use wrapper for hashes in this crate. +/// The output is formatted string of hex characters +pub fn blake2b_hex(components: &[impl AsRef<[u8]>]) -> String { + //A + let mut hash = Blake2b::::new(); + + for c in components { + hash.update(c); + } + + let output = hash.finalize(); + hex::encode(output) +} + +pub fn hex_trunc(data: &[u8]) -> String { + if data.len() <= 8 { + hex::encode(data) + } else { + format!( + "{}..{}", + hex::encode(&data[..4]), + hex::encode(&data[data.len() - 4..]) + ) + } +} diff --git a/core/crypto/src/lib.rs b/core/crypto/src/lib.rs index 1759091..754a37c 100644 --- a/core/crypto/src/lib.rs +++ b/core/crypto/src/lib.rs @@ -4,6 +4,8 @@ mod signatures; mod x3dh; mod xeddsa_sign; +use thiserror::Error; + pub use identity::Identity; pub use keys::{PrivateKey, PublicKey, SymmetricKey32}; pub use signatures::{Ed25519SigningKey, Ed25519VerifyingKey}; diff --git a/core/sqlite/src/lib.rs b/core/sqlite/src/lib.rs index 8c57bb3..8382598 100644 --- a/core/sqlite/src/lib.rs +++ b/core/sqlite/src/lib.rs @@ -10,8 +10,8 @@ use std::collections::HashSet; use crypto::{Identity, PrivateKey}; use rusqlite::{Transaction, params}; use storage::{ - ConversationKind, ConversationMeta, ConversationStore, EphemeralKeyStore, IdentityStore, - RatchetStateRecord, RatchetStore, SkippedKeyRecord, StorageError, + ChatStore, ConversationKind, ConversationMeta, ConversationStore, EphemeralKeyStore, + IdentityStore, RatchetStateRecord, RatchetStore, SkippedKeyRecord, StorageError, }; use zeroize::Zeroize; @@ -532,6 +532,16 @@ fn blob_to_array( .map_err(|_| invalid_blob_length(field, N, actual)) } +// impl GroupMlsStorageV1 for ChatStorage { +// fn save_state(&self, convo_id: &str, state: &[u8]) { +// todo!() +// } + +// fn load_state(&self, convo_id: &str) -> Vec { +// todo!() +// } +// } + #[cfg(test)] mod tests { use storage::{ diff --git a/core/storage/src/store.rs b/core/storage/src/store.rs index a24ad25..d53b16c 100644 --- a/core/storage/src/store.rs +++ b/core/storage/src/store.rs @@ -27,6 +27,7 @@ pub trait EphemeralKeyStore { pub enum ConversationKind { PrivateV1, Unknown(String), + GroupV1, } impl ConversationKind { @@ -34,6 +35,7 @@ impl ConversationKind { match self { Self::PrivateV1 => "private_v1", Self::Unknown(value) => value.as_str(), + Self::GroupV1 => "group_v1", } } } @@ -42,6 +44,7 @@ impl From<&str> for ConversationKind { fn from(value: &str) -> Self { match value { "private_v1" => Self::PrivateV1, + "group_v1" => Self::GroupV1, other => Self::Unknown(other.to_string()), } } @@ -120,6 +123,8 @@ pub trait RatchetStore { fn cleanup_old_skipped_keys(&mut self, max_age_secs: i64) -> Result; } +// TODO: (P2) this should be defined in the ConversationType + pub trait ChatStore: IdentityStore + EphemeralKeyStore + ConversationStore + RatchetStore {} impl ChatStore for T where T: IdentityStore + EphemeralKeyStore + ConversationStore + RatchetStore diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index c95a0f1..50e6099 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1,9 +1,9 @@ use libchat::{ AddressedEnvelope, ChatError, ChatStorage, ContentData, Context, ConversationIdOwned, - Introduction, StorageConfig, + DeliveryService, Introduction, StorageConfig, }; -use crate::{delivery::DeliveryService, errors::ClientError}; +use crate::errors::ClientError; pub struct ChatClient { ctx: Context, diff --git a/crates/client/src/delivery.rs b/crates/client/src/delivery.rs deleted file mode 100644 index 853de0d..0000000 --- a/crates/client/src/delivery.rs +++ /dev/null @@ -1,6 +0,0 @@ -use libchat::AddressedEnvelope; - -pub trait DeliveryService { - type Error: std::fmt::Debug; - fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error>; -} diff --git a/crates/client/src/delivery_in_process.rs b/crates/client/src/delivery_in_process.rs index ae9d03a..6cceb25 100644 --- a/crates/client/src/delivery_in_process.rs +++ b/crates/client/src/delivery_in_process.rs @@ -1,4 +1,4 @@ -use crate::{AddressedEnvelope, delivery::DeliveryService}; +use crate::{AddressedEnvelope, DeliveryService}; use std::collections::HashMap; use std::convert::Infallible; use std::sync::{Arc, RwLock}; diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index cfd9074..a0cac6f 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -1,12 +1,12 @@ mod client; -mod delivery; mod delivery_in_process; mod errors; pub use client::ChatClient; -pub use delivery::DeliveryService; pub use delivery_in_process::{Cursor, InProcessDelivery, MessageBus}; pub use errors::ClientError; // Re-export types callers need to interact with ChatClient -pub use libchat::{AddressedEnvelope, ContentData, ConversationIdOwned, StorageConfig}; +pub use libchat::{ + AddressedEnvelope, ContentData, ConversationIdOwned, DeliveryService, StorageConfig, +}; diff --git a/extensions/delivery/Cargo.toml b/extensions/delivery/Cargo.toml new file mode 100644 index 0000000..f55ccf8 --- /dev/null +++ b/extensions/delivery/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "delivery" +version = "0.1.0" +edition = "2024" + +[dependencies] diff --git a/extensions/delivery/src/lib.rs b/extensions/delivery/src/lib.rs new file mode 100644 index 0000000..2b14bd0 --- /dev/null +++ b/extensions/delivery/src/lib.rs @@ -0,0 +1,5 @@ +mod local_bcast; + +use local_bcast::LocalBroadcast; + +pub use LocalBroadcast; diff --git a/extensions/delivery/src/local_bcast.rs b/extensions/delivery/src/local_bcast.rs new file mode 100644 index 0000000..20b1335 --- /dev/null +++ b/extensions/delivery/src/local_bcast.rs @@ -0,0 +1,58 @@ +use libchat::DeliveryService; + +type Callback = Box)>; + +#[derive(Clone)] +struct LocalBroadcaster { + subscribers: Arc>>>, +} + +impl LocalBroadcaster { + pub fn new() -> Self { + Self { + subscribers: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +impl DeliveryService for LocalBroadcaster { + type Error = String; + + fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error> { + let callbacks = self + .subscribers + .lock() + .unwrap() + .remove(&envelope.delivery_address) + .unwrap_or_default(); + + for cb in callbacks { + cb(envelope.delivery_address.clone(), &envelope.data); + } + + Ok(()) + } + + fn subscribe(&mut self, delivery_address: String, cb: F) -> Result<(), Self::Error> + where + F: FnOnce(String, &Vec) + 'static, + { + self.subscribers + .lock() + .unwrap() + .entry(delivery_address) + .or_default() + .push(Box::new(cb)); + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + + #[test] + fn local_bcast() { + let ds = LocalBroadcast::new(); + } +}