diff --git a/Cargo.lock b/Cargo.lock index 274c8bb..bd44e97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -278,7 +278,7 @@ dependencies = [ "arboard", "base64", "clap", - "client", + "client 0.1.0 (git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa)", "crossterm 0.29.0", "ratatui", "serde", @@ -300,14 +300,26 @@ dependencies = [ name = "chat-sqlite" version = "0.1.0" dependencies = [ - "crypto", + "crypto 0.1.0", "hex", "rusqlite", - "storage", + "storage 0.1.0", "tempfile", "zeroize", ] +[[package]] +name = "chat-sqlite" +version = "0.1.0" +source = "git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa#39bf26756448dd16ddff89a6c0054f79236494aa" +dependencies = [ + "crypto 0.1.0 (git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa)", + "hex", + "rusqlite", + "storage 0.1.0 (git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa)", + "zeroize", +] + [[package]] name = "cipher" version = "0.4.4" @@ -363,18 +375,29 @@ checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" name = "client" version = "0.1.0" dependencies = [ - "chat-sqlite", - "libchat", + "chat-sqlite 0.1.0", + "components", + "libchat 0.1.0", "tempfile", "thiserror", ] +[[package]] +name = "client" +version = "0.1.0" +source = "git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa#39bf26756448dd16ddff89a6c0054f79236494aa" +dependencies = [ + "chat-sqlite 0.1.0 (git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa)", + "libchat 0.1.0 (git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa)", + "thiserror", +] + [[package]] name = "client-ffi" version = "0.1.0" dependencies = [ - "client", - "libchat", + "client 0.1.0", + "libchat 0.1.0", "safer-ffi", ] @@ -407,6 +430,16 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "components" +version = "0.1.0" +dependencies = [ + "crypto 0.1.0", + "hex", + "libchat 0.1.0", + "storage 0.1.0", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -540,6 +573,22 @@ dependencies = [ "zeroize", ] +[[package]] +name = "crypto" +version = "0.1.0" +source = "git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa#39bf26756448dd16ddff89a6c0054f79236494aa" +dependencies = [ + "ed25519-dalek", + "generic-array 1.3.5", + "hkdf", + "rand_core 0.6.4", + "sha2", + "thiserror", + "x25519-dalek", + "xeddsa", + "zeroize", +] + [[package]] name = "crypto-bigint" version = "0.5.5" @@ -714,18 +763,35 @@ version = "0.0.1" dependencies = [ "blake2", "chacha20poly1305", - "chat-sqlite", + "chat-sqlite 0.1.0", "hkdf", "rand 0.9.4", "rand_core 0.6.4", "serde", - "storage", + "storage 0.1.0", "tempfile", "thiserror", "x25519-dalek", "zeroize", ] +[[package]] +name = "double-ratchets" +version = "0.0.1" +source = "git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa#39bf26756448dd16ddff89a6c0054f79236494aa" +dependencies = [ + "blake2", + "chacha20poly1305", + "hkdf", + "rand 0.9.4", + "rand_core 0.6.4", + "serde", + "storage 0.1.0 (git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa)", + "thiserror", + "x25519-dalek", + "zeroize", +] + [[package]] name = "ecdsa" version = "0.16.9" @@ -1313,6 +1379,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "integration_tests_core" +version = "0.1.0" +dependencies = [ + "chat-sqlite 0.1.0", + "components", + "libchat 0.1.0", + "storage 0.1.0", + "tempfile", +] + [[package]] name = "inventory" version = "0.3.24" @@ -1397,17 +1474,41 @@ dependencies = [ "base64", "blake2", "chat-proto", - "chat-sqlite", - "crypto", - "double-ratchets", + "chat-sqlite 0.1.0", + "components", + "crypto 0.1.0", + "double-ratchets 0.0.1", + "hex", + "openmls", + "openmls_libcrux_crypto 0.3.1", + "openmls_traits 0.5.0", + "prost", + "rand_core 0.6.4", + "safer-ffi", + "storage 0.1.0", + "tempfile", + "thiserror", + "x25519-dalek", +] + +[[package]] +name = "libchat" +version = "0.1.0" +source = "git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa#39bf26756448dd16ddff89a6c0054f79236494aa" +dependencies = [ + "base64", + "blake2", + "chat-proto", + "chat-sqlite 0.1.0 (git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa)", + "crypto 0.1.0 (git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa)", + "double-ratchets 0.0.1 (git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa)", "hex", "openmls", "openmls_traits 0.5.0", "prost", "rand_core 0.6.4", "safer-ffi", - "storage", - "tempfile", + "storage 0.1.0 (git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa)", "thiserror", "x25519-dalek", ] @@ -2895,7 +2996,16 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" name = "storage" version = "0.1.0" dependencies = [ - "crypto", + "crypto 0.1.0", + "thiserror", +] + +[[package]] +name = "storage" +version = "0.1.0" +source = "git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa#39bf26756448dd16ddff89a6c0054f79236494aa" +dependencies = [ + "crypto 0.1.0 (git+https://github.com/logos-messaging/libchat?rev=39bf26756448dd16ddff89a6c0054f79236494aa)", "thiserror", ] diff --git a/Cargo.toml b/Cargo.toml index db5f220..3d6a969 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,15 +8,31 @@ members = [ "core/crypto", "core/double-ratchets", "core/storage", + "core/integration_tests_core", "crates/client", "crates/client-ffi", "bin/chat-cli", + "extensions/components", +] + +default-members = [ + "core/sqlite", + "core/conversations", + "core/crypto", + "core/double-ratchets", + "core/storage", + "core/integration_tests_core", + "crates/client", + "crates/client-ffi", ] [workspace.dependencies] -blake2 = "0.10" -libchat = { path = "core/conversations" } -storage = { path = "core/storage" } + blake2 = "0.10" + crypto = { path = "core/crypto" } + libchat = { path = "core/conversations" } + logoschat_components = {package="components", path ="extensions/components"} + sqlite = { path = "core/sqlite"} + storage = { path = "core/storage" } # Panicking across FFI boundaries is UB; abort is the correct strategy for a # C FFI library. diff --git a/bin/chat-cli/Cargo.toml b/bin/chat-cli/Cargo.toml index 615992a..8e16d7f 100644 --- a/bin/chat-cli/Cargo.toml +++ b/bin/chat-cli/Cargo.toml @@ -8,7 +8,8 @@ name = "chat-cli" path = "src/main.rs" [dependencies] -client = { path = "../../crates/client" } +# Reference a specific commit so updates to the Core does not break examples +client = { git = "https://github.com/logos-messaging/libchat", rev = "39bf26756448dd16ddff89a6c0054f79236494aa" } ratatui = "0.29" crossterm = "0.29" diff --git a/core/conversations/Cargo.toml b/core/conversations/Cargo.toml index f5492c4..512cfa6 100644 --- a/core/conversations/Cargo.toml +++ b/core/conversations/Cargo.toml @@ -21,7 +21,9 @@ thiserror = "2.0.17" x25519-dalek = { version = "2.0.1", features = ["static_secrets", "reusable_secrets", "getrandom"] } storage = { path = "../storage" } openmls = { version = "0.8.1", features = ["libcrux-provider"] } +openmls_libcrux_crypto = "0.3.1" openmls_traits = "0.5.0" [dev-dependencies] +components = { package = "components", path = "../../extensions/components" } tempfile = "3" diff --git a/core/conversations/src/account.rs b/core/conversations/src/account.rs index 06e2914..161710f 100644 --- a/core/conversations/src/account.rs +++ b/core/conversations/src/account.rs @@ -1,14 +1,15 @@ -use crypto::Ed25519SigningKey; +use crypto::{Ed25519SigningKey, Ed25519VerifyingKey}; use openmls::prelude::SignatureScheme; use openmls_traits::signatures::Signer; -use crate::types::AccountId; +use crate::{conversation::IdentityProvider, types::AccountId}; /// Logos Account represents a single account across /// multiple installations and services. pub struct LogosAccount { id: AccountId, signing_key: Ed25519SigningKey, + verifying_key: Ed25519VerifyingKey, } impl LogosAccount { @@ -17,9 +18,11 @@ impl LogosAccount { /// TODO: (P1) Remove once implementation is ready. pub fn new_test(explicit_id: impl Into) -> Self { let signing_key = Ed25519SigningKey::generate(); + let verifying_key = signing_key.verifying_key(); Self { id: AccountId::new(explicit_id.into()), signing_key, + verifying_key, } } @@ -38,3 +41,13 @@ impl Signer for LogosAccount { SignatureScheme::ED25519 } } + +impl IdentityProvider for LogosAccount { + fn friendly_name(&self) -> String { + self.id.to_string() + } + + fn public_key(&self) -> &Ed25519VerifyingKey { + &self.verifying_key + } +} diff --git a/core/conversations/src/context.rs b/core/conversations/src/context.rs index 9a48de7..a01b4f7 100644 --- a/core/conversations/src/context.rs +++ b/core/conversations/src/context.rs @@ -1,38 +1,56 @@ +use std::cell::{Ref, RefMut}; use std::sync::Arc; use std::{cell::RefCell, rc::Rc}; +use crate::account::LogosAccount; +use crate::conversation::{Convo, GroupConvo}; + +use crate::{DeliveryService, RegistrationService}; +use crate::{ + conversation::{Conversation, Id, PrivateV1Convo}, + errors::ChatError, + inbox::Inbox, + inbox_v2::InboxV2, + proto::{EncryptedPayload, EnvelopeV1, Message}, + types::{AccountId, AddressedEnvelope, ContentData}, +}; use crypto::{Identity, PublicKey}; use storage::{ChatStore, ConversationKind}; -use crate::account::LogosAccount; -use crate::{ - conversation::{Conversation, ConversationId, Convo, Id, PrivateV1Convo}, - errors::ChatError, - inbox::Inbox, - proto::{EncryptedPayload, EnvelopeV1, Message}, - types::{AddressedEnvelope, ContentData}, -}; - -pub use crate::conversation::ConversationIdOwned; +pub use crate::conversation::{ConversationId, ConversationIdOwned}; pub use crate::inbox::Introduction; // This is the main entry point to the conversations api. // Ctx manages lifetimes of objects to process and generate payloads. -pub struct Context { - _identity: Rc, - inbox: Inbox, - store: Rc>, - #[allow(unused)] // TODO: (P2) Remove once Account integrated in future PR. - account: LogosAccount, +pub struct Context { + identity: Rc, + ds: Rc>, + store: Rc>, + inbox: Inbox, + pq_inbox: InboxV2, } -impl Context { +impl Context +where + DS: DeliveryService + 'static, + RS: RegistrationService + 'static, + CS: ChatStore + 'static, +{ /// Opens or creates a Context with the given storage configuration. /// /// If an identity exists in storage, it will be restored. /// Otherwise, a new identity will be created with the given name and saved. - pub fn new_from_store(name: impl Into, store: S) -> Result { + pub fn new_from_store( + name: impl Into, + delivery: DS, + registration: RS, + store: CS, + ) -> Result { let name = name.into(); + + // Services for sharing with Converastions/Inboxes + let ds = Rc::new(RefCell::new(delivery)); + let contact_registry = Rc::new(RefCell::new(registration)); let store = Rc::new(RefCell::new(store)); // Load or create identity @@ -47,43 +65,97 @@ impl Context { let identity = Rc::new(identity); let inbox = Inbox::new(Rc::clone(&store), Rc::clone(&identity)); + let pq_inbox = InboxV2::new( + LogosAccount::new_test(name), + ds.clone(), + contact_registry.clone(), + store.clone(), + ); + + // Subscribe + ds.borrow_mut() + .subscribe(&pq_inbox.delivery_address()) + .map_err(ChatError::generic)?; + Ok(Self { - _identity: identity, - inbox, + identity, + ds, store, - account: LogosAccount::new_test(name.as_str()), + inbox, + pq_inbox, }) } /// Creates a new in-memory Context (for testing). /// /// Uses in-memory SQLite database. Each call creates a new isolated database. - pub fn new_with_name(name: impl Into, chat_store: S) -> Self { + pub fn new_with_name( + name: impl Into, + delivery: DS, + registration: RS, + chat_store: CS, + ) -> Result { let name = name.into(); let identity = Identity::new(&name); - let chat_store = Rc::new(RefCell::new(chat_store)); - chat_store + + // Services for sharing with Converastions/Inboxes + let ds = Rc::new(RefCell::new(delivery)); + let contact_registry = Rc::new(RefCell::new(registration)); + let store = Rc::new(RefCell::new(chat_store)); + + store .borrow_mut() .save_identity(&identity) .expect("in-memory storage should not fail"); let identity = Rc::new(identity); - let inbox = Inbox::new(Rc::clone(&chat_store), Rc::clone(&identity)); + let inbox = Inbox::new(store.clone(), Rc::clone(&identity)); + let mut pq_inbox = InboxV2::new( + LogosAccount::new_test(name), + ds.clone(), + contact_registry.clone(), + store.clone(), + ); - Self { - _identity: identity, + // TODO: (P2) Initialize Account in Context or upper client. + pq_inbox.register()?; + + ds.borrow_mut() + .subscribe(&pq_inbox.delivery_address()) + .map_err(ChatError::generic)?; + + Ok(Self { + identity, + ds, + store, + pq_inbox, inbox, - store: chat_store, - account: LogosAccount::new_test(name.as_str()), - } + }) + } + + pub fn ds(&self) -> RefMut<'_, DS> { + self.ds.borrow_mut() + } + + pub fn store(&self) -> Ref<'_, CS> { + self.store.borrow() + } + + pub fn identity(&self) -> &Identity { + &self.identity + } + + /// Returns the unique identifier associated with the account + pub fn account_id(&self) -> &AccountId { + self.pq_inbox.account_id() } pub fn installation_name(&self) -> &str { - self._identity.get_name() + self.identity.get_name() } pub fn installation_key(&self) -> PublicKey { - self._identity.public_key() + self.identity.public_key() } pub fn create_private_convo( @@ -96,7 +168,7 @@ impl Context { .invite_to_private_convo(remote_bundle, content, Rc::clone(&self.store)) .unwrap_or_else(|_| todo!("Log/Surface Error")); - let remote_id = Inbox::::inbox_identifier_for_key(*remote_bundle.installation_key()); + let remote_id = Inbox::::inbox_identifier_for_key(*remote_bundle.installation_key()); let payload_bytes = payloads .into_iter() .map(|p| p.into_envelope(remote_id.clone())) @@ -106,6 +178,25 @@ impl Context { Ok((convo_id, payload_bytes)) } + pub fn create_group_convo( + &mut self, + participants: &[&AccountId], + ) -> Result>, ChatError> { + // TODO: (P1) Ensure errors are handled propertly. This is a high chance for desynchronized state. + // MlsGroup persistence, conversation persistence, and invite delivery all happen seperately + let mut convo = self.pq_inbox.create_group_v1()?; + self.store + .borrow_mut() + .save_conversation(&storage::ConversationMeta { + local_convo_id: convo.id().to_string(), + remote_convo_id: "0".into(), + kind: ConversationKind::GroupV1, + })?; + convo.add_member(participants)?; + + Ok(Box::new(convo)) + } + pub fn list_conversations(&self) -> Result, ChatError> { let records = self.store.borrow().load_conversations()?; Ok(records @@ -119,19 +210,13 @@ impl Context { convo_id: ConversationId, content: &[u8], ) -> Result, ChatError> { - let convo = self.load_convo(convo_id)?; - - match convo { - Conversation::Private(mut convo) => { - let payloads = convo.send_message(content)?; - let remote_id = convo.remote_id(); - - Ok(payloads - .into_iter() - .map(|p| p.into_envelope(remote_id.clone())) - .collect()) - } - } + let mut convo = self.load_convo(convo_id)?; + let payloads = convo.send_message(content)?; + let remote_id = convo.remote_id(); + Ok(payloads + .into_iter() + .map(|p| p.into_envelope(remote_id.clone())) + .collect()) } // Decode bytes and send to protocol for processing. @@ -140,20 +225,30 @@ impl Context { // TODO: Impl Conversation hinting let convo_id = env.conversation_hint; - let enc = EncryptedPayload::decode(env.payload)?; + match convo_id { - c if c == self.inbox.id() => self.dispatch_to_inbox(enc), - c if self.store.borrow().has_conversation(&c)? => self.dispatch_to_convo(&c, enc), - _ => Ok(None), + c if c == self.inbox.id() => self.dispatch_to_inbox(&env.payload), + c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload), + c if self.store.borrow().has_conversation(&c)? => { + self.dispatch_to_convo(&c, &env.payload) + } + _ => Ok(Some(ContentData { + conversation_id: "".into(), + data: vec![], + is_new_convo: false, + })), } } // Dispatch encrypted payload to Inbox, and register the created Conversation fn dispatch_to_inbox( &mut self, - enc_payload: EncryptedPayload, + enc_payload_bytes: &[u8], ) -> Result, ChatError> { - let public_key_hex = Inbox::::extract_ephemeral_key_hex(&enc_payload)?; + // EncryptedPayloads are not used by GroupConvos at this time, else this can be performed in `handle_payload` + // TODO: (P1) reconcile envelope parsing between Covno and GroupConvo + let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; + let public_key_hex = Inbox::::extract_ephemeral_key_hex(&enc_payload)?; let (convo, content) = self.inbox .handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store))?; @@ -168,20 +263,22 @@ impl Context { Ok(content) } + // Dispatch encrypted payload to Inbox, and register the created Conversation + fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result, ChatError> { + self.pq_inbox.handle_frame(payload)?; + + Ok(None) + } + // Dispatch encrypted payload to its corresponding conversation fn dispatch_to_convo( &mut self, convo_id: ConversationId, - enc_payload: EncryptedPayload, + enc_payload_bytes: &[u8], ) -> Result, ChatError> { - let convo = self.load_convo(convo_id)?; - - match convo { - Conversation::Private(mut convo) => { - let result = convo.handle_frame(enc_payload)?; - Ok(result) - } - } + let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; + let mut convo = self.load_convo(convo_id)?; + convo.handle_frame(enc_payload) } pub fn create_intro_bundle(&mut self) -> Result, ChatError> { @@ -189,8 +286,15 @@ impl Context { Ok(intro.into()) } + pub fn get_convo( + &mut self, + convo_id: ConversationId, + ) -> Result>, ChatError> { + self.load_group_convo(convo_id) + } + /// Loads a conversation from DB by constructing it from metadata. - fn load_convo(&self, convo_id: ConversationId) -> Result, ChatError> { + fn load_convo(&mut self, convo_id: ConversationId) -> Result, ChatError> { let record = self .store .borrow() @@ -204,8 +308,35 @@ impl Context { record.local_convo_id, record.remote_convo_id, )?; - Ok(Conversation::Private(private_convo)) + Ok(Box::new(private_convo)) } + ConversationKind::GroupV1 => Ok(Box::new( + self.pq_inbox.load_mls_convo(record.local_convo_id)?, + )), + ConversationKind::Unknown(_) => Err(ChatError::BadBundleValue(format!( + "unsupported conversation type: {}", + record.kind.as_str() + ))), + } + } + + fn load_group_convo( + &mut self, + convo_id: ConversationId, + ) -> Result>, ChatError> { + let record = self + .store + .borrow() + .load_conversation(convo_id)? + .ok_or_else(|| ChatError::NoConvo(convo_id.into()))?; + + match record.kind { + ConversationKind::PrivateV1 => { + Err(ChatError::NoConvo("This is not a group convo".into())) + } + ConversationKind::GroupV1 => Ok(Box::new( + self.pq_inbox.load_mls_convo(record.local_convo_id)?, + )), ConversationKind::Unknown(_) => Err(ChatError::BadBundleValue(format!( "unsupported conversation type: {}", record.kind.as_str() @@ -213,155 +344,3 @@ impl Context { } } } - -#[cfg(test)] -mod tests { - use sqlite::{ChatStorage, StorageConfig}; - use storage::{ConversationStore, IdentityStore}; - use tempfile::tempdir; - - use super::*; - - fn send_and_verify( - sender: &mut Context, - receiver: &mut Context, - convo_id: ConversationId, - content: &[u8], - ) { - let payloads = sender.send_content(convo_id, content).unwrap(); - let payload = payloads.first().unwrap(); - let received = receiver - .handle_payload(&payload.data) - .unwrap() - .expect("expected content"); - assert_eq!(content, received.data.as_slice()); - assert!(!received.is_new_convo); // Check that `is_new_convo` is FALSE - } - - #[test] - fn ctx_integration() { - let mut saro = Context::new_with_name("saro", ChatStorage::in_memory()); - let mut raya = Context::new_with_name("raya", ChatStorage::in_memory()); - - // Raya creates intro bundle and sends to Saro - let bundle = raya.create_intro_bundle().unwrap(); - let intro = Introduction::try_from(bundle.as_slice()).unwrap(); - - // Saro initiates conversation with Raya - let mut content = vec![10]; - let (saro_convo_id, payloads) = saro.create_private_convo(&intro, &content).unwrap(); - - // Raya receives initial message - let payload = payloads.first().unwrap(); - let initial_content = raya - .handle_payload(&payload.data) - .unwrap() - .expect("expected initial content"); - - let raya_convo_id = initial_content.conversation_id; - assert_eq!(content, initial_content.data); - assert!(initial_content.is_new_convo); - - // Exchange messages back and forth - for _ in 0..10 { - content.push(content.last().unwrap() + 1); - send_and_verify(&mut raya, &mut saro, &raya_convo_id, &content); - - content.push(content.last().unwrap() + 1); - send_and_verify(&mut saro, &mut raya, &saro_convo_id, &content); - } - } - - #[test] - fn identity_persistence() { - let store1 = ChatStorage::new(StorageConfig::InMemory).unwrap(); - let ctx1 = Context::new_with_name("alice", store1); - let pubkey1 = ctx1._identity.public_key(); - let name1 = ctx1.installation_name().to_string(); - - // For persistence tests with file-based storage, we'd need a shared db. - // With in-memory, we just verify the identity was created. - assert_eq!(name1, "alice"); - assert!(!pubkey1.as_bytes().iter().all(|&b| b == 0)); - } - - #[test] - fn open_persists_new_identity() { - let dir = tempdir().unwrap(); - let db_path = dir.path().join("chat.sqlite"); - let db_path = db_path.to_string_lossy().into_owned(); - - let store = ChatStorage::new(StorageConfig::File(db_path.clone())).unwrap(); - let ctx = Context::new_from_store("alice", store).unwrap(); - let pubkey = ctx._identity.public_key(); - drop(ctx); - - let store = ChatStorage::new(StorageConfig::File(db_path)).unwrap(); - let persisted = store.load_identity().unwrap().unwrap(); - - assert_eq!(persisted.get_name(), "alice"); - assert_eq!(persisted.public_key(), pubkey); - } - - #[test] - fn conversation_metadata_persistence() { - let mut alice = Context::new_with_name("alice", ChatStorage::in_memory()); - let mut bob = Context::new_with_name("bob", ChatStorage::in_memory()); - - let bundle = alice.create_intro_bundle().unwrap(); - let intro = Introduction::try_from(bundle.as_slice()).unwrap(); - let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap(); - - let payload = payloads.first().unwrap(); - let content = alice.handle_payload(&payload.data).unwrap().unwrap(); - assert!(content.is_new_convo); - - let convos = alice.store.borrow().load_conversations().unwrap(); - assert_eq!(convos.len(), 1); - assert_eq!(convos[0].kind.as_str(), "private_v1"); - } - - #[test] - fn conversation_full_flow() { - let mut alice = Context::new_with_name("alice", ChatStorage::in_memory()); - let mut bob = Context::new_with_name("bob", ChatStorage::in_memory()); - - let bundle = alice.create_intro_bundle().unwrap(); - let intro = Introduction::try_from(bundle.as_slice()).unwrap(); - let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap(); - - let payload = payloads.first().unwrap(); - let content = alice.handle_payload(&payload.data).unwrap().unwrap(); - let alice_convo_id = content.conversation_id; - - let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap(); - let payload = payloads.first().unwrap(); - bob.handle_payload(&payload.data).unwrap().unwrap(); - - let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap(); - let payload = payloads.first().unwrap(); - alice.handle_payload(&payload.data).unwrap().unwrap(); - - // Verify conversation list - let convo_ids = alice.list_conversations().unwrap(); - assert_eq!(convo_ids.len(), 1); - - // Continue exchanging messages - let payloads = bob.send_content(&bob_convo_id, b"more messages").unwrap(); - let payload = payloads.first().unwrap(); - let content = alice - .handle_payload(&payload.data) - .expect("should decrypt") - .expect("should have content"); - assert_eq!(content.data, b"more messages"); - - // Alice can also send back - let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap(); - let payload = payloads.first().unwrap(); - let content = bob - .handle_payload(&payload.data) - .unwrap() - .expect("bob should receive"); - assert_eq!(content.data, b"alice reply"); - } -} diff --git a/core/conversations/src/conversation.rs b/core/conversations/src/conversation.rs index 1580d78..702ca93 100644 --- a/core/conversations/src/conversation.rs +++ b/core/conversations/src/conversation.rs @@ -1,12 +1,18 @@ +pub mod group_v1; mod privatev1; -use crate::types::{AddressedEncryptedPayload, ContentData}; +use crate::{ + DeliveryService, + service_traits::KeyPackageProvider, + types::{AccountId, AddressedEncryptedPayload, ContentData}, +}; use chat_proto::logoschat::encryption::EncryptedPayload; use std::fmt::Debug; use std::sync::Arc; use storage::{ConversationKind, ConversationStore, RatchetStore}; pub use crate::errors::ChatError; +pub use group_v1::{GroupV1Convo, IdentityProvider}; pub use privatev1::PrivateV1Convo; pub type ConversationId<'a> = &'a str; @@ -36,6 +42,14 @@ pub trait Convo: Id + Debug { fn convo_type(&self) -> ConversationKind; } +pub trait GroupConvo: Convo { + fn add_member(&mut self, members: &[&AccountId]) -> Result<(), ChatError>; + + // This is intended to replace `send_message`. The trait change is that it automatically + // sends the payload directly. + fn send_content(&mut self, content: &[u8]) -> Result<(), ChatError>; +} + pub enum Conversation { Private(PrivateV1Convo), } diff --git a/core/conversations/src/conversation/group_v1.rs b/core/conversations/src/conversation/group_v1.rs new file mode 100644 index 0000000..eadc441 --- /dev/null +++ b/core/conversations/src/conversation/group_v1.rs @@ -0,0 +1,417 @@ +/// GroupV1 is a conversationType which provides effecient handling of multiple participants +/// Properties: +/// - Harvest Now Decrypt Later (HNDL) protection provided by XWING +/// - Multiple +use std::cell::RefCell; +use std::rc::Rc; + +use blake2::{Blake2b, Digest, digest::consts::U6}; +use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload}; +use crypto::Ed25519VerifyingKey; +use openmls::prelude::tls_codec::Deserialize; +use openmls::prelude::*; +use openmls_libcrux_crypto::Provider as LibcruxProvider; +use openmls_traits::signatures::Signer as OpenMlsSigner; +use storage::ConversationKind; + +use crate::types::AccountId; +use crate::{ + DeliveryService, + conversation::{ChatError, ConversationId, Convo, GroupConvo, Id}, + service_traits::KeyPackageProvider, + types::{AddressedEncryptedPayload, ContentData}, +}; + +/// Provides the identity information needed to participate in an MLS group. +/// +/// Implementors must also implement [`OpenMlsSigner`] so they can sign MLS +/// messages. The two methods here supply what [`MlsContext::get_credential`] +/// needs to build a [`CredentialWithKey`]: `friendly_name` becomes the +/// `BasicCredential` label and `public_key` becomes the signature-verification key. +pub trait IdentityProvider: OpenMlsSigner { + fn friendly_name(&self) -> String; + fn public_key(&self) -> &Ed25519VerifyingKey; +} + +/// Connects the MLS protocol engine to app-level identity and transport. +/// +/// `GroupV1Convo` is generic over this trait so the MLS logic stays +/// independent of how identities are stored or how invites are delivered. +/// Implementors supply: +/// - a [`LibcruxProvider`] for MLS crypto operations +/// - an [`IdentityProvider`] for signing and credential construction +/// - [`invite_user`] — the app-specific logic for routing a [`Welcome`] +/// message to a new member's inbox +pub trait MlsContext { + type IDENT: IdentityProvider; + + fn ident(&self) -> &Self::IDENT; + fn provider(&self) -> &LibcruxProvider; + + // Build an MLS Credential from the supplied IdentityProvider + fn get_credential(&self) -> CredentialWithKey { + CredentialWithKey { + credential: BasicCredential::new(self.ident().friendly_name().into()).into(), + signature_key: self.ident().public_key().as_ref().into(), + } + } + + fn invite_user( + &self, + ds: &mut DS, + account_id: &AccountId, + welcome: &MlsMessageOut, + ) -> Result<(), ChatError>; +} + +pub struct GroupV1Convo { + ctx: Rc>, + ds: Rc>, + keypkg_provider: Rc>, + mls_group: MlsGroup, + convo_id: String, +} + +impl std::fmt::Debug for GroupV1Convo +where + MlsCtx: MlsContext, + DS: DeliveryService, + KP: KeyPackageProvider, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GroupV1Convo") + .field("name", &self.ctx.borrow().ident().friendly_name()) + .field("convo_id", &self.convo_id) + .field("mls_epoch", &self.mls_group.epoch()) + .finish_non_exhaustive() + } +} + +impl GroupV1Convo +where + MlsCtx: MlsContext, + DS: DeliveryService, + KP: KeyPackageProvider, +{ + // Create a new conversation with the creator as the only participant. + pub fn new( + ctx: Rc>, + ds: Rc>, + keypkg_provider: Rc>, + ) -> Result { + let config = Self::mls_create_config(); + let mls_group = { + let ctx_ref = ctx.borrow(); + MlsGroup::new( + ctx_ref.provider(), + ctx_ref.ident(), + &config, + ctx_ref.get_credential(), + ) + .unwrap() + }; + let convo_id = hex::encode(mls_group.group_id().as_slice()); + Self::subscribe(&mut ds.borrow_mut(), &convo_id)?; + + Ok(Self { + ctx, + ds, + keypkg_provider, + mls_group, + convo_id, + }) + } + + // Constructs a new conversation upon receiving a MlsWelcome message. + pub fn new_from_welcome( + ctx: Rc>, + ds: Rc>, + keypkg_provider: Rc>, + welcome: Welcome, + ) -> Result { + let mls_group = { + let ctx_borrow = ctx.borrow(); + let provider = ctx_borrow.provider(); + + StagedWelcome::build_from_welcome(provider, &Self::mls_join_config(), welcome) + .unwrap() + .build() + .unwrap() + .into_group(provider) + .unwrap() + }; + + let convo_id = hex::encode(mls_group.group_id().as_slice()); + Self::subscribe(&mut *ds.borrow_mut(), &convo_id)?; + + Ok(Self { + ctx, + ds, + keypkg_provider, + mls_group, + convo_id, + }) + } + + pub fn load( + ctx: Rc>, + ds: Rc>, + keypkg_provider: Rc>, + convo_id: String, + group_id: GroupId, + ) -> Result { + let mls_group = MlsGroup::load(ctx.borrow().provider().storage(), &group_id) + .map_err(ChatError::generic)? + .ok_or_else(|| ChatError::NoConvo("mls group not found".into()))?; + + Self::subscribe(&mut *ds.borrow_mut(), &convo_id)?; + + Ok(GroupV1Convo { + ctx, + ds, + keypkg_provider, + mls_group, + convo_id, + }) + } + + // Configure the delivery service to listen for the required delivery addresses. + fn subscribe(ds: &mut DS, convo_id: &str) -> Result<(), ChatError> { + ds.subscribe(&Self::delivery_address_from_id(convo_id)) + .map_err(ChatError::generic)?; + ds.subscribe(&Self::ctrl_delivery_address_from_id(convo_id)) + .map_err(ChatError::generic)?; + + Ok(()) + } + + fn mls_create_config() -> MlsGroupCreateConfig { + MlsGroupCreateConfig::builder() + .ciphersuite(Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519) + .use_ratchet_tree_extension(true) // This is handy for now, until there is central store for this data + .build() + } + + fn mls_join_config() -> MlsGroupJoinConfig { + MlsGroupJoinConfig::builder().build() + } + + fn delivery_address_from_id(convo_id: &str) -> String { + let hash = Blake2b::::new() + .chain_update("delivery_addr|") + .chain_update(convo_id) + .finalize(); + hex::encode(hash) + } + + fn delivery_address(&self) -> String { + Self::delivery_address_from_id(&self.convo_id) + } + + fn ctrl_delivery_address_from_id(convo_id: &str) -> String { + let hash = Blake2b::::new() + .chain_update("ctrl_delivery_addr|") + .chain_update(convo_id) + .finalize(); + hex::encode(hash) + } + + fn ctrl_delivery_address(&self) -> String { + Self::ctrl_delivery_address_from_id(&self.convo_id) + } + + fn key_package_for_account(&self, ident: &AccountId) -> Result { + let retrieved_bytes = self + .keypkg_provider + .borrow() + .retrieve(ident) + .map_err(|e: KP::Error| ChatError::Generic(e.to_string()))?; + + // dbg!(ctx.contact_registry()); + let Some(keypkg_bytes) = retrieved_bytes else { + return Err(ChatError::Protocol("Contact Not Found".into())); + }; + + let key_package_in = KeyPackageIn::tls_deserialize(&mut keypkg_bytes.as_slice())?; + let keypkg = key_package_in.validate( + self.ctx.borrow().provider().crypto(), + ProtocolVersion::Mls10, + )?; //TODO: P3 - Hardcoded Protocol Version + Ok(keypkg) + } +} + +impl Id for GroupV1Convo +where + MlsCtx: MlsContext, + DS: DeliveryService, + KP: KeyPackageProvider, +{ + fn id(&self) -> ConversationId<'_> { + &self.convo_id + } +} + +impl Convo for GroupV1Convo +where + MlsCtx: MlsContext, + DS: DeliveryService, + KP: KeyPackageProvider, +{ + fn send_message( + &mut self, + content: &[u8], + ) -> Result, ChatError> { + let ctx_ref = self.ctx.borrow(); + let provider = ctx_ref.provider(); + let mls_message_out = self + .mls_group + .create_message(provider, ctx_ref.ident(), content) + .unwrap(); + + let a = AddressedEncryptedPayload { + delivery_address: self.delivery_address(), + data: EncryptedPayload { + encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext { + payload: mls_message_out.to_bytes().unwrap().into(), + })), + }, + }; + + Ok(vec![a]) + } + + fn handle_frame( + &mut self, + encoded_payload: EncryptedPayload, + ) -> Result, ChatError> { + let bytes = match encoded_payload.encryption { + Some(encrypted_payload::Encryption::Plaintext(pt)) => pt.payload, + _ => { + return Err(ChatError::ProtocolExpectation( + "None", + "Some(Encryption::Plaintext)".into(), + )); + } + }; + + let mls_message = + MlsMessageIn::tls_deserialize_exact_bytes(&bytes).map_err(ChatError::generic)?; + + let protocol_message: ProtocolMessage = mls_message + .try_into_protocol_message() + .map_err(ChatError::generic)?; + + let ctx_borrow = self.ctx.borrow(); + let provider = ctx_borrow.provider(); + + if protocol_message.epoch() < self.mls_group.epoch() { + // TODO: (P1) Add logging for messages arriving from past epoch. + return Ok(None); + } + + let processed = self + .mls_group + .process_message(provider, protocol_message) + .map_err(ChatError::generic)?; + + match processed.into_content() { + ProcessedMessageContent::ApplicationMessage(msg) => Ok(Some(ContentData { + conversation_id: hex::encode(self.mls_group.group_id().as_slice()), + data: msg.into_bytes(), + is_new_convo: false, + })), + ProcessedMessageContent::StagedCommitMessage(commit) => { + self.mls_group + .merge_staged_commit(provider, *commit) + .map_err(ChatError::generic)?; + Ok(None) + } + _ => { + // TODO: (P2) Log unknown message type + Ok(None) + } + } + } + + fn remote_id(&self) -> String { + // "group_remote_id".into() + todo!() + } + + fn convo_type(&self) -> storage::ConversationKind { + ConversationKind::GroupV1 + } +} + +impl GroupConvo for GroupV1Convo +where + MlsCtx: MlsContext, + DS: DeliveryService, + KP: KeyPackageProvider, +{ + // add_members returns: + // commit — the Commit message Alice broadcasts to all members + // welcome — the Welcome message sent privately to each new joiner + // _group_info — used for external joins; ignore for now + fn add_member(&mut self, members: &[&AccountId]) -> Result<(), ChatError> { + let ctx_ref = self.ctx.borrow(); + let provider = ctx_ref.provider(); + + if members.len() > 50 { + // This is a temporary limit that originates from the the De-MLS epoch time. + return Err(ChatError::Protocol( + "Cannot add more than 50 Members at a time".into(), + )); + } + + // Get the Keypacakages and transpose any errors. + // The account_id is kept so invites can be addressed properly + let keypkgs = members + .iter() + .map(|ident| self.key_package_for_account(ident)) + .collect::, ChatError>>()?; + + let (commit, welcome, _group_info) = self + .mls_group + .add_members(provider, ctx_ref.ident(), keypkgs.iter().as_slice()) + .unwrap(); + + self.mls_group.merge_pending_commit(provider).unwrap(); + + // TODO: (P3) Evaluate privacy/performance implications of an aggregated Welcome for multiple users + for account_id in members { + ctx_ref.invite_user(&mut *self.ds.borrow_mut(), account_id, &welcome)?; + } + + let encrypted_payload = EncryptedPayload { + encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext { + payload: commit.to_bytes()?.into(), + })), + }; + + let addr_enc_payload = AddressedEncryptedPayload { + delivery_address: self.ctrl_delivery_address(), + data: encrypted_payload, + }; + // Prepare commit message + // TODO: (P1) Make GroupConvos agnostic to framing so its less error prone and more + let env = addr_enc_payload.into_envelope(self.convo_id.clone()); + + self.ds + .borrow_mut() + .publish(env) + .map_err(|e| ChatError::Generic(format!("Publish: {e}"))) + } + + fn send_content(&mut self, content: &[u8]) -> Result<(), ChatError> { + let payloads = self.send_message(content)?; + for payload in payloads { + self.ds + .borrow_mut() + .publish(payload.into_envelope(self.id().into())) + .map_err(|e| ChatError::Delivery(e.to_string()))?; + } + Ok(()) + } +} diff --git a/core/conversations/src/errors.rs b/core/conversations/src/errors.rs index 664cdd3..5b2bfe4 100644 --- a/core/conversations/src/errors.rs +++ b/core/conversations/src/errors.rs @@ -1,3 +1,4 @@ +use openmls::{framing::errors::MlsMessageError, prelude::tls_codec}; pub use thiserror::Error; use storage::StorageError; @@ -26,6 +27,23 @@ pub enum ChatError { UnsupportedConvoType(String), #[error("storage error: {0}")] Storage(#[from] StorageError), + #[error("mls error: {0}")] + MlsMessageError(#[from] MlsMessageError), + #[error("TlsCodec: {0}")] + TlsCodec(#[from] tls_codec::Error), + #[error("generic: {0}")] + Generic(String), + #[error("KeyPackage: {0}")] + KeyPackage(#[from] openmls::prelude::KeyPackageVerifyError), + #[error("Delivery: {0}")] + Delivery(String), +} + +impl ChatError { + // This is a stopgap until there is a proper error system in place + pub fn generic(e: impl ToString) -> Self { + Self::Generic(e.to_string()) + } } #[derive(Error, Debug)] diff --git a/core/conversations/src/inbox_v2.rs b/core/conversations/src/inbox_v2.rs new file mode 100644 index 0000000..ef7b030 --- /dev/null +++ b/core/conversations/src/inbox_v2.rs @@ -0,0 +1,246 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use chat_proto::logoschat::envelope::EnvelopeV1; +use openmls::prelude::tls_codec::Serialize; +use openmls::prelude::*; +use openmls_libcrux_crypto::Provider as LibcruxProvider; +use prost::{Message, Oneof}; +use storage::ChatStore; +use storage::ConversationMeta; + +use crate::AddressedEnvelope; +use crate::ChatError; +use crate::DeliveryService; +use crate::RegistrationService; +use crate::account::LogosAccount; +use crate::conversation::GroupConvo; +use crate::conversation::group_v1::MlsContext; +use crate::conversation::{GroupV1Convo, IdentityProvider}; +use crate::types::AccountId; +use crate::utils::{blake2b_hex, hash_size}; +pub struct PqMlsContext { + ident_provider: LogosAccount, + provider: LibcruxProvider, +} + +impl MlsContext for PqMlsContext { + type IDENT = LogosAccount; + + fn ident(&self) -> &LogosAccount { + &self.ident_provider + } + + fn provider(&self) -> &LibcruxProvider { + &self.provider + } + + fn invite_user( + &self, + ds: &mut DS, + account_id: &AccountId, + welcome: &MlsMessageOut, + ) -> Result<(), ChatError> { + let invite = GroupV1HeavyInvite { + welcome_bytes: welcome.to_bytes()?, + }; + + let frame = InboxV2Frame { + payload: Some(InviteType::GroupV1(invite)), + }; + + let envelope = EnvelopeV1 { + conversation_hint: conversation_id_for(account_id), + salt: 0, + payload: frame.encode_to_vec().into(), + }; + + let outbound_msg = AddressedEnvelope { + delivery_address: delivery_address_for(account_id), + data: envelope.encode_to_vec(), + }; + + ds.publish(outbound_msg).map_err(ChatError::generic)?; + Ok(()) + } +} + +// Define unique Identifiers derivations used in InboxV2 +fn delivery_address_for(account_id: &AccountId) -> String { + blake2b_hex::(&["InboxV2|", "delivery_address|", account_id.as_str()]) +} + +fn conversation_id_for(account_id: &AccountId) -> String { + blake2b_hex::(&["InboxV2|", "conversation_id|", account_id.as_str()]) +} + +/// An PQ focused Conversation initializer. +/// InboxV2 Incorporates an Account based identity system to support PQ based conversation protocols +/// such as MLS. +pub struct InboxV2 { + account_id: AccountId, + ds: Rc>, + reg_service: Rc>, + store: Rc>, + ctx: Rc>, +} + +impl InboxV2 +where + DS: DeliveryService, + RS: RegistrationService, + CS: ChatStore, +{ + pub fn new( + account: LogosAccount, + ds: Rc>, + reg_service: Rc>, + store: Rc>, + ) -> Self { + let account_id = account.account_id().clone(); + let provider = LibcruxProvider::new().unwrap(); + Self { + account_id, + ds, + reg_service, + store, + ctx: Rc::new(RefCell::new(PqMlsContext { + ident_provider: account, + provider, + })), + } + } + + pub fn account_id(&self) -> &AccountId { + &self.account_id + } + + /// Submit MlsKeypackage to registration service + pub fn register(&mut self) -> Result<(), ChatError> { + let keypackage_bytes = self.create_keypackage()?.tls_serialize_detached()?; + + // TODO: (P3) Each keypackage can only be used once either enable... + // "LastResort" package or publish multiple + self.reg_service + .borrow_mut() + .register( + &self.ctx.borrow().ident_provider.friendly_name(), + keypackage_bytes, + ) + .map_err(ChatError::generic) + } + + pub fn delivery_address(&self) -> String { + delivery_address_for(&self.account_id) + } + + pub fn id(&self) -> String { + conversation_id_for(&self.account_id) + } + + pub fn create_group_v1(&self) -> Result, ChatError> { + GroupV1Convo::new(self.ctx.clone(), self.ds.clone(), self.reg_service.clone()) + } + + pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<(), ChatError> { + let inbox_frame = InboxV2Frame::decode(payload_bytes)?; + + let Some(payload) = inbox_frame.payload else { + return Err(ChatError::BadParsing("InboxV2Payload missing")); + }; + + match payload { + InviteType::GroupV1(group_v1_heavy_invite) => { + self.handle_heavy_invite(group_v1_heavy_invite) + } + } + } + + fn persist_convo(&self, convo: impl GroupConvo) -> Result<(), ChatError> { + // TODO: (P2) Remove remote_convo_id this is an implementation detail specific to PrivateV1 + // TODO: (P3) Implement From for ConversationMeta + let meta = ConversationMeta { + local_convo_id: convo.id().to_string(), + remote_convo_id: "0".into(), + kind: storage::ConversationKind::GroupV1, + }; + self.store.borrow_mut().save_conversation(&meta)?; + // TODO: (P1) Persist state + Ok(()) + } + + fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result<(), ChatError> { + let (msg_in, _rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?; + + let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else { + return Err(ChatError::ProtocolExpectation( + "something else", + "Welcome".into(), + )); + }; + + let convo = GroupV1Convo::new_from_welcome( + self.ctx.clone(), + self.ds.clone(), + self.reg_service.clone(), + welcome, + )?; + self.persist_convo(convo) + } + + fn create_keypackage(&self) -> Result { + let ctx_borrow = self.ctx.borrow(); + let capabilities = Capabilities::builder() + .ciphersuites(vec![ + Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519, + ]) + .extensions(vec![ExtensionType::ApplicationId]) + .build(); + let a = KeyPackage::builder() + .leaf_node_capabilities(capabilities) + .build( + Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519, + ctx_borrow.provider(), + ctx_borrow.ident(), + ctx_borrow.get_credential(), + ) + .expect("Failed to build KeyPackage"); + + Ok(a.key_package().clone()) + } + + pub fn load_mls_convo( + &self, + convo_id: String, + ) -> Result, ChatError> { + let group_id_bytes = hex::decode(&convo_id).map_err(ChatError::generic)?; + let group_id = GroupId::from_slice(&group_id_bytes); + let convo = GroupV1Convo::load( + self.ctx.clone(), + self.ds.clone(), + self.reg_service.clone(), + convo_id, + group_id, + )?; + + Ok(convo) + } +} + +#[derive(Clone, PartialEq, Message)] +pub struct InboxV2Frame { + #[prost(oneof = "InviteType", tags = "1")] + pub payload: Option, +} + +#[derive(Clone, PartialEq, Oneof)] +pub enum InviteType { + #[prost(message, tag = "1")] + GroupV1(GroupV1HeavyInvite), +} + +#[derive(Clone, PartialEq, Message)] +pub struct GroupV1HeavyInvite { + #[prost(bytes, tag = "1")] + pub welcome_bytes: Vec, +} diff --git a/core/conversations/src/lib.rs b/core/conversations/src/lib.rs index 8de610b..4f855f6 100644 --- a/core/conversations/src/lib.rs +++ b/core/conversations/src/lib.rs @@ -4,13 +4,18 @@ mod conversation; mod crypto; mod errors; mod inbox; +mod inbox_v2; mod proto; +mod service_traits; mod types; mod utils; pub use account::LogosAccount; -pub use context::{Context, ConversationIdOwned, Introduction}; +pub use context::{Context, ConversationId, ConversationIdOwned, Introduction}; +pub use conversation::GroupConvo; pub use errors::ChatError; +pub use service_traits::{DeliveryService, RegistrationService}; pub use sqlite::ChatStorage; pub use sqlite::StorageConfig; -pub use types::{AddressedEnvelope, ContentData}; +pub use types::{AccountId, AddressedEnvelope, ContentData}; +pub use utils::hex_trunc; diff --git a/core/conversations/src/service_traits.rs b/core/conversations/src/service_traits.rs new file mode 100644 index 0000000..8e37253 --- /dev/null +++ b/core/conversations/src/service_traits.rs @@ -0,0 +1,41 @@ +/// Service traits define the functionality which must be externally supplied by +/// platform clients. Platforms can alter the behaviour of the chat core by supplying +/// different implementations. +use std::{fmt::Debug, fmt::Display}; + +use crate::types::{AccountId, AddressedEnvelope}; + +/// A Delivery service is responsible for payload transport. +/// This interface allows Conversations to send payloads on the wire as well as +/// register interest in delivery_addresses. Client implementations are responsible +/// for providing the inbound payloads to Context::handle_payload. +pub trait DeliveryService: Debug { + type Error: Display; + fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error>; + fn subscribe(&mut self, delivery_address: &str) -> Result<(), Self::Error>; +} + +/// Manages key bundle storage for MLS group creation/addition while contacts are +/// offline. +/// +/// Implement this to provide a contact registry — ach participant publishes their key package +/// on registration; others fetch it to initiate a conversation. +pub trait RegistrationService: Debug { + type Error: Display; + fn register(&mut self, identity: &str, key_bundle: Vec) -> Result<(), Self::Error>; + fn retrieve(&self, identity: &AccountId) -> Result>, Self::Error>; +} + +/// Read-only view of a contact registry. Not part of the public API. +/// Satisfied automatically by any `RegistrationService` implementation. +pub trait KeyPackageProvider: Debug { + type Error: Display; + fn retrieve(&self, identity: &AccountId) -> Result>, Self::Error>; +} + +impl KeyPackageProvider for T { + type Error = T::Error; + fn retrieve(&self, identity: &AccountId) -> Result>, Self::Error> { + RegistrationService::retrieve(self, identity) + } +} diff --git a/core/conversations/src/types.rs b/core/conversations/src/types.rs index e8ecc70..9564234 100644 --- a/core/conversations/src/types.rs +++ b/core/conversations/src/types.rs @@ -1,4 +1,4 @@ -use std::fmt; +use std::fmt::{self, Debug}; use crate::proto::{self, Message}; @@ -6,13 +6,51 @@ use crate::proto::{self, Message}; // This struct represents Outbound data. // It wraps an encoded payload with a delivery address, so it can be handled by the delivery service. +#[derive(Clone)] pub struct AddressedEnvelope { pub delivery_address: String, pub data: Vec, } +impl AddressedEnvelope { + pub fn new(delivery_address: String, convo_id: String, data: &[u8]) -> Self { + let envelope = proto::EnvelopeV1 { + // TODO: conversation_id should be obscured + conversation_hint: convo_id, + salt: 0, + payload: proto::Bytes::copy_from_slice(data), + }; + + AddressedEnvelope { + delivery_address, + data: envelope.encode_to_vec(), + } + } +} + +impl Debug for AddressedEnvelope { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let data = &self.data; + let hex = if data.len() <= 8 { + hex::encode(data) + } else { + format!( + "{}..{}", + hex::encode(&data[..4]), + hex::encode(&data[data.len() - 4..]) + ) + }; + + f.debug_struct("AddressedEnvelope") + .field("addr", &self.delivery_address) + .field("data", &hex) + .finish() + } +} + // This struct represents the result of processed inbound data. // It wraps content payload with a conversation_id +#[derive(Debug)] pub struct ContentData { pub conversation_id: String, pub data: Vec, @@ -22,7 +60,7 @@ pub struct ContentData { // Internal type Definitions // Used by Conversations to attach addresses to outbound encrypted payloads -pub(crate) struct AddressedEncryptedPayload { +pub struct AddressedEncryptedPayload { pub delivery_address: String, pub data: proto::EncryptedPayload, } @@ -30,17 +68,11 @@ pub(crate) struct AddressedEncryptedPayload { impl AddressedEncryptedPayload { // Wrap in an envelope and prepare for transmission pub fn into_envelope(self, convo_id: String) -> AddressedEnvelope { - let envelope = proto::EnvelopeV1 { - // TODO: conversation_id should be obscured - conversation_hint: convo_id, - salt: 0, - payload: proto::Bytes::copy_from_slice(self.data.encode_to_vec().as_slice()), - }; - - AddressedEnvelope { - delivery_address: self.delivery_address, - data: envelope.encode_to_vec(), - } + AddressedEnvelope::new( + self.delivery_address, + convo_id, + self.data.encode_to_vec().as_slice(), + ) } } diff --git a/core/conversations/src/utils.rs b/core/conversations/src/utils.rs index 306e898..93eaf85 100644 --- a/core/conversations/src/utils.rs +++ b/core/conversations/src/utils.rs @@ -1,3 +1,4 @@ +use blake2::{Blake2b, Digest}; use std::time::{SystemTime, UNIX_EPOCH}; pub fn timestamp_millis() -> i64 { @@ -6,3 +7,66 @@ pub fn timestamp_millis() -> i64 { .unwrap() .as_millis() as i64 } + +/// Track hash sizes in use across the crate. +pub mod hash_size { + use blake2::digest::{ + consts::U64, + generic_array::ArrayLength, + typenum::{IsLessOrEqual, NonZero}, + }; + + pub trait HashLen + where + >::Output: NonZero, + { + type Size: ArrayLength + IsLessOrEqual; + } + + /// This macro generates HashLen for the given typenum::length + macro_rules! hash_sizes { + ($($(#[$attr:meta])* $name:ident => $size:ty),* $(,)?) => { + $( + $(#[$attr])* + pub struct $name; + impl HashLen for $name { type Size = $size; } + )* + }; + } + + use blake2::digest::consts::{U6, U8}; + hash_sizes! { + /// Account ID hash length + AccountId => U8, + /// Conversation ID hash length + ConvoId => U6, + } +} + +/// This establishes an easy to use wrapper for hashes in this crate. +/// The output is formatted string of hex characters +pub fn blake2b_hex(components: &[impl AsRef<[u8]>]) -> String { + //A + let mut hash = Blake2b::::new(); + + for c in components { + hash.update(c); + } + + let output = hash.finalize(); + hex::encode(output) +} + +/// Shorten byte slices for testing and logging +#[allow(unused)] +pub fn hex_trunc(data: &[u8]) -> String { + if data.len() <= 8 { + hex::encode(data) + } else { + format!( + "{}..{}", + hex::encode(&data[..4]), + hex::encode(&data[data.len() - 4..]) + ) + } +} diff --git a/core/integration_tests_core/Cargo.toml b/core/integration_tests_core/Cargo.toml new file mode 100644 index 0000000..34ff420 --- /dev/null +++ b/core/integration_tests_core/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "integration_tests_core" +version = "0.1.0" +edition = "2024" + +# [[test]] +# name = "integration_tests_core" + +[dev-dependencies] +libchat = { workspace = true } +storage = { workspace = true } +sqlite = {package = "chat-sqlite", path ="../sqlite"} + +components = { path = "../../extensions/components" } +tempfile = "3" diff --git a/core/integration_tests_core/README.md b/core/integration_tests_core/README.md new file mode 100644 index 0000000..d44b7de --- /dev/null +++ b/core/integration_tests_core/README.md @@ -0,0 +1,12 @@ +This crate is dedicated to backend integration tests. + +Tests can be built using any supplied service implementation. +Various implementations are available in the `Extensions/components` crate. + +## Running Tests + +Integration tests are executed when running `cargo test` from the workspace folder. + +Alternatively they can be executed from any crate, using + +`cargo test --package integration_tests_core` diff --git a/core/integration_tests_core/src/lib.rs b/core/integration_tests_core/src/lib.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/core/integration_tests_core/src/lib.rs @@ -0,0 +1 @@ + diff --git a/core/integration_tests_core/tests/mls_integration.rs b/core/integration_tests_core/tests/mls_integration.rs new file mode 100644 index 0000000..03119e7 --- /dev/null +++ b/core/integration_tests_core/tests/mls_integration.rs @@ -0,0 +1,143 @@ +use std::ops::{Deref, DerefMut}; + +use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; +use libchat::{ContentData, Context, GroupConvo, hex_trunc}; + +// Simple client Functionality for testing +struct Client { + inner: Context, + on_content: Option>, +} + +impl Client { + fn init( + ctx: Context, + cb: Option, + ) -> Self { + Client { + inner: ctx, + on_content: cb.map(|f| Box::new(f) as Box), + } + } + + fn process_messages(&mut self) { + let messages: Vec<_> = { + let mut ds = self.ds(); + std::iter::from_fn(|| ds.poll()).collect() + }; + + for data in messages { + let res = self.handle_payload(&data).unwrap(); + if let Some(cb) = &self.on_content + && let Some(content_data) = res + { + cb(content_data); + } + } + } + + fn convo( + &mut self, + convo_id: &str, + ) -> Box> { + // TODO: (P1) Convos are being copied somewhere, which means hanging on to a reference causes state desync + self.get_convo(convo_id).unwrap() + } +} + +impl Deref for Client { + type Target = Context; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for Client { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +// Higher order function to handle printing +fn pretty_print(prefix: impl Into) -> Box { + let prefix = prefix.into(); + Box::new(move |c: ContentData| { + let cid = hex_trunc(c.conversation_id.as_bytes()); + let content = String::from_utf8(c.data).unwrap(); + println!("{} ({:?}) {}", prefix, cid, content) + }) +} + +fn process(clients: &mut Vec) { + for client in clients { + client.process_messages(); + } +} + +#[test] +fn create_group() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + + let saro_ctx = + Context::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap(); + let raya_ctx = Context::new_with_name("raya", ds.clone(), rs.clone(), MemStore::new()).unwrap(); + + let mut clients = vec![ + Client::init(saro_ctx, Some(pretty_print(" Saro "))), + Client::init(raya_ctx, Some(pretty_print(" Raya "))), + ]; + + const SARO: usize = 0; + const RAYA: usize = 1; + + let raya_id = clients[RAYA].account_id().clone(); + let s_convo = clients[SARO].create_group_convo(&[&raya_id]).unwrap(); + + let convo_id = s_convo.id(); + + // Raya can read this message because + // 1) It was sent after add_members was committed, and + // 2) LocalBroadcaster provides historical messages. + + clients[SARO] + .convo(convo_id) + .send_content(b"ok who broke the group chat again") + .unwrap(); + + process(&mut clients); + + clients[RAYA] + .convo(convo_id) + .send_content(b"it was literally working five minutes ago") + .unwrap(); + + process(&mut clients); + + let pax_ctx = Context::new_with_name("pax", ds, rs, MemStore::new()).unwrap(); + clients.push(Client::init(pax_ctx, Some(pretty_print(" Pax")))); + const PAX: usize = 2; + + let pax_id = clients[PAX].account_id().clone(); + clients[SARO] + .convo(convo_id) + .add_member(&[&pax_id]) + .unwrap(); + + process(&mut clients); + + clients[PAX] + .convo(convo_id) + .send_content(b"ngl the key rotation is cooked") + .unwrap(); + + process(&mut clients); + + clients[SARO] + .convo(convo_id) + .send_content(b"bro we literally just added you to the group ") + .unwrap(); + + process(&mut clients); +} diff --git a/core/integration_tests_core/tests/private_integration.rs b/core/integration_tests_core/tests/private_integration.rs new file mode 100644 index 0000000..90d2a9b --- /dev/null +++ b/core/integration_tests_core/tests/private_integration.rs @@ -0,0 +1,163 @@ +use libchat::{Context, Introduction}; +use sqlite::{ChatStorage, StorageConfig}; +use storage::{ConversationStore, IdentityStore}; +use tempfile::tempdir; + +use components::{EphemeralRegistry, LocalBroadcaster}; + +fn send_and_verify( + sender: &mut Context, + receiver: &mut Context, + convo_id: &str, + content: &[u8], +) { + let payloads = sender.send_content(convo_id, content).unwrap(); + let payload = payloads.first().unwrap(); + let received = receiver + .handle_payload(&payload.data) + .unwrap() + .expect("expected content"); + assert_eq!(content, received.data.as_slice()); + assert!(!received.is_new_convo); +} + +#[test] +fn ctx_integration() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + + let mut saro = + Context::new_with_name("saro", ds.clone(), rs.clone(), ChatStorage::in_memory()).unwrap(); + let mut raya = Context::new_with_name("raya", ds, rs, ChatStorage::in_memory()).unwrap(); + + // Raya creates intro bundle and sends to Saro + let bundle = raya.create_intro_bundle().unwrap(); + let intro = Introduction::try_from(bundle.as_slice()).unwrap(); + + // Saro initiates conversation with Raya + let mut content = vec![10]; + let (saro_convo_id, payloads) = saro.create_private_convo(&intro, &content).unwrap(); + + // Raya receives initial message + let payload = payloads.first().unwrap(); + let initial_content = raya + .handle_payload(&payload.data) + .unwrap() + .expect("expected initial content"); + + let raya_convo_id = initial_content.conversation_id; + assert_eq!(content, initial_content.data); + assert!(initial_content.is_new_convo); + + // Exchange messages back and forth + for _ in 0..10 { + content.push(content.last().unwrap() + 1); + send_and_verify(&mut raya, &mut saro, &raya_convo_id, &content); + + content.push(content.last().unwrap() + 1); + send_and_verify(&mut saro, &mut raya, &saro_convo_id, &content); + } +} + +#[test] +fn identity_persistence() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + let store1 = ChatStorage::new(StorageConfig::InMemory).unwrap(); + let ctx1 = Context::new_with_name("alice", ds, rs, store1).unwrap(); + let pubkey1 = ctx1.identity().public_key(); + let name1 = ctx1.installation_name().to_string(); + + // For persistence tests with file-based storage, we'd need a shared db. + // With in-memory, we just verify the identity was created. + assert_eq!(name1, "alice"); + assert!(!pubkey1.as_bytes().iter().all(|&b| b == 0)); +} + +#[test] +fn open_persists_new_identity() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("chat.sqlite"); + let db_path = db_path.to_string_lossy().into_owned(); + + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + let store = ChatStorage::new(StorageConfig::File(db_path.clone())).unwrap(); + let ctx = Context::new_from_store("alice", ds, rs, store).unwrap(); + let pubkey = ctx.identity().public_key(); + drop(ctx); + + let store = ChatStorage::new(StorageConfig::File(db_path)).unwrap(); + let persisted = store.load_identity().unwrap().unwrap(); + + assert_eq!(persisted.get_name(), "alice"); + assert_eq!(persisted.public_key(), pubkey); +} + +#[test] +fn conversation_metadata_persistence() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + let mut alice = + Context::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory()).unwrap(); + let mut bob = Context::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap(); + + let bundle = alice.create_intro_bundle().unwrap(); + let intro = Introduction::try_from(bundle.as_slice()).unwrap(); + let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap(); + + let payload = payloads.first().unwrap(); + let content = alice.handle_payload(&payload.data).unwrap().unwrap(); + assert!(content.is_new_convo); + + let convos = alice.store().load_conversations().unwrap(); + assert_eq!(convos.len(), 1); + assert_eq!(convos[0].kind.as_str(), "private_v1"); +} + +#[test] +fn conversation_full_flow() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + let mut alice = + Context::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory()).unwrap(); + let mut bob = Context::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap(); + + let bundle = alice.create_intro_bundle().unwrap(); + let intro = Introduction::try_from(bundle.as_slice()).unwrap(); + let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap(); + + let payload = payloads.first().unwrap(); + let content = alice.handle_payload(&payload.data).unwrap().unwrap(); + let alice_convo_id = content.conversation_id; + + let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap(); + let payload = payloads.first().unwrap(); + bob.handle_payload(&payload.data).unwrap().unwrap(); + + let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap(); + let payload = payloads.first().unwrap(); + alice.handle_payload(&payload.data).unwrap().unwrap(); + + // Verify conversation list + let convo_ids = alice.list_conversations().unwrap(); + assert_eq!(convo_ids.len(), 1); + + // Continue exchanging messages + let payloads = bob.send_content(&bob_convo_id, b"more messages").unwrap(); + let payload = payloads.first().unwrap(); + let content = alice + .handle_payload(&payload.data) + .expect("should decrypt") + .expect("should have content"); + assert_eq!(content.data, b"more messages"); + + // Alice can also send back + let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap(); + let payload = payloads.first().unwrap(); + let content = bob + .handle_payload(&payload.data) + .unwrap() + .expect("bob should receive"); + assert_eq!(content.data, b"alice reply"); +} diff --git a/core/storage/src/store.rs b/core/storage/src/store.rs index a24ad25..d53b16c 100644 --- a/core/storage/src/store.rs +++ b/core/storage/src/store.rs @@ -27,6 +27,7 @@ pub trait EphemeralKeyStore { pub enum ConversationKind { PrivateV1, Unknown(String), + GroupV1, } impl ConversationKind { @@ -34,6 +35,7 @@ impl ConversationKind { match self { Self::PrivateV1 => "private_v1", Self::Unknown(value) => value.as_str(), + Self::GroupV1 => "group_v1", } } } @@ -42,6 +44,7 @@ impl From<&str> for ConversationKind { fn from(value: &str) -> Self { match value { "private_v1" => Self::PrivateV1, + "group_v1" => Self::GroupV1, other => Self::Unknown(other.to_string()), } } @@ -120,6 +123,8 @@ pub trait RatchetStore { fn cleanup_old_skipped_keys(&mut self, max_age_secs: i64) -> Result; } +// TODO: (P2) this should be defined in the ConversationType + pub trait ChatStore: IdentityStore + EphemeralKeyStore + ConversationStore + RatchetStore {} impl ChatStore for T where T: IdentityStore + EphemeralKeyStore + ConversationStore + RatchetStore diff --git a/crates/client-ffi/src/delivery.rs b/crates/client-ffi/src/delivery.rs index b58f7d2..26f8c4c 100644 --- a/crates/client-ffi/src/delivery.rs +++ b/crates/client-ffi/src/delivery.rs @@ -14,6 +14,8 @@ pub type DeliverFn = Option< ) -> i32, >; +#[derive(Debug)] + pub struct CDelivery { pub callback: DeliverFn, } @@ -28,4 +30,9 @@ impl DeliveryService for CDelivery { let rc = unsafe { cb(addr.as_ptr(), addr.len(), data.as_ptr(), data.len()) }; if rc < 0 { Err(rc) } else { Ok(()) } } + + fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> { + // TODO: (P1) CDelivery does not support delivery_address filtering + Ok(()) + } } diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index 5aa4d4c..7bec801 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -8,6 +8,7 @@ crate-type = ["rlib"] [dependencies] libchat = { workspace = true } +logoschat_components = { workspace = true} chat-sqlite = { path = "../../core/sqlite" } thiserror = "2" diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index c95a0f1..97870f9 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1,22 +1,23 @@ use libchat::{ AddressedEnvelope, ChatError, ChatStorage, ContentData, Context, ConversationIdOwned, - Introduction, StorageConfig, + DeliveryService, Introduction, StorageConfig, }; -use crate::{delivery::DeliveryService, errors::ClientError}; +use logoschat_components::EphemeralRegistry; -pub struct ChatClient { - ctx: Context, - delivery: D, +use crate::errors::ClientError; + +pub struct ChatClient { + ctx: Context, } impl ChatClient { /// Create an in-memory, ephemeral client. Identity is lost on drop. pub fn new(name: impl Into, delivery: D) -> Self { + let registry = EphemeralRegistry::new(); let store = ChatStorage::in_memory(); Self { - ctx: Context::new_with_name(name, store), - delivery, + ctx: Context::new_with_name(name, delivery, registry, store).unwrap(), } } @@ -30,8 +31,9 @@ impl ChatClient { delivery: D, ) -> Result> { let store = ChatStorage::new(config).map_err(ChatError::from)?; - let ctx = Context::new_from_store(name, store)?; - Ok(Self { ctx, delivery }) + let registry = EphemeralRegistry::new(); + let ctx = Context::new_from_store(name, delivery, registry, store)?; + Ok(Self { ctx }) } /// Returns the installation name (identity label) of this client. @@ -86,7 +88,8 @@ impl ChatClient { envelopes: Vec, ) -> Result<(), ClientError> { for env in envelopes { - self.delivery.publish(env).map_err(ClientError::Delivery)?; + let mut delivery = self.ctx.ds(); + delivery.publish(env).map_err(ClientError::Delivery)?; } Ok(()) } diff --git a/crates/client/src/delivery.rs b/crates/client/src/delivery.rs deleted file mode 100644 index 853de0d..0000000 --- a/crates/client/src/delivery.rs +++ /dev/null @@ -1,6 +0,0 @@ -use libchat::AddressedEnvelope; - -pub trait DeliveryService { - type Error: std::fmt::Debug; - fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error>; -} diff --git a/crates/client/src/delivery_in_process.rs b/crates/client/src/delivery_in_process.rs index ae9d03a..3feff06 100644 --- a/crates/client/src/delivery_in_process.rs +++ b/crates/client/src/delivery_in_process.rs @@ -1,4 +1,4 @@ -use crate::{AddressedEnvelope, delivery::DeliveryService}; +use crate::{AddressedEnvelope, DeliveryService}; use std::collections::HashMap; use std::convert::Infallible; use std::sync::{Arc, RwLock}; @@ -10,7 +10,7 @@ type Message = Vec; /// Messages are stored in an append-only log per delivery address. Readers hold /// independent [`Cursor`]s and advance their position without consuming messages, /// so multiple consumers on the same address each see every message. -#[derive(Clone, Default)] +#[derive(Clone, Default, Debug)] pub struct MessageBus { log: Arc>>>, } @@ -80,7 +80,7 @@ impl Iterator for Cursor { /// clients can share one logical delivery service. Construct with a /// [`MessageBus`] and use [`cursor`](InProcessDelivery::cursor) / /// [`cursor_at_tail`](InProcessDelivery::cursor_at_tail) to read messages. -#[derive(Clone, Default)] +#[derive(Clone, Default, Debug)] pub struct InProcessDelivery(MessageBus); impl InProcessDelivery { @@ -108,4 +108,9 @@ impl DeliveryService for InProcessDelivery { self.0.push(envelope.delivery_address, envelope.data); Ok(()) } + + fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> { + // TODO: (P1) implement subscribe + Ok(()) + } } diff --git a/crates/client/src/errors.rs b/crates/client/src/errors.rs index ff7ac27..322104c 100644 --- a/crates/client/src/errors.rs +++ b/crates/client/src/errors.rs @@ -1,7 +1,7 @@ use libchat::ChatError; #[derive(Debug, thiserror::Error)] -pub enum ClientError { +pub enum ClientError { #[error(transparent)] Chat(#[from] ChatError), /// Crypto state advanced but at least one envelope failed delivery. diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index cfd9074..a0cac6f 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -1,12 +1,12 @@ mod client; -mod delivery; mod delivery_in_process; mod errors; pub use client::ChatClient; -pub use delivery::DeliveryService; pub use delivery_in_process::{Cursor, InProcessDelivery, MessageBus}; pub use errors::ClientError; // Re-export types callers need to interact with ChatClient -pub use libchat::{AddressedEnvelope, ContentData, ConversationIdOwned, StorageConfig}; +pub use libchat::{ + AddressedEnvelope, ContentData, ConversationIdOwned, DeliveryService, StorageConfig, +}; diff --git a/extensions/components/Cargo.toml b/extensions/components/Cargo.toml new file mode 100644 index 0000000..0be31bc --- /dev/null +++ b/extensions/components/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "components" +version = "0.1.0" +edition = "2024" + +[dependencies] +libchat = { workspace = true } +storage = { workspace = true } +crypto = { workspace = true } # Needed because Storage traits require "Identity" struct + +hex = "0.4.3" \ No newline at end of file diff --git a/extensions/components/src/contact_registry.rs b/extensions/components/src/contact_registry.rs new file mode 100644 index 0000000..2037cf6 --- /dev/null +++ b/extensions/components/src/contact_registry.rs @@ -0,0 +1,76 @@ +use std::{ + collections::HashMap, + fmt::Debug, + sync::{Arc, Mutex}, +}; + +use libchat::{AccountId, RegistrationService}; + +/// A Contact Registry used for Tests. +/// This implementation stores bundle bytes and then returns them when +/// retrieved +/// + +#[derive(Clone)] +pub struct EphemeralRegistry { + registry: Arc>>>, +} + +impl EphemeralRegistry { + pub fn new() -> Self { + Self { + registry: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +impl Default for EphemeralRegistry { + fn default() -> Self { + Self::new() + } +} + +impl Debug for EphemeralRegistry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let registry = self.registry.lock().unwrap(); + let truncated: Vec<(&String, String)> = registry + .iter() + .map(|(k, v)| { + let hex = if v.len() <= 8 { + hex::encode(v) + } else { + format!( + "{}..{}", + hex::encode(&v[..4]), + hex::encode(&v[v.len() - 4..]) + ) + }; + (k, hex) + }) + .collect(); + f.debug_struct("EphemeralRegistry") + .field("registry", &truncated) + .finish() + } +} + +impl RegistrationService for EphemeralRegistry { + type Error = String; + + fn register(&mut self, identity: &str, key_bundle: Vec) -> Result<(), Self::Error> { + self.registry + .lock() + .unwrap() + .insert(identity.to_string(), key_bundle); + Ok(()) + } + + fn retrieve(&self, identity: &AccountId) -> Result>, Self::Error> { + Ok(self + .registry + .lock() + .unwrap() + .get(identity.as_str()) + .cloned()) + } +} diff --git a/extensions/components/src/delivery.rs b/extensions/components/src/delivery.rs new file mode 100644 index 0000000..627ac55 --- /dev/null +++ b/extensions/components/src/delivery.rs @@ -0,0 +1,3 @@ +mod local_broadcaster; + +pub use local_broadcaster::LocalBroadcaster; diff --git a/extensions/components/src/delivery/local_broadcaster.rs b/extensions/components/src/delivery/local_broadcaster.rs new file mode 100644 index 0000000..5889def --- /dev/null +++ b/extensions/components/src/delivery/local_broadcaster.rs @@ -0,0 +1,122 @@ +use std::{ + cell::RefCell, + collections::{HashSet, VecDeque}, + hash::{DefaultHasher, Hash, Hasher}, + rc::Rc, +}; + +use libchat::{AddressedEnvelope, DeliveryService}; + +#[derive(Debug)] +struct BroadcasterShared { + /// Per-address message queue; all published messages are appended here. + messages: VecDeque, + base_index: usize, +} + +impl BroadcasterShared { + pub fn read(&self, cursor: usize) -> Option<&T> { + self.messages.get(cursor + self.base_index) + } + + pub fn tail(&self) -> usize { + self.messages.len() + self.base_index + } +} + +#[derive(Clone, Debug)] +pub struct LocalBroadcaster { + shared: Rc>>, + cursor: usize, + subscriptions: HashSet, + outbound_msgs: Vec, +} + +/// This is Lightweight DeliveryService which can be used for tests +/// and local examples. Messages are not delivered until `poll` is called +/// which allows for more fine grain test cases. +impl LocalBroadcaster { + pub fn new() -> Self { + let shared = Rc::new(RefCell::new(BroadcasterShared { + messages: VecDeque::new(), + base_index: 0, + })); + + let cursor = shared.borrow().tail(); + Self { + shared, + cursor, + subscriptions: HashSet::new(), + outbound_msgs: Vec::new(), + } + } + + /// Returns a new consumer that shares the same message store but has its + /// own independent cursor — it starts from the beginning of each address + /// queue regardless of what any other consumer has already processed. + pub fn new_consumer(&self) -> Self { + let inner = self.shared.clone(); + let cursor = inner.borrow().tail(); + Self { + shared: inner, + cursor, + subscriptions: HashSet::new(), + outbound_msgs: Vec::new(), + } + } + + /// Pulls all messages this consumer has not yet seen on `address`, + /// applying any registered filter. Advances the cursor so the same + /// messages are not returned again. + pub fn poll(&mut self) -> Option> { + loop { + let next = self.cursor; + match self.shared.borrow().read(next) { + None => return None, + Some(ae) => { + self.cursor = next + 1; + if self.subscriptions.contains(ae.delivery_address.as_str()) + && self.is_inbound(ae) + { + return Some(ae.data.clone()); + } + } + } + } + } + + fn msg_id(msg: &AddressedEnvelope) -> u64 { + let mut hasher = DefaultHasher::new(); + msg.data.as_slice().hash(&mut hasher); + hasher.finish() + } + + fn is_inbound(&self, msg: &AddressedEnvelope) -> bool { + let mid = Self::msg_id(msg); + !self.outbound_msgs.contains(&mid) + } +} + +impl Default for LocalBroadcaster { + fn default() -> Self { + Self::new() + } +} + +impl DeliveryService for LocalBroadcaster { + type Error = String; + + fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error> { + self.outbound_msgs.push(Self::msg_id(&envelope)); + self.shared.borrow_mut().messages.push_back(envelope); + + Ok(()) + } + + fn subscribe(&mut self, delivery_address: &str) -> Result<(), Self::Error> { + // Strict temporal ordering of subscriptions is not enforced. + // Subscriptions are evaluated on polling, not when the message is published + self.subscriptions.insert(delivery_address.to_string()); + Ok(()) + } +} diff --git a/extensions/components/src/lib.rs b/extensions/components/src/lib.rs new file mode 100644 index 0000000..d55c0f7 --- /dev/null +++ b/extensions/components/src/lib.rs @@ -0,0 +1,7 @@ +mod contact_registry; +mod delivery; +mod storage; + +pub use contact_registry::EphemeralRegistry; +pub use delivery::*; +pub use storage::*; diff --git a/extensions/components/src/storage.rs b/extensions/components/src/storage.rs new file mode 100644 index 0000000..36bbcbe --- /dev/null +++ b/extensions/components/src/storage.rs @@ -0,0 +1,3 @@ +mod in_memory_store; + +pub use in_memory_store::MemStore; diff --git a/extensions/components/src/storage/in_memory_store.rs b/extensions/components/src/storage/in_memory_store.rs new file mode 100644 index 0000000..2bf84f9 --- /dev/null +++ b/extensions/components/src/storage/in_memory_store.rs @@ -0,0 +1,136 @@ +use std::collections::HashMap; + +use storage::{ + // TODO: (P4) Importable crates need to be prefixed with a project name to avoid conflicts + ConversationMeta, + ConversationStore, + EphemeralKeyStore, + IdentityStore, + RatchetStore, +}; + +/// An Test focused StorageService which holds data in a hashmap +pub struct MemStore { + convos: HashMap, +} + +impl MemStore { + pub fn new() -> Self { + Self { + convos: HashMap::new(), + } + } +} + +impl Default for MemStore { + fn default() -> Self { + Self::new() + } +} + +impl ConversationStore for MemStore { + fn save_conversation( + &mut self, + meta: &storage::ConversationMeta, + ) -> Result<(), storage::StorageError> { + self.convos + .insert(meta.local_convo_id.clone(), meta.clone()); + Ok(()) + } + + fn load_conversation( + &self, + local_convo_id: &str, + ) -> Result, storage::StorageError> { + let a = self.convos.get(local_convo_id).cloned(); + Ok(a) + } + + fn remove_conversation(&mut self, _local_convo_id: &str) -> Result<(), storage::StorageError> { + todo!() + } + + fn load_conversations(&self) -> Result, storage::StorageError> { + Ok(self.convos.values().cloned().collect()) + } + + fn has_conversation(&self, local_convo_id: &str) -> Result { + Ok(self.convos.contains_key(local_convo_id)) + } +} + +impl IdentityStore for MemStore { + fn load_identity(&self) -> Result, storage::StorageError> { + // todo!() + Ok(None) + } + + fn save_identity(&mut self, _identity: &crypto::Identity) -> Result<(), storage::StorageError> { + // todo!() + Ok(()) + } +} + +impl EphemeralKeyStore for MemStore { + fn save_ephemeral_key( + &mut self, + _public_key_hex: &str, + _private_key: &crypto::PrivateKey, + ) -> Result<(), storage::StorageError> { + todo!() + } + + fn load_ephemeral_key( + &self, + _public_key_hex: &str, + ) -> Result, storage::StorageError> { + todo!() + } + + fn remove_ephemeral_key(&mut self, _public_key_hex: &str) -> Result<(), storage::StorageError> { + todo!() + } +} + +impl RatchetStore for MemStore { + fn save_ratchet_state( + &mut self, + _conversation_id: &str, + _state: &storage::RatchetStateRecord, + _skipped_keys: &[storage::SkippedKeyRecord], + ) -> Result<(), storage::StorageError> { + todo!() + } + + fn load_ratchet_state( + &self, + _conversation_id: &str, + ) -> Result { + todo!() + } + + fn load_skipped_keys( + &self, + _conversation_id: &str, + ) -> Result, storage::StorageError> { + todo!() + } + + fn has_ratchet_state(&self, _conversation_id: &str) -> Result { + todo!() + } + + fn delete_ratchet_state( + &mut self, + _conversation_id: &str, + ) -> Result<(), storage::StorageError> { + todo!() + } + + fn cleanup_old_skipped_keys( + &mut self, + _max_age_secs: i64, + ) -> Result { + todo!() + } +}