From aa380adb3776957f68e3e134d7e5bd2801b8d4ec Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Thu, 23 Apr 2026 22:12:20 -0700 Subject: [PATCH] Re-use components in integration tests --- Cargo.lock | 26 +- Cargo.toml | 22 +- core/conversations/Cargo.toml | 1 + core/conversations/src/context.rs | 368 ++---------------- .../src/conversation/privatev1.rs | 3 +- core/conversations/src/lib.rs | 3 +- core/integration_tests_core/Cargo.toml | 15 + core/integration_tests_core/src/lib.rs | 23 ++ .../tests/mls_integration.rs | 177 +++++++++ .../tests/private_integration.rs | 165 ++++++++ crates/client/src/client.rs | 6 +- extensions/components/Cargo.toml | 11 + extensions/components/src/contact_registry.rs | 62 +++ extensions/components/src/delivery.rs | 3 + .../src/delivery/local_broadcaster.rs | 116 ++++++ extensions/components/src/lib.rs | 7 + extensions/components/src/storage.rs | 3 + .../components/src/storage/in_memory_store.rs | 130 +++++++ extensions/delivery/Cargo.toml | 6 - extensions/delivery/src/lib.rs | 5 - extensions/delivery/src/local_bcast.rs | 58 --- 21 files changed, 784 insertions(+), 426 deletions(-) create mode 100644 core/integration_tests_core/Cargo.toml create mode 100644 core/integration_tests_core/src/lib.rs create mode 100644 core/integration_tests_core/tests/mls_integration.rs create mode 100644 core/integration_tests_core/tests/private_integration.rs create mode 100644 extensions/components/Cargo.toml create mode 100644 extensions/components/src/contact_registry.rs create mode 100644 extensions/components/src/delivery.rs create mode 100644 extensions/components/src/delivery/local_broadcaster.rs create mode 100644 extensions/components/src/lib.rs create mode 100644 extensions/components/src/storage.rs create mode 100644 extensions/components/src/storage/in_memory_store.rs delete mode 100644 extensions/delivery/Cargo.toml delete mode 100644 extensions/delivery/src/lib.rs delete mode 100644 extensions/delivery/src/local_bcast.rs diff --git a/Cargo.lock b/Cargo.lock index 1e397a6..edbb5ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -407,6 +407,16 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "components" +version = "0.1.0" +dependencies = [ + "crypto", + "hex", + "libchat", + "storage", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -633,10 +643,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "delivery" -version = "0.1.0" - [[package]] name = "der" version = "0.7.10" @@ -1317,6 +1323,17 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "integration_tests_core" +version = "0.1.0" +dependencies = [ + "chat-sqlite", + "components", + "libchat", + "storage", + "tempfile", +] + [[package]] name = "inventory" version = "0.3.24" @@ -1402,6 +1419,7 @@ dependencies = [ "blake2", "chat-proto", "chat-sqlite", + "components", "crypto", "double-ratchets", "hex", diff --git a/Cargo.toml b/Cargo.toml index 448b133..9fb2e90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,18 +8,28 @@ members = [ "core/crypto", "core/double-ratchets", "core/storage", + "core/integration_tests_core", "crates/client", "crates/client-ffi", - "bin/chat-cli", "extensions/delivery", + "bin/chat-cli", + "extensions/components", ] -# default-members = [ "core/*", "crates/*"] -default-members = [ "core/*"] +default-members = [ + "core/sqlite", + "core/conversations", + "core/crypto", + "core/double-ratchets", + "core/storage", + "core/integration_tests_core", +] [workspace.dependencies] -blake2 = "0.10" -libchat = { path = "core/conversations" } -storage = { path = "core/storage" } + blake2 = "0.10" + crypto = { path = "core/crypto" } + libchat = { path = "core/conversations" } + sqlite = { path = "core/sqlite"} + storage = { path = "core/storage" } # Panicking across FFI boundaries is UB; abort is the correct strategy for a # C FFI library. diff --git a/core/conversations/Cargo.toml b/core/conversations/Cargo.toml index f5492c4..5ee70d1 100644 --- a/core/conversations/Cargo.toml +++ b/core/conversations/Cargo.toml @@ -24,4 +24,5 @@ openmls = { version = "0.8.1", features = ["libcrux-provider"] } openmls_traits = "0.5.0" [dev-dependencies] +components = { package = "components", path = "../../extensions/components" } tempfile = "3" diff --git a/core/conversations/src/context.rs b/core/conversations/src/context.rs index 895ef64..ae968e6 100644 --- a/core/conversations/src/context.rs +++ b/core/conversations/src/context.rs @@ -1,3 +1,4 @@ +use std::cell::Ref; use std::sync::Arc; use std::{cell::RefCell, rc::Rc}; @@ -7,7 +8,7 @@ use crate::ctx::ClientCtx; use crate::account::LogosAccount; use crate::{DeliveryService, RegistrationService}; use crate::{ - conversation::{Conversation, ConversationId, Id, PrivateV1Convo}, + conversation::{Conversation, Id, PrivateV1Convo}, errors::ChatError, inbox::Inbox, inbox_v2::InboxV2, @@ -17,13 +18,13 @@ use crate::{ use crypto::{Identity, PublicKey}; use storage::{ChatStore, ConversationKind}; -pub use crate::conversation::ConversationIdOwned; +pub use crate::conversation::{ConversationId, ConversationIdOwned}; pub use crate::inbox::Introduction; // This is the main entry point to the conversations api. // Ctx manages lifetimes of objects to process and generate payloads. pub struct Context { - _identity: Rc, + identity: Rc, client_ctx: ClientCtx, inbox: Inbox, pq_inbox: InboxV2, @@ -66,7 +67,7 @@ impl Cont .map_err(ChatError::generic)?; Ok(Self { - _identity: identity, + identity: identity, client_ctx: ctx, inbox, pq_inbox, @@ -104,7 +105,7 @@ impl Cont .map_err(ChatError::generic)?; Ok(Self { - _identity: identity, + identity, client_ctx: ctx, pq_inbox, inbox, @@ -114,17 +115,29 @@ impl Cont }) } + pub fn store(&self) -> Ref<'_, CS> { + self.store.borrow() + } + + pub fn client_ctx(&mut self) -> &mut ClientCtx { + &mut self.client_ctx + } + + pub fn identity(&self) -> &Identity { + &self.identity + } + /// Returns the unique identifier associated with the account pub fn account_id(&self) -> String { self.pq_inbox.account.friendly_name() } pub fn installation_name(&self) -> &str { - self._identity.get_name() + self.identity.get_name() } pub fn installation_key(&self) -> PublicKey { - self._identity.public_key() + self.identity.public_key() } pub fn create_private_convo( @@ -253,6 +266,13 @@ impl Cont Ok(intro.into()) } + pub fn get_convo( + &mut self, + convo_id: ConversationId, + ) -> Result>, ChatError> { + self.load_group_convo(convo_id) + } + /// Loads a conversation from DB by constructing it from metadata. fn load_convo(&mut self, convo_id: ConversationId) -> Result, ChatError> { let record = self @@ -307,337 +327,3 @@ impl Cont } } } - -#[cfg(test)] -mod tests { - use std::ops::{Deref, DerefMut}; - - use sqlite::{ChatStorage, StorageConfig}; - use storage::{ConversationStore, IdentityStore}; - use tempfile::tempdir; - - use crate::{ - test_utils::{EphemeralRegistry, LocalBroadcaster, MemStore}, - utils::hex_trunc, - }; - - use super::*; - - type TestContext = Context; - - fn send_and_verify( - sender: &mut TestContext, - receiver: &mut TestContext, - convo_id: ConversationId, - content: &[u8], - ) { - let payloads = sender.send_content(convo_id, content).unwrap(); - let payload = payloads.first().unwrap(); - let received = receiver - .handle_payload(&payload.data) - .unwrap() - .expect("expected content"); - assert_eq!(content, received.data.as_slice()); - assert!(!received.is_new_convo); // Check that `is_new_convo` is FALSE - } - - // Simple client Functionality for testing - struct Client { - inner: Context, - on_content: Option>, - } - - impl Client { - fn init( - ctx: Context, - cb: Option, - ) -> Self { - Client { - inner: ctx, - on_content: cb.map(|f| Box::new(f) as Box), - } - } - - fn process_messages(&mut self) { - while let Some(data) = self.client_ctx.ds().poll() { - let res = self.handle_payload(&data).unwrap(); - if let Some(cb) = &self.on_content { - match res { - Some(content_data) => cb(content_data), - None => continue, - } - } - } - } - - fn convo( - &mut self, - convo_id: &str, - ) -> Box> { - // TODO: (P1) Convos are being copied somewhere, which means hanging on to a reference causes state desync - self.load_group_convo(convo_id).unwrap() - } - } - - impl Deref for Client { - type Target = Context; - - fn deref(&self) -> &Self::Target { - &self.inner - } - } - - impl DerefMut for Client { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } - } - - // Higher order function to handle printing - fn pretty_print(prefix: impl Into) -> Box { - let prefix = prefix.into(); - return Box::new(move |c: ContentData| { - let cid = hex_trunc(c.conversation_id.as_bytes()); - let content = String::from_utf8(c.data).unwrap(); - println!("{} ({}) {}", prefix, cid, content) - }); - } - - fn process(clients: &mut Vec) { - for client in clients { - client.process_messages(); - } - } - - #[test] - fn create_group() { - let ds = LocalBroadcaster::new(); - let rs = EphemeralRegistry::new(); - - let saro_ctx = - Context::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap(); - let raya_ctx = - Context::new_with_name("raya", ds.clone(), rs.clone(), MemStore::new()).unwrap(); - - let mut clients = vec![ - Client::init(saro_ctx, Some(pretty_print(" Saro "))), - Client::init(raya_ctx, Some(pretty_print(" Raya "))), - ]; - - const SARO: usize = 0; - const RAYA: usize = 1; - - let raya_id = clients[RAYA].account_id(); - let s_convo = clients[SARO] - .create_group_convo(&[raya_id.as_ref()]) - .unwrap(); - - let convo_id = s_convo.id(); - - // Raya can read this message because - // 1) It was sent after add_members was committed, and - // 2) LocalBroadcaster provides historical messages. - - clients[SARO] - .convo(convo_id) - .send_content( - &mut clients[SARO].client_ctx, - b"ok who broke the group chat again", - ) - .unwrap(); - - // clients[SARO].process_messages(); - process(&mut clients); - - clients[RAYA] - .convo(convo_id) - .send_content( - &mut clients[RAYA].client_ctx, - b"it was literally working five minutes ago", - ) - .unwrap(); - - // clients[SARO].process_messages(); - process(&mut clients); - - let pax_ctx = Context::new_with_name("pax", ds, rs, MemStore::new()).unwrap(); - clients.push(Client::init(pax_ctx, Some(pretty_print(" Pax")))); - const PAX: usize = 2; - - let pax_id = clients[PAX].account_id(); - clients[SARO] - .convo(convo_id) - .add_member(&mut clients[SARO].client_ctx, &[pax_id.as_ref()]) - .unwrap(); - - // clients[SARO].process_messages(); - process(&mut clients); - - clients[PAX] - .convo(convo_id) - .send_content( - &mut clients[PAX].client_ctx, - b"ngl the key rotation is cooked", - ) - .unwrap(); - - // clients[SARO].process_messages(); - - process(&mut clients); - - clients[SARO] - .convo(convo_id) - .send_content( - &mut clients[SARO].client_ctx, - b"bro we literally just added you to the group ", - ) - .unwrap(); - - process(&mut clients); - // process(&mut clients); - } - - #[test] - fn ctx_integration() { - let ds = LocalBroadcaster::new(); - let rs = EphemeralRegistry::new(); - - let mut saro = - Context::new_with_name("saro", ds.clone(), rs.clone(), ChatStorage::in_memory()) - .unwrap(); - let mut raya = Context::new_with_name("raya", ds, rs, ChatStorage::in_memory()).unwrap(); - - // Raya creates intro bundle and sends to Saro - let bundle = raya.create_intro_bundle().unwrap(); - let intro = Introduction::try_from(bundle.as_slice()).unwrap(); - - // Saro initiates conversation with Raya - let mut content = vec![10]; - let (saro_convo_id, payloads) = saro.create_private_convo(&intro, &content).unwrap(); - - // Raya receives initial message - let payload = payloads.first().unwrap(); - let initial_content = raya - .handle_payload(&payload.data) - .unwrap() - .expect("expected initial content"); - - let raya_convo_id = initial_content.conversation_id; - assert_eq!(content, initial_content.data); - assert!(initial_content.is_new_convo); - - // Exchange messages back and forth - for _ in 0..10 { - content.push(content.last().unwrap() + 1); - send_and_verify(&mut raya, &mut saro, &raya_convo_id, &content); - - content.push(content.last().unwrap() + 1); - send_and_verify(&mut saro, &mut raya, &saro_convo_id, &content); - } - } - - #[test] - fn identity_persistence() { - let ds = LocalBroadcaster::new(); - let rs = EphemeralRegistry::new(); - let store1 = ChatStorage::new(StorageConfig::InMemory).unwrap(); - let ctx1 = Context::new_with_name("alice", ds, rs, store1).unwrap(); - let pubkey1 = ctx1._identity.public_key(); - let name1 = ctx1.installation_name().to_string(); - - // For persistence tests with file-based storage, we'd need a shared db. - // With in-memory, we just verify the identity was created. - assert_eq!(name1, "alice"); - assert!(!pubkey1.as_bytes().iter().all(|&b| b == 0)); - } - - #[test] - fn open_persists_new_identity() { - let dir = tempdir().unwrap(); - let db_path = dir.path().join("chat.sqlite"); - let db_path = db_path.to_string_lossy().into_owned(); - - let ds = LocalBroadcaster::new(); - let rs = EphemeralRegistry::new(); - let store = ChatStorage::new(StorageConfig::File(db_path.clone())).unwrap(); - let ctx = Context::new_from_store("alice", ds, rs, store).unwrap(); - let pubkey = ctx._identity.public_key(); - drop(ctx); - - let store = ChatStorage::new(StorageConfig::File(db_path)).unwrap(); - let persisted = store.load_identity().unwrap().unwrap(); - - assert_eq!(persisted.get_name(), "alice"); - assert_eq!(persisted.public_key(), pubkey); - } - - #[test] - fn conversation_metadata_persistence() { - let ds = LocalBroadcaster::new(); - let rs = EphemeralRegistry::new(); - let mut alice = - Context::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory()) - .unwrap(); - let mut bob = Context::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap(); - - let bundle = alice.create_intro_bundle().unwrap(); - let intro = Introduction::try_from(bundle.as_slice()).unwrap(); - let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap(); - - let payload = payloads.first().unwrap(); - let content = alice.handle_payload(&payload.data).unwrap().unwrap(); - assert!(content.is_new_convo); - - let convos = alice.store.borrow().load_conversations().unwrap(); - assert_eq!(convos.len(), 1); - assert_eq!(convos[0].kind.as_str(), "private_v1"); - } - - #[test] - fn conversation_full_flow() { - let ds = LocalBroadcaster::new(); - let rs = EphemeralRegistry::new(); - let mut alice = - Context::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory()) - .unwrap(); - let mut bob = Context::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap(); - - let bundle = alice.create_intro_bundle().unwrap(); - let intro = Introduction::try_from(bundle.as_slice()).unwrap(); - let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap(); - - let payload = payloads.first().unwrap(); - let content = alice.handle_payload(&payload.data).unwrap().unwrap(); - let alice_convo_id = content.conversation_id; - - let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap(); - let payload = payloads.first().unwrap(); - bob.handle_payload(&payload.data).unwrap().unwrap(); - - let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap(); - let payload = payloads.first().unwrap(); - alice.handle_payload(&payload.data).unwrap().unwrap(); - - // Verify conversation list - let convo_ids = alice.list_conversations().unwrap(); - assert_eq!(convo_ids.len(), 1); - - // Continue exchanging messages - let payloads = bob.send_content(&bob_convo_id, b"more messages").unwrap(); - let payload = payloads.first().unwrap(); - let content = alice - .handle_payload(&payload.data) - .expect("should decrypt") - .expect("should have content"); - assert_eq!(content.data, b"more messages"); - - // Alice can also send back - let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap(); - let payload = payloads.first().unwrap(); - let content = bob - .handle_payload(&payload.data) - .unwrap() - .expect("bob should receive"); - assert_eq!(content.data, b"alice reply"); - } -} diff --git a/core/conversations/src/conversation/privatev1.rs b/core/conversations/src/conversation/privatev1.rs index b7736d8..cb83396 100644 --- a/core/conversations/src/conversation/privatev1.rs +++ b/core/conversations/src/conversation/privatev1.rs @@ -13,8 +13,7 @@ use std::{cell::RefCell, fmt::Debug, rc::Rc, sync::Arc}; use storage::{ConversationKind, ConversationMeta, ConversationStore}; use crate::{ - context::ConversationIdOwned, - conversation::{ChatError, ConversationId, Convo, Id}, + conversation::{ChatError, ConversationId, ConversationIdOwned, Convo, Id}, errors::EncryptionError, proto, types::{AddressedEncryptedPayload, ContentData}, diff --git a/core/conversations/src/lib.rs b/core/conversations/src/lib.rs index a979083..63417a4 100644 --- a/core/conversations/src/lib.rs +++ b/core/conversations/src/lib.rs @@ -15,7 +15,8 @@ pub use account::LogosAccount; #[cfg(test)] mod test_utils; -pub use context::{Context, ConversationIdOwned, Introduction}; +pub use context::{Context, ConversationId, ConversationIdOwned, Introduction}; +pub use conversation::GroupConvo; pub use errors::ChatError; pub use external_traits::{DeliveryService, RegistrationService}; pub use sqlite::ChatStorage; diff --git a/core/integration_tests_core/Cargo.toml b/core/integration_tests_core/Cargo.toml new file mode 100644 index 0000000..34ff420 --- /dev/null +++ b/core/integration_tests_core/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "integration_tests_core" +version = "0.1.0" +edition = "2024" + +# [[test]] +# name = "integration_tests_core" + +[dev-dependencies] +libchat = { workspace = true } +storage = { workspace = true } +sqlite = {package = "chat-sqlite", path ="../sqlite"} + +components = { path = "../../extensions/components" } +tempfile = "3" diff --git a/core/integration_tests_core/src/lib.rs b/core/integration_tests_core/src/lib.rs new file mode 100644 index 0000000..7d18d35 --- /dev/null +++ b/core/integration_tests_core/src/lib.rs @@ -0,0 +1,23 @@ +// use std::ops::{Deref, DerefMut}; + +// use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; +// use libchat::{ +// AddressedEnvelope, ChatStorage, ContentData, Context, ConversationIdOwned, Introduction, +// StorageConfig, +// }; + +// fn send_and_verify( +// sender: &mut Context, +// receiver: &mut Context, +// convo_id: &str, +// content: &[u8], +// ) { +// let payloads = sender.send_content(convo_id, content).unwrap(); +// let payload = payloads.first().unwrap(); +// let received = receiver +// .handle_payload(&payload.data) +// .unwrap() +// .expect("expected content"); +// assert_eq!(content, received.data.as_slice()); +// assert!(!received.is_new_convo); +// } diff --git a/core/integration_tests_core/tests/mls_integration.rs b/core/integration_tests_core/tests/mls_integration.rs new file mode 100644 index 0000000..ff81b30 --- /dev/null +++ b/core/integration_tests_core/tests/mls_integration.rs @@ -0,0 +1,177 @@ +use std::ops::{Deref, DerefMut}; + +use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; +use libchat::{ChatStorage, ContentData, Context, ConversationId, GroupConvo}; + +type TestContext = Context; + +fn send_and_verify( + sender: &mut TestContext, + receiver: &mut TestContext, + convo_id: ConversationId, + content: &[u8], +) { + let payloads = sender.send_content(convo_id, content).unwrap(); + let payload = payloads.first().unwrap(); + let received = receiver + .handle_payload(&payload.data) + .unwrap() + .expect("expected content"); + assert_eq!(content, received.data.as_slice()); + assert!(!received.is_new_convo); // Check that `is_new_convo` is FALSE +} + +// Simple client Functionality for testing +struct Client { + inner: Context, + on_content: Option>, +} + +impl Client { + fn init( + ctx: Context, + cb: Option, + ) -> Self { + Client { + inner: ctx, + on_content: cb.map(|f| Box::new(f) as Box), + } + } + + fn process_messages(&mut self) { + while let Some(data) = self.client_ctx().ds().poll() { + let res = self.handle_payload(&data).unwrap(); + if let Some(cb) = &self.on_content { + match res { + Some(content_data) => cb(content_data), + None => continue, + } + } + } + } + + fn convo( + &mut self, + convo_id: &str, + ) -> Box> { + // TODO: (P1) Convos are being copied somewhere, which means hanging on to a reference causes state desync + self.get_convo(convo_id).unwrap() + } +} + +impl Deref for Client { + type Target = Context; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for Client { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +// Higher order function to handle printing +fn pretty_print(prefix: impl Into) -> Box { + let prefix = prefix.into(); + return Box::new(move |c: ContentData| { + let cid = c.conversation_id.as_bytes(); + let content = String::from_utf8(c.data).unwrap(); + println!("{} ({:?}) {}", prefix, cid, content) + }); +} + +fn process(clients: &mut Vec) { + for client in clients { + client.process_messages(); + } +} + +#[test] +fn create_group() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + + let saro_ctx = + Context::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap(); + let raya_ctx = Context::new_with_name("raya", ds.clone(), rs.clone(), MemStore::new()).unwrap(); + + let mut clients = vec![ + Client::init(saro_ctx, Some(pretty_print(" Saro "))), + Client::init(raya_ctx, Some(pretty_print(" Raya "))), + ]; + + const SARO: usize = 0; + const RAYA: usize = 1; + + let raya_id = clients[RAYA].account_id(); + let s_convo = clients[SARO] + .create_group_convo(&[raya_id.as_ref()]) + .unwrap(); + + let convo_id = s_convo.id(); + + // Raya can read this message because + // 1) It was sent after add_members was committed, and + // 2) LocalBroadcaster provides historical messages. + + clients[SARO] + .convo(convo_id) + .send_content( + &mut clients[SARO].client_ctx(), + b"ok who broke the group chat again", + ) + .unwrap(); + + // clients[SARO].process_messages(); + process(&mut clients); + + clients[RAYA] + .convo(convo_id) + .send_content( + &mut clients[RAYA].client_ctx(), + b"it was literally working five minutes ago", + ) + .unwrap(); + + // clients[SARO].process_messages(); + process(&mut clients); + + let pax_ctx = Context::new_with_name("pax", ds, rs, MemStore::new()).unwrap(); + clients.push(Client::init(pax_ctx, Some(pretty_print(" Pax")))); + const PAX: usize = 2; + + let pax_id = clients[PAX].account_id(); + clients[SARO] + .convo(convo_id) + .add_member(&mut clients[SARO].client_ctx(), &[pax_id.as_ref()]) + .unwrap(); + + // clients[SARO].process_messages(); + process(&mut clients); + + clients[PAX] + .convo(convo_id) + .send_content( + &mut clients[PAX].client_ctx(), + b"ngl the key rotation is cooked", + ) + .unwrap(); + + // clients[SARO].process_messages(); + + process(&mut clients); + + clients[SARO] + .convo(convo_id) + .send_content( + &mut clients[SARO].client_ctx(), + b"bro we literally just added you to the group ", + ) + .unwrap(); + + process(&mut clients); + // process(&mut clients); +} diff --git a/core/integration_tests_core/tests/private_integration.rs b/core/integration_tests_core/tests/private_integration.rs new file mode 100644 index 0000000..b9eec0b --- /dev/null +++ b/core/integration_tests_core/tests/private_integration.rs @@ -0,0 +1,165 @@ +use std::ops::{Deref, DerefMut}; + +use libchat::{AddressedEnvelope, Context, Introduction}; +use sqlite::{ChatStorage, StorageConfig}; +use storage::{ConversationStore, IdentityStore}; +use tempfile::tempdir; + +use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; + +fn send_and_verify( + sender: &mut Context, + receiver: &mut Context, + convo_id: &str, + content: &[u8], +) { + let payloads = sender.send_content(convo_id, content).unwrap(); + let payload = payloads.first().unwrap(); + let received = receiver + .handle_payload(&payload.data) + .unwrap() + .expect("expected content"); + assert_eq!(content, received.data.as_slice()); + assert!(!received.is_new_convo); +} + +#[test] +fn ctx_integration() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + + let mut saro = + Context::new_with_name("saro", ds.clone(), rs.clone(), ChatStorage::in_memory()).unwrap(); + let mut raya = Context::new_with_name("raya", ds, rs, ChatStorage::in_memory()).unwrap(); + + // Raya creates intro bundle and sends to Saro + let bundle = raya.create_intro_bundle().unwrap(); + let intro = Introduction::try_from(bundle.as_slice()).unwrap(); + + // Saro initiates conversation with Raya + let mut content = vec![10]; + let (saro_convo_id, payloads) = saro.create_private_convo(&intro, &content).unwrap(); + + // Raya receives initial message + let payload = payloads.first().unwrap(); + let initial_content = raya + .handle_payload(&payload.data) + .unwrap() + .expect("expected initial content"); + + let raya_convo_id = initial_content.conversation_id; + assert_eq!(content, initial_content.data); + assert!(initial_content.is_new_convo); + + // Exchange messages back and forth + for _ in 0..10 { + content.push(content.last().unwrap() + 1); + send_and_verify(&mut raya, &mut saro, &raya_convo_id, &content); + + content.push(content.last().unwrap() + 1); + send_and_verify(&mut saro, &mut raya, &saro_convo_id, &content); + } +} + +#[test] +fn identity_persistence() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + let store1 = ChatStorage::new(StorageConfig::InMemory).unwrap(); + let ctx1 = Context::new_with_name("alice", ds, rs, store1).unwrap(); + let pubkey1 = ctx1.identity().public_key(); + let name1 = ctx1.installation_name().to_string(); + + // For persistence tests with file-based storage, we'd need a shared db. + // With in-memory, we just verify the identity was created. + assert_eq!(name1, "alice"); + assert!(!pubkey1.as_bytes().iter().all(|&b| b == 0)); +} + +#[test] +fn open_persists_new_identity() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("chat.sqlite"); + let db_path = db_path.to_string_lossy().into_owned(); + + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + let store = ChatStorage::new(StorageConfig::File(db_path.clone())).unwrap(); + let ctx = Context::new_from_store("alice", ds, rs, store).unwrap(); + let pubkey = ctx.identity().public_key(); + drop(ctx); + + let store = ChatStorage::new(StorageConfig::File(db_path)).unwrap(); + let persisted = store.load_identity().unwrap().unwrap(); + + assert_eq!(persisted.get_name(), "alice"); + assert_eq!(persisted.public_key(), pubkey); +} + +#[test] +fn conversation_metadata_persistence() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + let mut alice = + Context::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory()).unwrap(); + let mut bob = Context::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap(); + + let bundle = alice.create_intro_bundle().unwrap(); + let intro = Introduction::try_from(bundle.as_slice()).unwrap(); + let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap(); + + let payload = payloads.first().unwrap(); + let content = alice.handle_payload(&payload.data).unwrap().unwrap(); + assert!(content.is_new_convo); + + let convos = alice.store().load_conversations().unwrap(); + assert_eq!(convos.len(), 1); + assert_eq!(convos[0].kind.as_str(), "private_v1"); +} + +#[test] +fn conversation_full_flow() { + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + let mut alice = + Context::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory()).unwrap(); + let mut bob = Context::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap(); + + let bundle = alice.create_intro_bundle().unwrap(); + let intro = Introduction::try_from(bundle.as_slice()).unwrap(); + let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap(); + + let payload = payloads.first().unwrap(); + let content = alice.handle_payload(&payload.data).unwrap().unwrap(); + let alice_convo_id = content.conversation_id; + + let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap(); + let payload = payloads.first().unwrap(); + bob.handle_payload(&payload.data).unwrap().unwrap(); + + let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap(); + let payload = payloads.first().unwrap(); + alice.handle_payload(&payload.data).unwrap().unwrap(); + + // Verify conversation list + let convo_ids = alice.list_conversations().unwrap(); + assert_eq!(convo_ids.len(), 1); + + // Continue exchanging messages + let payloads = bob.send_content(&bob_convo_id, b"more messages").unwrap(); + let payload = payloads.first().unwrap(); + let content = alice + .handle_payload(&payload.data) + .expect("should decrypt") + .expect("should have content"); + assert_eq!(content.data, b"more messages"); + + // Alice can also send back + let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap(); + let payload = payloads.first().unwrap(); + let content = bob + .handle_payload(&payload.data) + .unwrap() + .expect("bob should receive"); + assert_eq!(content.data, b"alice reply"); +} diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 50e6099..a51b397 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1,16 +1,16 @@ use libchat::{ - AddressedEnvelope, ChatError, ChatStorage, ContentData, Context, ConversationIdOwned, + AddressedEnvelope, ChatError, ChatStorage, ContentData, Context, ConversationIdOwned, RegistrationService DeliveryService, Introduction, StorageConfig, }; use crate::errors::ClientError; pub struct ChatClient { - ctx: Context, + ctx: Context, delivery: D, } -impl ChatClient { +impl ChatClient { /// Create an in-memory, ephemeral client. Identity is lost on drop. pub fn new(name: impl Into, delivery: D) -> Self { let store = ChatStorage::in_memory(); diff --git a/extensions/components/Cargo.toml b/extensions/components/Cargo.toml new file mode 100644 index 0000000..0be31bc --- /dev/null +++ b/extensions/components/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "components" +version = "0.1.0" +edition = "2024" + +[dependencies] +libchat = { workspace = true } +storage = { workspace = true } +crypto = { workspace = true } # Needed because Storage traits require "Identity" struct + +hex = "0.4.3" \ No newline at end of file diff --git a/extensions/components/src/contact_registry.rs b/extensions/components/src/contact_registry.rs new file mode 100644 index 0000000..02f2f26 --- /dev/null +++ b/extensions/components/src/contact_registry.rs @@ -0,0 +1,62 @@ +use std::{ + collections::HashMap, + fmt::Debug, + sync::{Arc, Mutex}, +}; + +use libchat::RegistrationService; + +/// A Contact Registry used for Tests. +/// This implementation stores bundle bytes and then returns them when +/// retreived +/// + +#[derive(Clone)] +pub struct EphemeralRegistry { + registry: Arc>>>, +} + +impl EphemeralRegistry { + pub fn new() -> Self { + Self { + registry: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +impl Debug for EphemeralRegistry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let registry = self.registry.lock().unwrap(); + let truncated: Vec<(&String, String)> = registry + .iter() + .map(|(k, v)| { + let hex = if v.len() <= 8 { + hex::encode(v) + } else { + format!( + "{}..{}", + hex::encode(&v[..4]), + hex::encode(&v[v.len() - 4..]) + ) + }; + (k, hex) + }) + .collect(); + f.debug_struct("EphemeralRegistry") + .field("registry", &truncated) + .finish() + } +} + +impl RegistrationService for EphemeralRegistry { + type Error = String; + + fn register(&mut self, identity: String, key_bundle: Vec) -> Result<(), Self::Error> { + self.registry.lock().unwrap().insert(identity, key_bundle); + Ok(()) + } + + fn retreive(&self, identity: &str) -> Result>, Self::Error> { + Ok(self.registry.lock().unwrap().get(identity).cloned()) + } +} diff --git a/extensions/components/src/delivery.rs b/extensions/components/src/delivery.rs new file mode 100644 index 0000000..627ac55 --- /dev/null +++ b/extensions/components/src/delivery.rs @@ -0,0 +1,3 @@ +mod local_broadcaster; + +pub use local_broadcaster::LocalBroadcaster; diff --git a/extensions/components/src/delivery/local_broadcaster.rs b/extensions/components/src/delivery/local_broadcaster.rs new file mode 100644 index 0000000..9828b26 --- /dev/null +++ b/extensions/components/src/delivery/local_broadcaster.rs @@ -0,0 +1,116 @@ +use std::{ + cell::RefCell, + collections::{HashSet, VecDeque}, + hash::{DefaultHasher, Hash, Hasher}, + rc::Rc, +}; + +use libchat::{AddressedEnvelope, DeliveryService}; + +#[derive(Debug)] +struct BroadcasterShared { + /// Per-address message queue; all published messages are appended here. + messages: VecDeque, + base_index: usize, +} + +impl BroadcasterShared { + pub fn read(&self, cursor: usize) -> Option<&T> { + self.messages.get(cursor + self.base_index) + } + + pub fn tail(&self) -> usize { + self.messages.len() + self.base_index + } +} + +#[derive(Clone, Debug)] +pub struct LocalBroadcaster { + shared: Rc>>, + cursor: usize, + subscriptions: HashSet, + outbound_msgs: Vec, +} + +/// This is Lightweight DeliveryService which can be used for tests +/// and local examples. Messages are not delivered until `poll` is called +/// which allows for more fine grain test cases. +impl LocalBroadcaster { + pub fn new() -> Self { + let shared = Rc::new(RefCell::new(BroadcasterShared { + messages: VecDeque::new(), + base_index: 0, + })); + + let cursor = shared.borrow().tail(); + Self { + shared, + cursor, + subscriptions: HashSet::new(), + outbound_msgs: Vec::new(), + } + } + + /// Returns a new consumer that shares the same message store but has its + /// own independent cursor — it starts from the beginning of each address + /// queue regardless of what any other consumer has already processed. + pub fn new_consumer(&self) -> Self { + let inner = self.shared.clone(); + let cursor = inner.borrow().tail(); + Self { + shared: inner, + cursor, + subscriptions: HashSet::new(), + outbound_msgs: Vec::new(), + } + } + + /// Pulls all messages this consumer has not yet seen on `address`, + /// applying any registered filter. Advances the cursor so the same + /// messages are not returned again. + pub fn poll(&mut self) -> Option> { + loop { + let next = self.cursor; + match self.shared.borrow().read(next) { + None => return None, + Some(ae) => { + self.cursor = next + 1; + if self.subscriptions.contains(ae.delivery_address.as_str()) + && self.is_inbound(ae) + { + return Some(ae.data.clone()); + } + } + } + } + } + + fn msg_id(msg: &AddressedEnvelope) -> u64 { + let mut hasher = DefaultHasher::new(); + msg.data.as_slice().hash(&mut hasher); + hasher.finish() + } + + fn is_inbound(&self, msg: &AddressedEnvelope) -> bool { + let mid = Self::msg_id(msg); + !self.outbound_msgs.contains(&mid) + } +} + +impl DeliveryService for LocalBroadcaster { + type Error = String; + + fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error> { + self.outbound_msgs.push(Self::msg_id(&envelope)); + self.shared.borrow_mut().messages.push_back(envelope); + + Ok(()) + } + + fn subscribe(&mut self, delivery_address: String) -> Result<(), Self::Error> { + // Strict temporal ordering of subscriptions is not enforced. + // Subscriptions are evaluated on polling, not when the message is published + self.subscriptions.insert(delivery_address); + Ok(()) + } +} diff --git a/extensions/components/src/lib.rs b/extensions/components/src/lib.rs new file mode 100644 index 0000000..d55c0f7 --- /dev/null +++ b/extensions/components/src/lib.rs @@ -0,0 +1,7 @@ +mod contact_registry; +mod delivery; +mod storage; + +pub use contact_registry::EphemeralRegistry; +pub use delivery::*; +pub use storage::*; diff --git a/extensions/components/src/storage.rs b/extensions/components/src/storage.rs new file mode 100644 index 0000000..36bbcbe --- /dev/null +++ b/extensions/components/src/storage.rs @@ -0,0 +1,3 @@ +mod in_memory_store; + +pub use in_memory_store::MemStore; diff --git a/extensions/components/src/storage/in_memory_store.rs b/extensions/components/src/storage/in_memory_store.rs new file mode 100644 index 0000000..fcbc8b1 --- /dev/null +++ b/extensions/components/src/storage/in_memory_store.rs @@ -0,0 +1,130 @@ +use std::collections::HashMap; + +use storage::{ + // TODO: (P4) Importable crates need to be prefixed with a project name to avoid conflicts + ConversationMeta, + ConversationStore, + EphemeralKeyStore, + IdentityStore, + RatchetStore, +}; + +/// An Test focused StorageService which holds data in a hashmap +pub struct MemStore { + convos: HashMap, +} + +impl MemStore { + pub fn new() -> Self { + Self { + convos: HashMap::new(), + } + } +} + +impl ConversationStore for MemStore { + fn save_conversation( + &mut self, + meta: &storage::ConversationMeta, + ) -> Result<(), storage::StorageError> { + self.convos + .insert(meta.local_convo_id.clone(), meta.clone()); + Ok(()) + } + + fn load_conversation( + &self, + local_convo_id: &str, + ) -> Result, storage::StorageError> { + let a = self.convos.get(local_convo_id).cloned(); + Ok(a) + } + + fn remove_conversation(&mut self, _local_convo_id: &str) -> Result<(), storage::StorageError> { + todo!() + } + + fn load_conversations(&self) -> Result, storage::StorageError> { + Ok(self.convos.values().cloned().collect()) + } + + fn has_conversation(&self, local_convo_id: &str) -> Result { + Ok(self.convos.contains_key(local_convo_id)) + } +} + +impl IdentityStore for MemStore { + fn load_identity(&self) -> Result, storage::StorageError> { + // todo!() + Ok(None) + } + + fn save_identity(&mut self, _identity: &crypto::Identity) -> Result<(), storage::StorageError> { + // todo!() + Ok(()) + } +} + +impl EphemeralKeyStore for MemStore { + fn save_ephemeral_key( + &mut self, + _public_key_hex: &str, + _private_key: &crypto::PrivateKey, + ) -> Result<(), storage::StorageError> { + todo!() + } + + fn load_ephemeral_key( + &self, + _public_key_hex: &str, + ) -> Result, storage::StorageError> { + todo!() + } + + fn remove_ephemeral_key(&mut self, _public_key_hex: &str) -> Result<(), storage::StorageError> { + todo!() + } +} + +impl RatchetStore for MemStore { + fn save_ratchet_state( + &mut self, + _conversation_id: &str, + _state: &storage::RatchetStateRecord, + _skipped_keys: &[storage::SkippedKeyRecord], + ) -> Result<(), storage::StorageError> { + todo!() + } + + fn load_ratchet_state( + &self, + _conversation_id: &str, + ) -> Result { + todo!() + } + + fn load_skipped_keys( + &self, + _conversation_id: &str, + ) -> Result, storage::StorageError> { + todo!() + } + + fn has_ratchet_state(&self, _conversation_id: &str) -> Result { + todo!() + } + + fn delete_ratchet_state( + &mut self, + _conversation_id: &str, + ) -> Result<(), storage::StorageError> { + todo!() + } + + fn cleanup_old_skipped_keys( + &mut self, + _max_age_secs: i64, + ) -> Result { + todo!() + } +} diff --git a/extensions/delivery/Cargo.toml b/extensions/delivery/Cargo.toml deleted file mode 100644 index f55ccf8..0000000 --- a/extensions/delivery/Cargo.toml +++ /dev/null @@ -1,6 +0,0 @@ -[package] -name = "delivery" -version = "0.1.0" -edition = "2024" - -[dependencies] diff --git a/extensions/delivery/src/lib.rs b/extensions/delivery/src/lib.rs deleted file mode 100644 index 2b14bd0..0000000 --- a/extensions/delivery/src/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod local_bcast; - -use local_bcast::LocalBroadcast; - -pub use LocalBroadcast; diff --git a/extensions/delivery/src/local_bcast.rs b/extensions/delivery/src/local_bcast.rs deleted file mode 100644 index 20b1335..0000000 --- a/extensions/delivery/src/local_bcast.rs +++ /dev/null @@ -1,58 +0,0 @@ -use libchat::DeliveryService; - -type Callback = Box)>; - -#[derive(Clone)] -struct LocalBroadcaster { - subscribers: Arc>>>, -} - -impl LocalBroadcaster { - pub fn new() -> Self { - Self { - subscribers: Arc::new(Mutex::new(HashMap::new())), - } - } -} - -impl DeliveryService for LocalBroadcaster { - type Error = String; - - fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error> { - let callbacks = self - .subscribers - .lock() - .unwrap() - .remove(&envelope.delivery_address) - .unwrap_or_default(); - - for cb in callbacks { - cb(envelope.delivery_address.clone(), &envelope.data); - } - - Ok(()) - } - - fn subscribe(&mut self, delivery_address: String, cb: F) -> Result<(), Self::Error> - where - F: FnOnce(String, &Vec) + 'static, - { - self.subscribers - .lock() - .unwrap() - .entry(delivery_address) - .or_default() - .push(Box::new(cb)); - - Ok(()) - } -} - -#[cfg(test)] -mod tests { - - #[test] - fn local_bcast() { - let ds = LocalBroadcast::new(); - } -}