diff --git a/core/conversations/src/causal_history.rs b/core/conversations/src/causal_history.rs index 331da7d..7afc7b1 100644 --- a/core/conversations/src/causal_history.rs +++ b/core/conversations/src/causal_history.rs @@ -20,7 +20,6 @@ use std::cell::RefCell; use std::collections::{HashMap, HashSet, VecDeque}; -use std::rc::Rc; use crate::proto::{Bytes, HistoryEntry, ReliablePayload}; use crate::utils::{blake2b_hex, hash_size}; @@ -109,11 +108,11 @@ struct Inner { /// instance. /// /// Convos are rebuilt from storage on every inbound message, so this state -/// cannot live on the convo struct — it is shared through `InboxV2`, the +/// cannot live on the convo struct — it is shared through `ServiceContext`, the /// same way the MLS provider is. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Default)] pub struct CausalHistoryStore { - inner: Rc>, + inner: RefCell, } impl CausalHistoryStore { diff --git a/core/conversations/src/context.rs b/core/conversations/src/context.rs deleted file mode 100644 index 17b95f3..0000000 --- a/core/conversations/src/context.rs +++ /dev/null @@ -1,339 +0,0 @@ -use std::cell::{Ref, RefMut}; -use std::{cell::RefCell, rc::Rc}; - -use crate::account::LogosAccount; -use crate::causal_history::MissingMessage; -use crate::conversation::{Convo, GroupConvo}; - -use crate::{DeliveryService, RegistrationService}; -use crate::{ - conversation::{Id, PrivateV1Convo}, - errors::ChatError, - inbox::Inbox, - inbox_v2::InboxV2, - outcomes::{ConvoOutcome, InboxOutcome, PayloadOutcome}, - proto::{EncryptedPayload, EnvelopeV1, Message}, - types::{AccountId, AddressedEnvelope}, -}; -use crypto::{Identity, PublicKey}; -use storage::{ChatStore, ConversationKind}; - -pub use crate::conversation::ConversationId; -pub use crate::inbox::Introduction; - -// This is the main entry point to the conversations api. -// Ctx manages lifetimes of objects to process and generate payloads. -pub struct Context { - identity: Rc, - _account: Rc>, - ds: Rc>, - store: Rc>, - inbox: Inbox, - pq_inbox: InboxV2, -} - -impl Context -where - DS: DeliveryService + 'static, - RS: RegistrationService + 'static, - CS: ChatStore + 'static, -{ - /// Opens or creates a Context with the given storage configuration. - /// - /// If an identity exists in storage, it will be restored. - /// Otherwise, a new identity will be created with the given name and saved. - pub fn new_from_store( - name: impl Into, - delivery: DS, - registration: RS, - store: CS, - ) -> Result { - let name = name.into(); - - // Services for sharing with Converastions/Inboxes - let account = Rc::new(RefCell::new(LogosAccount::new_test(name.to_string()))); - let ds = Rc::new(RefCell::new(delivery)); - let contact_registry = Rc::new(RefCell::new(registration)); - let store = Rc::new(RefCell::new(store)); - - // Load or create identity - let identity = if let Some(identity) = store.borrow().load_identity()? { - identity - } else { - let identity = Identity::new(&name); - store.borrow_mut().save_identity(&identity)?; - identity - }; - - let identity = Rc::new(identity); - let inbox = Inbox::new(Rc::clone(&store), Rc::clone(&identity)); - - let pq_inbox = InboxV2::new( - LogosAccount::new_test(name), - ds.clone(), - contact_registry.clone(), - store.clone(), - ); - - // Subscribe - ds.borrow_mut() - .subscribe(&pq_inbox.delivery_address()) - .map_err(ChatError::generic)?; - - Ok(Self { - identity, - _account: account, - ds, - store, - inbox, - pq_inbox, - }) - } - - /// Creates a new in-memory Context (for testing). - /// - /// Uses in-memory SQLite database. Each call creates a new isolated database. - pub fn new_with_name( - name: impl Into, - delivery: DS, - registration: RS, - chat_store: CS, - ) -> Result { - let name = name.into(); - let identity = Identity::new(&name); - - // Services for sharing with Converastions/Inboxes - let account = Rc::new(RefCell::new(LogosAccount::new_test(name.to_string()))); - let ds = Rc::new(RefCell::new(delivery)); - let contact_registry = Rc::new(RefCell::new(registration)); - let store = Rc::new(RefCell::new(chat_store)); - - store - .borrow_mut() - .save_identity(&identity) - .expect("in-memory storage should not fail"); - - let identity = Rc::new(identity); - let inbox = Inbox::new(store.clone(), Rc::clone(&identity)); - let mut pq_inbox = InboxV2::new( - LogosAccount::new_test(name), - ds.clone(), - contact_registry.clone(), - store.clone(), - ); - - // TODO: (P2) Initialize Account in Context or upper client. - pq_inbox.register()?; - - ds.borrow_mut() - .subscribe(&pq_inbox.delivery_address()) - .map_err(ChatError::generic)?; - - Ok(Self { - identity, - _account: account, - ds, - store, - pq_inbox, - inbox, - }) - } - - pub fn ds(&self) -> RefMut<'_, DS> { - self.ds.borrow_mut() - } - - pub fn store(&self) -> Ref<'_, CS> { - self.store.borrow() - } - - pub fn identity(&self) -> &Identity { - &self.identity - } - - /// Returns the unique identifier associated with the account - pub fn account_id(&self) -> &AccountId { - self.pq_inbox.account_id() - } - - pub fn installation_name(&self) -> &str { - self.identity.get_name() - } - - pub fn installation_key(&self) -> PublicKey { - self.identity.public_key() - } - - /// Submit the local account's MLS KeyPackage to the registration service. - /// Idempotent on the server side (registries that retain history will keep - /// the most recent N submissions; older entries are pruned). - pub fn register_keypackage(&mut self) -> Result<(), ChatError> { - self.pq_inbox.register() - } - - pub fn create_private_convo( - &mut self, - remote_bundle: &Introduction, - content: &[u8], - ) -> Result<(ConversationId, Vec), ChatError> { - let (mut convo, payloads) = self - .inbox - .invite_to_private_convo(remote_bundle, content, Rc::clone(&self.store)) - .unwrap_or_else(|_| todo!("Log/Surface Error")); - - let remote_id = Inbox::::inbox_identifier_for_key(*remote_bundle.installation_key()); - let payload_bytes = payloads - .into_iter() - .map(|p| p.into_envelope(remote_id.clone())) - .collect(); - - let convo_id = convo.persist()?; - Ok((convo_id, payload_bytes)) - } - - pub fn create_group_convo( - &mut self, - participants: &[&AccountId], - ) -> Result>, ChatError> { - // TODO: (P1) Ensure errors are handled propertly. This is a high chance for desynchronized state. - // MlsGroup persistence, conversation persistence, and invite delivery all happen seperately - let mut convo = self.pq_inbox.create_group_v1()?; - self.store - .borrow_mut() - .save_conversation(&storage::ConversationMeta { - local_convo_id: convo.id().to_string(), - remote_convo_id: "0".into(), - kind: ConversationKind::GroupV1, - })?; - convo.add_member(participants)?; - - Ok(Box::new(convo)) - } - - pub fn list_conversations(&self) -> Result, ChatError> { - let records = self.store.borrow().load_conversations()?; - Ok(records.into_iter().map(|r| r.local_convo_id).collect()) - } - - pub fn take_missing_messages(&self) -> Vec { - self.pq_inbox.take_missing_messages() - } - - pub fn send_content( - &mut self, - convo_id: &str, - content: &[u8], - ) -> Result, ChatError> { - let mut convo = self.load_convo(convo_id)?; - let payloads = convo.send_message(content)?; - let remote_id = convo.remote_id(); - Ok(payloads - .into_iter() - .map(|p| p.into_envelope(remote_id.clone())) - .collect()) - } - - // Decode bytes and send to protocol for processing. - pub fn handle_payload(&mut self, payload: &[u8]) -> Result { - let env = EnvelopeV1::decode(payload)?; - - // TODO: Impl Conversation hinting - let convo_id = env.conversation_hint; - - match convo_id { - c if c == self.inbox.id() => self.dispatch_to_inbox(&env.payload).map(Into::into), - c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload).map(Into::into), - c if self.store.borrow().has_conversation(&c)? => { - self.dispatch_to_convo(&c, &env.payload).map(Into::into) - } - _ => Ok(PayloadOutcome::Empty), - } - } - - // Dispatch encrypted payload to Inbox. The Inbox persists the newly - // created conversation and consumes the ephemeral key internally. - fn dispatch_to_inbox(&mut self, enc_payload_bytes: &[u8]) -> Result { - // EncryptedPayloads are not used by GroupConvos at this time, else this can be performed in `handle_payload` - // TODO: (P1) reconcile envelope parsing between Covno and GroupConvo - let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; - let public_key_hex = Inbox::::extract_ephemeral_key_hex(&enc_payload)?; - self.inbox - .handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store)) - } - - // Dispatch encrypted payload to the post-quantum inbox. - fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result { - self.pq_inbox.handle_frame(payload) - } - - // Dispatch encrypted payload to its corresponding conversation - fn dispatch_to_convo( - &mut self, - convo_id: &str, - enc_payload_bytes: &[u8], - ) -> Result { - let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; - let mut convo = self.load_convo(convo_id)?; - convo.handle_frame(enc_payload) - } - - pub fn create_intro_bundle(&mut self) -> Result, ChatError> { - let intro = self.inbox.create_intro_bundle()?; - Ok(intro.into()) - } - - pub fn get_convo(&mut self, convo_id: &str) -> Result>, ChatError> { - self.load_group_convo(convo_id) - } - - /// Loads a conversation from DB by constructing it from metadata. - fn load_convo(&mut self, convo_id: &str) -> Result, ChatError> { - let record = self - .store - .borrow() - .load_conversation(convo_id)? - .ok_or_else(|| ChatError::NoConvo(convo_id.into()))?; - - match record.kind { - ConversationKind::PrivateV1 => { - let private_convo = PrivateV1Convo::new( - self.store.clone(), - record.local_convo_id, - record.remote_convo_id, - )?; - Ok(Box::new(private_convo)) - } - ConversationKind::GroupV1 => Ok(Box::new( - self.pq_inbox.load_mls_convo(record.local_convo_id)?, - )), - ConversationKind::Unknown(_) => Err(ChatError::BadBundleValue(format!( - "unsupported conversation type: {}", - record.kind.as_str() - ))), - } - } - - fn load_group_convo( - &mut self, - convo_id: &str, - ) -> Result>, ChatError> { - let record = self - .store - .borrow() - .load_conversation(convo_id)? - .ok_or_else(|| ChatError::NoConvo(convo_id.into()))?; - - match record.kind { - ConversationKind::PrivateV1 => { - Err(ChatError::NoConvo("This is not a group convo".into())) - } - ConversationKind::GroupV1 => Ok(Box::new( - self.pq_inbox.load_mls_convo(record.local_convo_id)?, - )), - ConversationKind::Unknown(_) => Err(ChatError::BadBundleValue(format!( - "unsupported conversation type: {}", - record.kind.as_str() - ))), - } - } -} diff --git a/core/conversations/src/conversation.rs b/core/conversations/src/conversation.rs index 83a869c..a6cd5d0 100644 --- a/core/conversations/src/conversation.rs +++ b/core/conversations/src/conversation.rs @@ -1,15 +1,10 @@ pub mod group_v1; mod privatev1; -use crate::{ - DeliveryService, - outcomes::ConvoOutcome, - service_traits::KeyPackageProvider, - types::{AccountId, AddressedEncryptedPayload}, -}; -use chat_proto::logoschat::encryption::EncryptedPayload; -use std::fmt::Debug; -use storage::ConversationKind; +use crate::outcomes::ConvoOutcome; +use crate::proto::EncryptedPayload; +use crate::service_context::{ExternalServices, ServiceContext}; +use crate::types::AccountId; pub use crate::errors::ChatError; pub use group_v1::GroupV1Convo; @@ -17,31 +12,28 @@ pub use privatev1::PrivateV1Convo; pub type ConversationId = String; -pub trait Id: Debug { - fn id(&self) -> &str; -} - -pub trait Convo: Id + Debug { - fn send_message(&mut self, content: &[u8]) - -> Result, ChatError>; +/// Behaviour shared by every conversation kind. +pub(crate) trait Convo { + fn send_content(&mut self, cx: &mut ServiceContext, content: &[u8]) + -> Result<(), ChatError>; /// Decrypts and processes an incoming encrypted frame. /// /// Returns the [`ConvoOutcome`] describing what the frame produced; its /// `content` is `None` for protocol-only frames (placeholders, MLS /// commits). Errors only on decryption or frame-parsing failure. - fn handle_frame(&mut self, enc_payload: EncryptedPayload) -> Result; - - fn remote_id(&self) -> String; - - /// Returns the conversation type identifier for storage. - fn convo_type(&self) -> ConversationKind; + fn handle_frame( + &mut self, + cx: &mut ServiceContext, + enc: EncryptedPayload, + ) -> Result; } -pub trait GroupConvo: Convo { - fn add_member(&mut self, members: &[&AccountId]) -> Result<(), ChatError>; - - // This is intended to replace `send_message`. The trait change is that it automatically - // sends the payload directly. - fn send_content(&mut self, content: &[u8]) -> Result<(), ChatError>; +/// Group-only operations. +pub(crate) trait GroupConvo: Convo { + fn add_member( + &mut self, + cx: &mut ServiceContext, + members: &[&AccountId], + ) -> Result<(), ChatError>; } diff --git a/core/conversations/src/conversation/group_v1.rs b/core/conversations/src/conversation/group_v1.rs index 7068024..7b06bbb 100644 --- a/core/conversations/src/conversation/group_v1.rs +++ b/core/conversations/src/conversation/group_v1.rs @@ -2,153 +2,99 @@ /// Properties: /// - Harvest Now Decrypt Later (HNDL) protection provided by XWING /// - Multiple -use std::cell::RefCell; -use std::rc::Rc; - use blake2::{Blake2b, Digest, digest::consts::U6}; use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload}; use chat_proto::logoschat::reliability::ReliablePayload; use openmls::prelude::tls_codec::Deserialize; use openmls::prelude::*; -use prost::Message; -use storage::ConversationKind; +use prost::Message as _; -use crate::IdentityProvider; -use crate::causal_history::CausalHistoryStore; -use crate::inbox_v2::{MlsIdentityProvider, MlsProvider}; +use crate::inbox_v2::MlsProvider; +use crate::service_context::{ExternalServices, ServiceContext}; use crate::types::AccountId; use crate::{ - DeliveryService, - conversation::{ChatError, Convo, GroupConvo, Id}, + DeliveryService, IdentityProvider, + conversation::{ChatError, Convo, GroupConvo}, outcomes::{Content, ConvoOutcome}, service_traits::KeyPackageProvider, types::AddressedEncryptedPayload, }; -pub struct GroupV1Convo { - identity_provider: Rc>>, - mls_provider: Rc>, - ds: Rc>, - keypkg_provider: Rc>, +pub struct GroupV1Convo { mls_group: MlsGroup, convo_id: String, - causal: CausalHistoryStore, } -impl std::fmt::Debug for GroupV1Convo -where - IP: IdentityProvider, - MP: MlsProvider, - DS: DeliveryService, - KP: KeyPackageProvider, -{ +impl std::fmt::Debug for GroupV1Convo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("GroupV1Convo") - .field("name", &self.identity_provider.borrow().display_name()) .field("convo_id", &self.convo_id) .field("mls_epoch", &self.mls_group.epoch()) .finish_non_exhaustive() } } -impl GroupV1Convo -where - IP: IdentityProvider, - MP: MlsProvider, - DS: DeliveryService, - KP: KeyPackageProvider, -{ +impl GroupV1Convo { // Create a new conversation with the creator as the only participant. - pub fn new( - identity_provider: Rc>>, - mls_provider: Rc>, - ds: Rc>, - keypkg_provider: Rc>, - causal: CausalHistoryStore, - ) -> Result { + pub fn new(cx: &mut ServiceContext) -> Result { let config = Self::mls_create_config(); - let mls_group = { - let mls_provider_ref = mls_provider.borrow(); - let signer = identity_provider.borrow(); - let credential = signer.get_credential(); - - MlsGroup::new(&*mls_provider_ref, &*signer, &config, credential).unwrap() - }; + let mls_group = MlsGroup::new( + &cx.mls_provider, + &cx.mls_identity, + &config, + cx.mls_identity.get_credential(), + ) + .unwrap(); let convo_id = hex::encode(mls_group.group_id().as_slice()); - Self::subscribe(&mut ds.borrow_mut(), &convo_id)?; + Self::subscribe(&mut cx.ds, &convo_id)?; Ok(Self { - identity_provider, - mls_provider, - ds, - keypkg_provider, mls_group, convo_id, - causal, }) } // Constructs a new conversation upon receiving a MlsWelcome message. - pub fn new_from_welcome( - identity_provider: Rc>>, - mls_provider: Rc>, - ds: Rc>, - keypkg_provider: Rc>, - causal: CausalHistoryStore, + pub fn new_from_welcome( + cx: &mut ServiceContext, welcome: Welcome, ) -> Result { - let mls_group = { - let provider = &*mls_provider.borrow(); - StagedWelcome::build_from_welcome(provider, &Self::mls_join_config(), welcome) + let mls_group = + StagedWelcome::build_from_welcome(&cx.mls_provider, &Self::mls_join_config(), welcome) .unwrap() .build() .unwrap() - .into_group(provider) - .unwrap() - }; + .into_group(&cx.mls_provider) + .unwrap(); let convo_id = hex::encode(mls_group.group_id().as_slice()); - Self::subscribe(&mut *ds.borrow_mut(), &convo_id)?; + Self::subscribe(&mut cx.ds, &convo_id)?; Ok(Self { - identity_provider, - mls_provider, - ds, - keypkg_provider, mls_group, convo_id, - causal, }) } - pub fn load( - identity_provider: Rc>>, - mls_provider: Rc>, - ds: Rc>, - keypkg_provider: Rc>, - causal: CausalHistoryStore, + pub fn load( + cx: &mut ServiceContext, convo_id: String, group_id: GroupId, ) -> Result { - let mls_group = MlsGroup::load(mls_provider.borrow().storage(), &group_id) + let mls_group = MlsGroup::load(cx.mls_provider.storage(), &group_id) .map_err(ChatError::generic)? .ok_or_else(|| ChatError::NoConvo("mls group not found".into()))?; - Self::subscribe(&mut *ds.borrow_mut(), &convo_id)?; + Self::subscribe(&mut cx.ds, &convo_id)?; Ok(GroupV1Convo { - identity_provider, - mls_provider, - ds, - keypkg_provider, mls_group, convo_id, - causal, }) } // Configure the delivery service to listen for the required delivery addresses. - fn subscribe(ds: &mut DS, convo_id: &str) -> Result<(), ChatError> { + fn subscribe(ds: &mut impl DeliveryService, convo_id: &str) -> Result<(), ChatError> { ds.subscribe(&Self::delivery_address_from_id(convo_id)) .map_err(ChatError::generic)?; ds.subscribe(&Self::ctrl_delivery_address_from_id(convo_id)) @@ -192,66 +138,46 @@ where Self::ctrl_delivery_address_from_id(&self.convo_id) } - fn key_package_for_account(&self, ident: &AccountId) -> Result { + fn key_package_for_account( + &self, + ident: &AccountId, + provider: &impl MlsProvider, + keypkg_provider: &impl KeyPackageProvider, + ) -> Result { // INTERIM: the key package registry is keyed by `DeviceId`, but resolving an // `AccountId` to its device(s) is a future task. For now (single device // per account) we use the account-id string directly as the device id. // When account->device resolution lands, only this conversion changes. let device_id = ident.to_string(); - let retrieved_bytes = self - .keypkg_provider - .borrow() + let retrieved_bytes = keypkg_provider .retrieve(&device_id) - .map_err(|e: KP::Error| ChatError::Generic(e.to_string()))?; + .map_err(|e| ChatError::Generic(e.to_string()))?; - // dbg!(ctx.contact_registry()); let Some(keypkg_bytes) = retrieved_bytes else { return Err(ChatError::Protocol("Contact Not Found".into())); }; let key_package_in = KeyPackageIn::tls_deserialize(&mut keypkg_bytes.as_slice())?; - let keypkg = - key_package_in.validate(self.mls_provider.borrow().crypto(), ProtocolVersion::Mls10)?; //TODO: P3 - Hardcoded Protocol Version - Ok(keypkg) + let key_package = key_package_in.validate(provider.crypto(), ProtocolVersion::Mls10)?; //TODO: P3 - Hardcoded Protocol Version + Ok(key_package) } -} -impl Id for GroupV1Convo -where - IP: IdentityProvider, - MP: MlsProvider, - DS: DeliveryService, - KP: KeyPackageProvider, -{ - fn id(&self) -> &str { + pub fn id(&self) -> &str { &self.convo_id } -} -impl Convo for GroupV1Convo -where - IP: IdentityProvider, - MP: MlsProvider, - DS: DeliveryService, - KP: KeyPackageProvider, -{ - fn send_message( + fn send_message( &mut self, content: &[u8], + cx: &ServiceContext, ) -> Result, ChatError> { - let sender_id = self.identity_provider.borrow(); - let reliable = - self.causal - .on_send(&self.convo_id, sender_id.account_id().as_str(), content); + let sender_id = cx.mls_identity.account_id().as_str(); + let reliable = cx.causal.on_send(&self.convo_id, sender_id, content); let wire = reliable.encode_to_vec(); let mls_message_out = self .mls_group - .create_message( - &*self.mls_provider.borrow(), - &*self.identity_provider.borrow(), - &wire, - ) + .create_message(&cx.mls_provider, &cx.mls_identity, &wire) .unwrap(); let a = AddressedEncryptedPayload { @@ -265,9 +191,26 @@ where Ok(vec![a]) } +} + +impl Convo for GroupV1Convo { + fn send_content( + &mut self, + cx: &mut ServiceContext, + content: &[u8], + ) -> Result<(), ChatError> { + let payloads = self.send_message(content, cx)?; + for payload in payloads { + cx.ds + .publish(payload.into_envelope(self.id().into())) + .map_err(|e| ChatError::Delivery(e.to_string()))?; + } + Ok(()) + } fn handle_frame( &mut self, + cx: &mut ServiceContext, encoded_payload: EncryptedPayload, ) -> Result { let bytes = match encoded_payload.encryption { @@ -287,8 +230,6 @@ where .try_into_protocol_message() .map_err(ChatError::generic)?; - let provider = &*self.mls_provider.borrow(); - if protocol_message.epoch() < self.mls_group.epoch() { // TODO: (P1) Add logging for messages arriving from past epoch. return Ok(ConvoOutcome::empty(self.id().to_string())); @@ -296,20 +237,20 @@ where let processed = self .mls_group - .process_message(provider, protocol_message) + .process_message(&cx.mls_provider, protocol_message) .map_err(ChatError::generic)?; let content = match processed.into_content() { ProcessedMessageContent::ApplicationMessage(msg) => { let reliable = ReliablePayload::decode(msg.into_bytes().as_slice())?; - self.causal.on_receive(&self.convo_id, &reliable); + cx.causal.on_receive(&self.convo_id, &reliable); Some(Content { bytes: reliable.content.to_vec(), }) } ProcessedMessageContent::StagedCommitMessage(commit) => { self.mls_group - .merge_staged_commit(provider, *commit) + .merge_staged_commit(&cx.mls_provider, *commit) .map_err(ChatError::generic)?; None } @@ -323,32 +264,18 @@ where content, }) } - - fn remote_id(&self) -> String { - // "group_remote_id".into() - todo!() - } - - fn convo_type(&self) -> storage::ConversationKind { - ConversationKind::GroupV1 - } } -impl GroupConvo for GroupV1Convo -where - IP: IdentityProvider, - MP: MlsProvider, - DS: DeliveryService, - KP: KeyPackageProvider, -{ +impl GroupConvo for GroupV1Convo { // add_members returns: // commit — the Commit message Alice broadcasts to all members // welcome — the Welcome message sent privately to each new joiner // _group_info — used for external joins; ignore for now - fn add_member(&mut self, members: &[&AccountId]) -> Result<(), ChatError> { - let identity_provider = &*self.identity_provider.borrow(); - let mls_provider = &*self.mls_provider.borrow(); - + fn add_member( + &mut self, + cx: &mut ServiceContext, + members: &[&AccountId], + ) -> Result<(), ChatError> { if members.len() > 50 { // This is a temporary limit that originates from the the De-MLS epoch time. return Err(ChatError::Protocol( @@ -360,23 +287,26 @@ where // The account_id is kept so invites can be addressed properly let keypkgs = members .iter() - .map(|ident| self.key_package_for_account(ident)) + .map(|ident| self.key_package_for_account(ident, &cx.mls_provider, &cx.registry)) .collect::, ChatError>>()?; let (commit, welcome, _group_info) = self .mls_group - .add_members(mls_provider, identity_provider, keypkgs.iter().as_slice()) + .add_members( + &cx.mls_provider, + &cx.mls_identity, + keypkgs.iter().as_slice(), + ) .unwrap(); - self.mls_group.merge_pending_commit(mls_provider).unwrap(); + self.mls_group + .merge_pending_commit(&cx.mls_provider) + .unwrap(); // TODO: (P3) Evaluate privacy/performance implications of an aggregated Welcome for multiple users for account_id in members { - self.mls_provider.borrow().invite_user( - &mut *self.ds.borrow_mut(), - account_id, - &welcome, - )?; + cx.mls_provider + .invite_user(&mut cx.ds, account_id, &welcome)?; } let encrypted_payload = EncryptedPayload { @@ -393,20 +323,8 @@ where // TODO: (P1) Make GroupConvos agnostic to framing so its less error prone and more let env = addr_enc_payload.into_envelope(self.convo_id.clone()); - self.ds - .borrow_mut() + cx.ds .publish(env) .map_err(|e| ChatError::Generic(format!("Publish: {e}"))) } - - fn send_content(&mut self, content: &[u8]) -> Result<(), ChatError> { - let payloads = self.send_message(content)?; - for payload in payloads { - self.ds - .borrow_mut() - .publish(payload.into_envelope(self.id().into())) - .map_err(|e| ChatError::Delivery(e.to_string()))?; - } - Ok(()) - } } diff --git a/core/conversations/src/conversation/privatev1.rs b/core/conversations/src/conversation/privatev1.rs index f631df6..f946adb 100644 --- a/core/conversations/src/conversation/privatev1.rs +++ b/core/conversations/src/conversation/privatev1.rs @@ -9,14 +9,17 @@ use chat_proto::logoschat::{ use crypto::{PrivateKey, PublicKey, SymmetricKey32}; use double_ratchets::{Header, InstallationKeyPair, RatchetState, restore_ratchet_state}; use prost::{Message as _, bytes::Bytes}; -use std::{cell::RefCell, fmt::Debug, rc::Rc}; +use std::fmt::Debug; use storage::{ConversationKind, ConversationMeta, ConversationStore}; use crate::{ - conversation::{ChatError, ConversationId, Convo, Id}, + DeliveryService, + conversation::{ChatError, ConversationId, Convo}, errors::EncryptionError, + inbox::PRIVATE_V1_INBOX_ADDRESS, outcomes::{Content, ConvoOutcome}, proto, + service_context::{ExternalServices, ServiceContext}, types::AddressedEncryptedPayload, utils::timestamp_millis, }; @@ -56,37 +59,31 @@ impl BaseConvoId { } } -pub struct PrivateV1Convo { +pub struct PrivateV1Convo { local_convo_id: String, remote_convo_id: String, dr_state: RatchetState, - store: Rc>, } -impl PrivateV1Convo { +impl PrivateV1Convo { /// Reconstructs a PrivateV1Convo from persisted metadata and ratchet state. - pub fn new( - store: Rc>, + pub fn new( + store: &S, local_convo_id: String, remote_convo_id: String, ) -> Result { - let dr_record = store.borrow().load_ratchet_state(&local_convo_id)?; - let skipped_keys = store.borrow().load_skipped_keys(&local_convo_id)?; + let dr_record = store.load_ratchet_state(&local_convo_id)?; + let skipped_keys = store.load_skipped_keys(&local_convo_id)?; let dr_state: RatchetState = restore_ratchet_state(dr_record, skipped_keys); Ok(Self { local_convo_id, remote_convo_id, dr_state, - store, }) } - pub fn new_initiator( - store: Rc>, - seed_key: SymmetricKey32, - remote: PublicKey, - ) -> Self { + pub fn new_initiator(seed_key: SymmetricKey32, remote: PublicKey) -> Self { let base_convo_id = BaseConvoId::new(&seed_key); let local_convo_id = base_convo_id.id_for_participant(Role::Initiator); let remote_convo_id = base_convo_id.id_for_participant(Role::Responder); @@ -101,15 +98,10 @@ impl PrivateV1Convo { local_convo_id, remote_convo_id, dr_state, - store, } } - pub fn new_responder( - store: Rc>, - seed_key: SymmetricKey32, - dh_self: &PrivateKey, - ) -> Self { + pub fn new_responder(seed_key: SymmetricKey32, dh_self: &PrivateKey) -> Self { let base_convo_id = BaseConvoId::new(&seed_key); let local_convo_id = base_convo_id.id_for_participant(Role::Responder); let remote_convo_id = base_convo_id.id_for_participant(Role::Initiator); @@ -125,7 +117,6 @@ impl PrivateV1Convo { local_convo_id, remote_convo_id, dr_state, - store, } } @@ -182,14 +173,17 @@ impl PrivateV1Convo { } /// Persists a conversation's metadata and ratchet state to DB. - pub fn persist(&mut self) -> Result { + pub fn persist( + &mut self, + store: &mut S, + ) -> Result { let convo_info = ConversationMeta { local_convo_id: self.id().to_string(), remote_convo_id: self.remote_id(), kind: self.convo_type(), }; - self.store.borrow_mut().save_conversation(&convo_info)?; - self.save_ratchet_state(&mut *self.store.borrow_mut())?; + store.save_conversation(&convo_info)?; + self.save_ratchet_state(store)?; Ok(self.id().to_string()) } @@ -205,18 +199,15 @@ impl PrivateV1Convo { bytes: bytes.into(), } } -} -impl Id for PrivateV1Convo { - fn id(&self) -> &str { + pub fn id(&self) -> &str { &self.local_convo_id } -} -impl Convo for PrivateV1Convo { - fn send_message( + pub fn encrypt_content( &mut self, content: &[u8], + store: &mut S, ) -> Result, ChatError> { let frame = PrivateV1Frame { conversation_id: self.id().into(), @@ -227,28 +218,53 @@ impl Convo for PrivateV1Convo { let data = self.encrypt(frame); - self.save_ratchet_state::(&mut *self.store.borrow_mut())?; + self.save_ratchet_state(store)?; Ok(vec![AddressedEncryptedPayload { - delivery_address: "delivery_address".into(), + delivery_address: PRIVATE_V1_INBOX_ADDRESS.into(), data, }]) } + pub fn remote_id(&self) -> String { + self.remote_convo_id.clone() + } + + pub fn convo_type(&self) -> ConversationKind { + ConversationKind::PrivateV1 + } +} + +impl Convo for PrivateV1Convo { + fn send_content( + &mut self, + cx: &mut ServiceContext, + content: &[u8], + ) -> Result<(), ChatError> { + let payloads = self.encrypt_content(content, &mut cx.store)?; + let remote_id = self.remote_id(); + for payload in payloads { + cx.ds + .publish(payload.into_envelope(remote_id.clone())) + .map_err(|e| ChatError::Delivery(e.to_string()))?; + } + Ok(()) + } + fn handle_frame( &mut self, - encoded_payload: EncryptedPayload, + cx: &mut ServiceContext, + enc: EncryptedPayload, ) -> Result { - // Extract expected frame let frame = self - .decrypt(encoded_payload) + .decrypt(enc) .map_err(|_| ChatError::Protocol("decryption".into()))?; let Some(frame_type) = frame.frame_type else { return Err(ChatError::ProtocolExpectation("None", "Some".into())); }; - self.save_ratchet_state(&mut *self.store.borrow_mut())?; + self.save_ratchet_state(&mut cx.store)?; let content = match frame_type { FrameType::Content(bytes) => Some(self.handle_content(bytes)), @@ -259,17 +275,9 @@ impl Convo for PrivateV1Convo { content, }) } - - fn remote_id(&self) -> String { - self.remote_convo_id.clone() - } - - fn convo_type(&self) -> ConversationKind { - ConversationKind::PrivateV1 - } } -impl Debug for PrivateV1Convo { +impl Debug for PrivateV1Convo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PrivateV1Convo") .field("dr_state", &"******") @@ -279,7 +287,6 @@ impl Debug for PrivateV1Convo { #[cfg(test)] mod tests { - use chat_sqlite::{ChatStorage, StorageConfig}; use crypto::PrivateKey; use super::*; @@ -289,22 +296,14 @@ mod tests { let saro = PrivateKey::random(); let raya = PrivateKey::random(); - let saro_storage = Rc::new(RefCell::new( - ChatStorage::new(StorageConfig::InMemory).unwrap(), - )); - - let raya_storage = Rc::new(RefCell::new( - ChatStorage::new(StorageConfig::InMemory).unwrap(), - )); - let pub_raya = PublicKey::from(&raya); let seed_key = saro.diffie_hellman(&pub_raya).DANGER_to_bytes(); let seed_key_saro = SymmetricKey32::from(seed_key); let seed_key_raya = SymmetricKey32::from(seed_key); let send_content_bytes = vec![0, 2, 4, 6, 8]; - let mut sr_convo = PrivateV1Convo::new_initiator(saro_storage, seed_key_saro, pub_raya); - let mut rs_convo = PrivateV1Convo::new_responder(raya_storage, seed_key_raya, &raya); + let mut sr_convo = PrivateV1Convo::new_initiator(seed_key_saro, pub_raya); + let mut rs_convo = PrivateV1Convo::new_responder(seed_key_raya, &raya); let send_frame = PrivateV1Frame { conversation_id: "_".into(), diff --git a/core/conversations/src/core.rs b/core/conversations/src/core.rs new file mode 100644 index 0000000..b3a987e --- /dev/null +++ b/core/conversations/src/core.rs @@ -0,0 +1,321 @@ +use crate::account::LogosAccount; +use crate::causal_history::{CausalHistoryStore, MissingMessage}; +use crate::service_context::{ExternalServices, ServiceContext}; +use crate::{DeliveryService, IdentityProvider, RegistrationService}; +use crate::{ + conversation::{Convo, GroupConvo, GroupV1Convo, PrivateV1Convo}, + errors::ChatError, + inbox::Inbox, + inbox_v2::{InboxV2, MlsEphemeralPqProvider, MlsIdentityProvider}, + outcomes::{ConvoOutcome, InboxOutcome, PayloadOutcome}, + proto::{EncryptedPayload, EnvelopeV1, Message}, + types::AccountId, +}; +use crypto::{Identity, PublicKey}; +use openmls::prelude::GroupId; +use storage::{ChatStore, ConversationKind, ConversationStore}; + +pub use crate::conversation::ConversationId; +pub use crate::inbox::Introduction; + +// This is the main entry point to the conversations api. +// `Core` manages lifetimes of objects to process and generate payloads. +// +// Fully synchronous and single-threaded: it owns its services outright (no +// interior mutability, no shared ownership) and drives the inbox/conversation +// primitives with plain `&mut self`. +pub struct Core { + services: ServiceContext, + inbox: Inbox, + pq_inbox: InboxV2, +} + +// Constructors live on the `(DS, RS, CS)` form: `S` can't be inferred backwards +// through `S::DS`, so the bundle is built from the three args here. +impl Core<(DS, RS, CS)> +where + DS: DeliveryService + 'static, + RS: RegistrationService + 'static, + CS: ChatStore + 'static, +{ + /// Opens or creates a `Core` with the given storage configuration. + /// + /// If an identity exists in storage, it will be restored. + /// Otherwise, a new identity will be created with the given name and saved. + pub fn new_from_store( + name: impl Into, + delivery: DS, + registration: RS, + mut store: CS, + ) -> Result { + let name = name.into(); + + // Load or create identity + let identity = if let Some(identity) = store.load_identity()? { + identity + } else { + let identity = Identity::new(&name); + store.save_identity(&identity)?; + identity + }; + + Self::assemble(name, identity, delivery, registration, store) + } + + /// Creates a new in-memory `Core` (for testing). + /// + /// Uses in-memory SQLite database. Each call creates a new isolated database. + pub fn new_with_name( + name: impl Into, + delivery: DS, + registration: RS, + mut store: CS, + ) -> Result { + let name = name.into(); + let identity = Identity::new(&name); + store + .save_identity(&identity) + .expect("in-memory storage should not fail"); + + let mut core = Self::assemble(name, identity, delivery, registration, store)?; + // TODO: (P2) Initialize Account in Core or upper client. + core.register_keypackage()?; + Ok(core) + } + + /// Builds the inbox/account/MLS/causal state, subscribes both inbound + /// addresses, and assembles the service bundle — shared by both constructors. + fn assemble( + name: String, + identity: Identity, + mut delivery: DS, + registration: RS, + store: CS, + ) -> Result { + let inbox = Inbox::new(&identity); + let account = LogosAccount::new_test(name); + let account_id = account.account_id().clone(); + let mls_identity = MlsIdentityProvider::new(account); + let mls_provider = MlsEphemeralPqProvider::new().map_err(ChatError::generic)?; + let causal = CausalHistoryStore::new(); + let pq_inbox = InboxV2::new(account_id); + + // Subscribe to inbound addresses for both conversation stacks. + delivery + .subscribe(inbox.delivery_address()) + .map_err(ChatError::generic)?; + delivery + .subscribe(&pq_inbox.delivery_address()) + .map_err(ChatError::generic)?; + + Ok(Self { + services: ServiceContext { + ds: delivery, + registry: registration, + store, + mls_identity, + mls_provider, + causal, + identity, + }, + inbox, + pq_inbox, + }) + } +} + +impl Core { + pub fn ds(&mut self) -> &mut S::DS { + &mut self.services.ds + } + + pub fn store(&self) -> &S::CS { + &self.services.store + } + + pub fn identity(&self) -> &Identity { + &self.services.identity + } + + /// Returns the unique identifier associated with the account + pub fn account_id(&self) -> &AccountId { + self.pq_inbox.account_id() + } + + /// Submit the local account's MLS KeyPackage to the registration service. + /// Idempotent on the server side (registries that retain history will keep + /// the most recent N submissions; older entries are pruned). + pub fn register_keypackage(&mut self) -> Result<(), ChatError> { + self.pq_inbox.register(&mut self.services) + } + + pub fn installation_name(&self) -> &str { + self.services.identity.get_name() + } + + pub fn installation_key(&self) -> PublicKey { + self.services.identity.public_key() + } + + pub fn create_private_convo( + &mut self, + remote_bundle: &Introduction, + content: &[u8], + ) -> Result { + let (mut convo, payloads) = + self.inbox + .invite_to_private_convo(&mut self.services, remote_bundle, content)?; + + let remote_id = Inbox::inbox_identifier_for_key(*remote_bundle.installation_key()); + let convo_id = convo.persist(&mut self.services.store)?; + for payload in payloads { + self.services + .ds + .publish(payload.into_envelope(remote_id.clone())) + .map_err(|e| ChatError::Delivery(e.to_string()))?; + } + Ok(convo_id) + } + + pub fn create_group_convo( + &mut self, + participants: &[&AccountId], + ) -> Result { + // TODO: (P1) Ensure errors are handled properly. This is a high chance for + // desynchronized state: MlsGroup persistence, conversation persistence, and + // invite delivery all happen separately. + let mut convo = GroupV1Convo::new(&mut self.services)?; + self.services + .store + .save_conversation(&storage::ConversationMeta { + local_convo_id: convo.id().to_string(), + remote_convo_id: "0".into(), + kind: ConversationKind::GroupV1, + })?; + convo.add_member(&mut self.services, participants)?; + Ok(convo.id().to_string()) + } + + /// Add members to an existing group conversation. + pub fn group_add_member( + &mut self, + convo_id: &str, + members: &[&AccountId], + ) -> Result<(), ChatError> { + let mut convo = self.load_group_convo(convo_id)?; + convo.add_member(&mut self.services, members) + } + + pub fn list_conversations(&self) -> Result, ChatError> { + let records = self.services.store.load_conversations()?; + Ok(records.into_iter().map(|r| r.local_convo_id).collect()) + } + + pub fn take_missing_messages(&self) -> Vec { + self.services.causal.take_missing() + } + + /// Encrypt and publish `content` to an existing conversation. + pub fn send_content(&mut self, convo_id: &str, content: &[u8]) -> Result<(), ChatError> { + let mut convo = self.load_convo(convo_id)?; + convo.send_content(&mut self.services, content) + } + + // Decode bytes and send to protocol for processing. + pub fn handle_payload(&mut self, payload: &[u8]) -> Result { + let env = EnvelopeV1::decode(payload)?; + + // TODO: Impl Conversation hinting + let convo_id = env.conversation_hint; + + match convo_id { + c if c == self.inbox.id() => self.dispatch_to_inbox(&env.payload).map(Into::into), + c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload).map(Into::into), + c if self.services.store.has_conversation(&c)? => { + self.dispatch_to_convo(&c, &env.payload).map(Into::into) + } + _ => Ok(PayloadOutcome::Empty), + } + } + + // Dispatch encrypted payload to Inbox. The Inbox persists the newly + // created conversation and consumes the ephemeral key internally. + fn dispatch_to_inbox(&mut self, enc_payload_bytes: &[u8]) -> Result { + // EncryptedPayloads are not used by GroupConvos at this time, else this can be performed in `handle_payload` + // TODO: (P1) reconcile envelope parsing between Covno and GroupConvo + let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; + let public_key_hex = Inbox::extract_ephemeral_key_hex(&enc_payload)?; + self.inbox + .handle_frame(&mut self.services, enc_payload, &public_key_hex) + } + + // Dispatch encrypted payload to the post-quantum inbox. + fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result { + self.pq_inbox.handle_frame(payload, &mut self.services) + } + + // Dispatch encrypted payload to its corresponding conversation. + fn dispatch_to_convo( + &mut self, + convo_id: &str, + enc_payload_bytes: &[u8], + ) -> Result { + let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; + let mut convo = self.load_convo(convo_id)?; + convo.handle_frame(&mut self.services, enc_payload) + } + + /// Rebuilds a conversation from storage — the one site that branches on + /// `ConversationKind`. + fn load_convo(&mut self, convo_id: &str) -> Result>, ChatError> { + let record = self.load_conversation_meta(convo_id)?; + Ok(match record.kind { + ConversationKind::PrivateV1 => Box::new(PrivateV1Convo::new( + &self.services.store, + record.local_convo_id, + record.remote_convo_id, + )?), + ConversationKind::GroupV1 => Box::new(self.load_mls_convo(&record.local_convo_id)?), + ConversationKind::Unknown(_) => { + return Err(ChatError::UnsupportedConvoType(record.kind.as_str().into())); + } + }) + } + + /// Rebuilds a group conversation; errors if `convo_id` names a non-group. + fn load_group_convo(&mut self, convo_id: &str) -> Result>, ChatError> { + let record = self.load_conversation_meta(convo_id)?; + match record.kind { + ConversationKind::GroupV1 => Ok(Box::new(self.load_mls_convo(&record.local_convo_id)?)), + ConversationKind::PrivateV1 => { + Err(ChatError::NoConvo("this is not a group convo".into())) + } + ConversationKind::Unknown(_) => { + Err(ChatError::UnsupportedConvoType(record.kind.as_str().into())) + } + } + } + + /// Rebuilds a group conversation from storage so an operation can run against it. + fn load_mls_convo(&mut self, convo_id: &str) -> Result { + let group_id_bytes = hex::decode(convo_id).map_err(ChatError::generic)?; + let group_id = GroupId::from_slice(&group_id_bytes); + GroupV1Convo::load(&mut self.services, convo_id.to_string(), group_id) + } + + pub fn create_intro_bundle(&mut self) -> Result, ChatError> { + let intro = self.inbox.create_intro_bundle(&mut self.services)?; + Ok(intro.into()) + } + + /// Loads a conversation's metadata from storage. + fn load_conversation_meta( + &self, + convo_id: &str, + ) -> Result { + self.services + .store + .load_conversation(convo_id)? + .ok_or_else(|| ChatError::NoConvo(convo_id.into())) + } +} diff --git a/core/conversations/src/inbox.rs b/core/conversations/src/inbox.rs index 7415fdb..1ea4073 100644 --- a/core/conversations/src/inbox.rs +++ b/core/conversations/src/inbox.rs @@ -2,5 +2,5 @@ mod handler; mod handshake; mod introduction; -pub use handler::Inbox; +pub use handler::{Inbox, PRIVATE_V1_INBOX_ADDRESS}; pub use introduction::Introduction; diff --git a/core/conversations/src/inbox/handler.rs b/core/conversations/src/inbox/handler.rs index a19257b..359264f 100644 --- a/core/conversations/src/inbox/handler.rs +++ b/core/conversations/src/inbox/handler.rs @@ -3,74 +3,70 @@ use chat_proto::logoschat::encryption::EncryptedPayload; use prost::Message; use prost::bytes::Bytes; use rand_core::OsRng; -use std::cell::RefCell; -use std::rc::Rc; -use storage::{ConversationStore, EphemeralKeyStore, RatchetStore}; +use storage::EphemeralKeyStore; use crypto::{PrekeyBundle, SymmetricKey32}; -use crate::context::Introduction; -use crate::conversation::{ChatError, Convo, Id, PrivateV1Convo}; +use crate::conversation::{ChatError, Convo, PrivateV1Convo}; use crate::crypto::{CopyBytes, PrivateKey, PublicKey}; +use crate::inbox::Introduction; use crate::inbox::handshake::InboxHandshake; use crate::outcomes::{ConversationClass, InboxOutcome, NewConversation}; use crate::proto; +use crate::service_context::{ExternalServices, ServiceContext}; use crate::types::AddressedEncryptedPayload; use crypto::Identity; +/// Transport address shared by all PrivateV1 inbox traffic. +pub const PRIVATE_V1_INBOX_ADDRESS: &str = "delivery_address"; + /// Compute the deterministic Delivery_address for an installation fn delivery_address_for_installation(_: PublicKey) -> String { // TODO: Implement Delivery Address - "delivery_address".into() + PRIVATE_V1_INBOX_ADDRESS.into() } -pub struct Inbox { - ident: Rc, +pub struct Inbox { local_convo_id: String, - store: Rc>, } -impl std::fmt::Debug for Inbox { +impl std::fmt::Debug for Inbox { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Inbox") - .field("ident", &self.ident) .field("convo_id", &self.local_convo_id) .finish() } } -impl Inbox { - pub fn new(store: Rc>, ident: Rc) -> Self { +impl Inbox { + pub fn new(ident: &Identity) -> Self { let local_convo_id = Self::inbox_identifier_for_key(ident.public_key()); - Self { - ident, - local_convo_id, - store, - } + Self { local_convo_id } } /// Creates an intro bundle and returns the Introduction along with the /// generated ephemeral key pair (public_key_hex, private_key) for the caller to persist. - pub fn create_intro_bundle(&self) -> Result { + pub fn create_intro_bundle( + &self, + cx: &mut ServiceContext, + ) -> Result { let ephemeral = PrivateKey::random(); let ephemeral_key: PublicKey = (&ephemeral).into(); let public_key_hex = hex::encode(ephemeral_key.as_bytes()); - self.store - .borrow_mut() - .save_ephemeral_key(&public_key_hex, &ephemeral)?; + cx.store.save_ephemeral_key(&public_key_hex, &ephemeral)?; - let intro = Introduction::new(self.ident.secret(), ephemeral_key, OsRng); + let intro = Introduction::new(cx.identity.secret(), ephemeral_key, OsRng); Ok(intro) } - pub fn invite_to_private_convo( + pub fn invite_to_private_convo( &self, + cx: &mut ServiceContext, remote_bundle: &Introduction, initial_message: &[u8], - private_store: Rc>, - ) -> Result<(PrivateV1Convo, Vec), ChatError> { + ) -> Result<(PrivateV1Convo, Vec), ChatError> { let mut rng = OsRng; let pkb = PrekeyBundle { @@ -81,12 +77,11 @@ impl Inbox { }; let (seed_key, ephemeral_pub) = - InboxHandshake::perform_as_initiator(self.ident.secret(), &pkb, &mut rng); + InboxHandshake::perform_as_initiator(cx.identity.secret(), &pkb, &mut rng); - let mut convo = - PrivateV1Convo::new_initiator(private_store, seed_key, *remote_bundle.ephemeral_key()); + let mut convo = PrivateV1Convo::new_initiator(seed_key, *remote_bundle.ephemeral_key()); - let mut payloads = convo.send_message(initial_message)?; + let mut payloads = convo.encrypt_content(initial_message, &mut cx.store)?; // Wrap First payload in Invite if let Some(first_message) = payloads.get_mut(0) { @@ -97,7 +92,7 @@ impl Inbox { let ciphertext = frame.encode_to_vec(); let header = proto::InboxHeaderV1 { - initiator_static: self.ident.public_key().copy_to_bytes(), + initiator_static: cx.identity.public_key().copy_to_bytes(), initiator_ephemeral: ephemeral_pub.copy_to_bytes(), responder_static: remote_bundle.installation_key().copy_to_bytes(), responder_ephemeral: remote_bundle.ephemeral_key().copy_to_bytes(), @@ -126,15 +121,14 @@ impl Inbox { /// [`InboxOutcome`] describing what was observed — for a successful /// invite, a `new_conversation` and the initial `ConvoOutcome` carrying /// the first message. - pub fn handle_frame( + pub fn handle_frame( &self, + cx: &mut ServiceContext, enc_payload: EncryptedPayload, public_key_hex: &str, - private_store: Rc>, ) -> Result { - let ephemeral_key = self + let ephemeral_key = cx .store - .borrow() .load_ephemeral_key(public_key_hex)? .ok_or(ChatError::UnknownEphemeralKey())?; @@ -146,18 +140,17 @@ impl Inbox { // Perform handshake and decrypt frame let (seed_key, frame) = - self.perform_handshake(&ephemeral_key, header, handshake.payload)?; + self.perform_handshake(&cx.identity, &ephemeral_key, header, handshake.payload)?; let result = match frame.frame_type.unwrap() { proto::inbox_v1_frame::FrameType::InvitePrivateV1(_invite_private_v1) => { - let mut convo = - PrivateV1Convo::new_responder(private_store, seed_key, &ephemeral_key); + let mut convo = PrivateV1Convo::new_responder(seed_key, &ephemeral_key); let Some(enc_payload) = _invite_private_v1.initial_message else { return Err(ChatError::Protocol("missing initial encpayload".into())); }; - let initial = convo.handle_frame(enc_payload)?; + let initial = convo.handle_frame(cx, enc_payload)?; if initial.content.is_none() { return Err(ChatError::Protocol( "expected initial message in invite".into(), @@ -168,7 +161,7 @@ impl Inbox { convo_id: initial.convo_id.clone(), class: ConversationClass::Private, }; - convo.persist()?; + convo.persist(&mut cx.store)?; InboxOutcome { new_conversation, @@ -177,9 +170,7 @@ impl Inbox { } }; - self.store - .borrow_mut() - .remove_ephemeral_key(public_key_hex)?; + cx.store.remove_ephemeral_key(public_key_hex)?; Ok(result) } @@ -215,6 +206,7 @@ impl Inbox { fn perform_handshake( &self, + ident: &Identity, ephemeral_key: &PrivateKey, header: proto::InboxHeaderV1, bytes: Bytes, @@ -231,7 +223,7 @@ impl Inbox { ); let seed_key = InboxHandshake::perform_as_responder( - self.ident.secret(), + ident.secret(), ephemeral_key, None, &initator_static, @@ -265,46 +257,43 @@ impl Inbox { // TODO: Implement ID according to spec hex::encode(Blake2b512::digest(pubkey)) } -} -impl Id for Inbox { - fn id(&self) -> &str { + pub fn id(&self) -> &str { &self.local_convo_id } + + /// Transport address this inbox receives PrivateV1 traffic on. + pub fn delivery_address(&self) -> &str { + PRIVATE_V1_INBOX_ADDRESS + } } #[cfg(test)] mod tests { - use std::cell::RefCell; - use super::*; use chat_sqlite::{ChatStorage, StorageConfig}; #[test] fn test_invite_privatev1_roundtrip() { - let saro_storage = Rc::new(RefCell::new( - ChatStorage::new(StorageConfig::InMemory).unwrap(), - )); - let raya_storage = Rc::new(RefCell::new( - ChatStorage::new(StorageConfig::InMemory).unwrap(), - )); + let saro_storage = ChatStorage::new(StorageConfig::InMemory).unwrap(); + let raya_storage = ChatStorage::new(StorageConfig::InMemory).unwrap(); - let saro_ident = Identity::new("saro"); - let saro_inbox = Inbox::new(Rc::clone(&saro_storage), saro_ident.into()); + let mut saro_cx = ServiceContext::for_test("saro", saro_storage).unwrap(); + let saro_inbox = Inbox::new(&saro_cx.identity); - let raya_ident = Identity::new("raya"); - let raya_inbox = Inbox::new(Rc::clone(&raya_storage), raya_ident.into()); + let mut raya_cx = ServiceContext::for_test("raya", raya_storage).unwrap(); + let raya_inbox = Inbox::new(&raya_cx.identity); - let bundle = raya_inbox.create_intro_bundle().unwrap(); + let bundle = raya_inbox.create_intro_bundle(&mut raya_cx).unwrap(); let (_, mut payloads) = saro_inbox - .invite_to_private_convo(&bundle, "hello".as_bytes(), saro_storage) + .invite_to_private_convo(&mut saro_cx, &bundle, "hello".as_bytes()) .unwrap(); let payload = payloads.remove(0); - let key_hex = Inbox::::extract_ephemeral_key_hex(&payload.data).unwrap(); + let key_hex = Inbox::extract_ephemeral_key_hex(&payload.data).unwrap(); - let result = raya_inbox.handle_frame(payload.data, &key_hex, raya_storage); + let result = raya_inbox.handle_frame(&mut raya_cx, payload.data, &key_hex); assert!( result.is_ok(), diff --git a/core/conversations/src/inbox_v2.rs b/core/conversations/src/inbox_v2.rs index 023c8c9..de63ad9 100644 --- a/core/conversations/src/inbox_v2.rs +++ b/core/conversations/src/inbox_v2.rs @@ -2,34 +2,25 @@ mod identity; mod mls_provider; pub use identity::MlsIdentityProvider; - -use std::cell::RefCell; -use std::rc::Rc; +pub(crate) use mls_provider::MlsEphemeralPqProvider; use chat_proto::logoschat::envelope::EnvelopeV1; use openmls::prelude::tls_codec::Serialize; use openmls::prelude::*; use prost::{Message, Oneof}; -use storage::ChatStore; -use storage::ConversationKind; -use storage::ConversationMeta; +use storage::{ConversationKind, ConversationMeta, ConversationStore}; use crate::AddressedEnvelope; use crate::ChatError; use crate::DeliveryService; -use crate::IdentityProvider; use crate::RegistrationService; -use crate::causal_history::CausalHistoryStore; -use crate::causal_history::MissingMessage; - -// use crate::GroupConvo; -use crate::conversation::{ConversationId, GroupConvo, GroupV1Convo, Id}; +use crate::conversation::ConversationId; +use crate::conversation::GroupV1Convo; use crate::outcomes::{ConversationClass, InboxOutcome, NewConversation}; +use crate::service_context::{ExternalServices, ServiceContext}; use crate::types::AccountId; use crate::utils::{blake2b_hex, hash_size}; -use mls_provider::MlsEphemeralPqProvider; - // Define unique Identifiers derivations used in InboxV2 fn delivery_address_for(account_id: &AccountId) -> String { blake2b_hex::(&["InboxV2|", "delivery_address|", account_id.as_str()]) @@ -53,44 +44,14 @@ pub trait MlsProvider: OpenMlsProvider { /// An PQ focused Conversation initializer. /// InboxV2 Incorporates an Account based identity system to support PQ based conversation protocols /// such as MLS. -pub struct InboxV2 -where - IP: IdentityProvider, -{ +pub struct InboxV2 { // Account_id field is an owned value, so it can be returned via reference. account_id: AccountId, - account: Rc>>, - ds: Rc>, - reg_service: Rc>, - store: Rc>, - causal: CausalHistoryStore, - mls_provider: Rc>, } -impl InboxV2 -where - IP: IdentityProvider, - DS: DeliveryService, - RS: RegistrationService, - CS: ChatStore, -{ - pub fn new( - account: IP, - ds: Rc>, - reg_service: Rc>, - store: Rc>, - ) -> Self { - let account_id = account.account_id().clone(); - let provider = MlsEphemeralPqProvider::new().unwrap(); - Self { - account_id, - account: Rc::new(RefCell::new(MlsIdentityProvider::new(account))), - ds, - reg_service, - store, - causal: CausalHistoryStore::new(), - mls_provider: Rc::new(RefCell::new(provider)), - } +impl InboxV2 { + pub fn new(account_id: AccountId) -> Self { + Self { account_id } } pub fn account_id(&self) -> &AccountId { @@ -98,14 +59,16 @@ where } /// Submit MlsKeypackage to registration service - pub fn register(&mut self) -> Result<(), ChatError> { - let keypackage_bytes = self.create_keypackage()?.tls_serialize_detached()?; + pub fn register( + &self, + cx: &mut ServiceContext, + ) -> Result<(), ChatError> { + let keypackage_bytes = Self::create_keypackage(cx)?.tls_serialize_detached()?; // TODO: (P3) Each keypackage can only be used once either enable... // "LastResort" package or publish multiple - self.reg_service - .borrow_mut() - .register(&*self.account.borrow(), keypackage_bytes) + cx.registry + .register(&cx.mls_identity, keypackage_bytes) .map_err(ChatError::generic) } @@ -117,19 +80,11 @@ where conversation_id_for(&self.account_id) } - pub fn create_group_v1( + pub fn handle_frame( &self, - ) -> Result, ChatError> { - GroupV1Convo::new( - self.account.clone(), - self.mls_provider.clone(), - self.ds.clone(), - self.reg_service.clone(), - self.causal.clone(), - ) - } - - pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result { + payload_bytes: &[u8], + cx: &mut ServiceContext, + ) -> Result { let inbox_frame = InboxV2Frame::decode(payload_bytes)?; let Some(payload) = inbox_frame.payload else { @@ -138,12 +93,16 @@ where match payload { InviteType::GroupV1(group_v1_heavy_invite) => { - self.handle_heavy_invite(group_v1_heavy_invite) + self.handle_heavy_invite(group_v1_heavy_invite, cx) } } } - fn persist_convo(&self, convo: impl GroupConvo) -> Result<(), ChatError> { + fn persist_convo( + &self, + convo: &GroupV1Convo, + cx: &mut ServiceContext, + ) -> Result<(), ChatError> { // TODO: (P2) Remove remote_convo_id this is an implementation detail specific to PrivateV1 // TODO: (P3) Implement From for ConversationMeta let meta = ConversationMeta { @@ -151,12 +110,16 @@ where remote_convo_id: "0".into(), kind: ConversationKind::GroupV1, }; - self.store.borrow_mut().save_conversation(&meta)?; + cx.store.save_conversation(&meta)?; // TODO: (P1) Persist state Ok(()) } - fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result { + fn handle_heavy_invite( + &self, + invite: GroupV1HeavyInvite, + cx: &mut ServiceContext, + ) -> Result { let (msg_in, _rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?; let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else { @@ -166,16 +129,9 @@ where )); }; - let convo = GroupV1Convo::new_from_welcome( - self.account.clone(), - self.mls_provider.clone(), - self.ds.clone(), - self.reg_service.clone(), - self.causal.clone(), - welcome, - )?; + let convo = GroupV1Convo::new_from_welcome(cx, welcome)?; let convo_id: ConversationId = convo.id().to_string(); - self.persist_convo(convo)?; + self.persist_convo(&convo, cx)?; Ok(InboxOutcome { new_conversation: NewConversation { convo_id, @@ -185,50 +141,27 @@ where }) } - fn create_keypackage(&self) -> Result { + fn create_keypackage( + cx: &ServiceContext, + ) -> Result { let capabilities = Capabilities::builder() .ciphersuites(vec![ Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519, ]) .extensions(vec![ExtensionType::ApplicationId]) .build(); - - let signer = self.account.borrow(); let a = KeyPackage::builder() .leaf_node_capabilities(capabilities) .build( Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519, - &*self.mls_provider.borrow(), - &*signer, - signer.get_credential(), + &cx.mls_provider, + &cx.mls_identity, + cx.mls_identity.get_credential(), ) .expect("Failed to build KeyPackage"); Ok(a.key_package().clone()) } - - pub fn load_mls_convo( - &self, - convo_id: String, - ) -> Result, ChatError> { - let group_id_bytes = hex::decode(&convo_id).map_err(ChatError::generic)?; - let group_id = GroupId::from_slice(&group_id_bytes); - let convo = GroupV1Convo::load( - self.account.clone(), - self.mls_provider.clone(), - self.ds.clone(), - self.reg_service.clone(), - self.causal.clone(), - convo_id, - group_id, - )?; - - Ok(convo) - } - - pub fn take_missing_messages(&self) -> Vec { - self.causal.take_missing() - } } #[derive(Clone, PartialEq, Message)] diff --git a/core/conversations/src/lib.rs b/core/conversations/src/lib.rs index f20de26..5867558 100644 --- a/core/conversations/src/lib.rs +++ b/core/conversations/src/lib.rs @@ -1,13 +1,14 @@ mod account; mod causal_history; -mod context; mod conversation; +mod core; mod crypto; mod errors; mod inbox; mod inbox_v2; mod outcomes; mod proto; +mod service_context; mod service_traits; mod types; mod utils; @@ -16,12 +17,12 @@ pub use account::LogosAccount; pub use causal_history::{Frontier, MissingMessage}; pub use chat_sqlite::ChatStorage; pub use chat_sqlite::StorageConfig; -pub use context::{Context, ConversationId, Introduction}; -pub use conversation::GroupConvo; +pub use core::{ConversationId, Core, Introduction}; pub use errors::ChatError; pub use outcomes::{ Content, ConversationClass, ConvoOutcome, InboxOutcome, NewConversation, PayloadOutcome, }; +pub use service_context::ExternalServices; pub use service_traits::{DeliveryService, IdentityProvider, RegistrationService}; pub use storage::ConversationKind; pub use types::{AccountId, AddressedEnvelope}; diff --git a/core/conversations/src/service_context.rs b/core/conversations/src/service_context.rs new file mode 100644 index 0000000..1bce50c --- /dev/null +++ b/core/conversations/src/service_context.rs @@ -0,0 +1,98 @@ +//! Bundles the services a conversation operation needs into one [`ServiceContext`]. + +use crypto::Identity; +use storage::ChatStore; + +use crate::account::LogosAccount; +use crate::causal_history::CausalHistoryStore; +use crate::inbox_v2::{MlsEphemeralPqProvider, MlsIdentityProvider}; +use crate::{DeliveryService, RegistrationService}; + +/// Bundles the external service types (`DS`, `RS`, `CS`) behind one `S`. The +/// `(DS, RS, CS)` tuple impl lets them still be supplied separately. +pub trait ExternalServices { + type DS: DeliveryService; + type RS: RegistrationService; + type CS: ChatStore; +} + +impl ExternalServices for (DS, RS, CS) +where + DS: DeliveryService, + RS: RegistrationService, + CS: ChatStore, +{ + type DS = DS; + type RS = RS; + type CS = CS; +} + +/// Bundles every service a conversation operation may need. +pub(crate) struct ServiceContext { + pub(crate) ds: S::DS, + pub(crate) registry: S::RS, + pub(crate) store: S::CS, + pub(crate) mls_identity: MlsIdentityProvider, + pub(crate) mls_provider: MlsEphemeralPqProvider, + pub(crate) causal: CausalHistoryStore, + pub(crate) identity: Identity, +} + +#[cfg(test)] +mod test_support { + use super::*; + use crate::types::AddressedEnvelope; + use crate::{ChatError, IdentityProvider}; + + /// Delivery double that drops every payload. + #[derive(Debug)] + pub(crate) struct NoopDelivery; + + impl DeliveryService for NoopDelivery { + type Error = std::convert::Infallible; + + fn publish(&mut self, _envelope: AddressedEnvelope) -> Result<(), Self::Error> { + Ok(()) + } + + fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> { + Ok(()) + } + } + + /// Registration double that holds no key packages. + #[derive(Debug)] + pub(crate) struct NoopRegistration; + + impl RegistrationService for NoopRegistration { + type Error = std::convert::Infallible; + + fn register( + &mut self, + _identity: &dyn IdentityProvider, + _key_bundle: Vec, + ) -> Result<(), Self::Error> { + Ok(()) + } + + fn retrieve(&self, _device_id: &str) -> Result>, Self::Error> { + Ok(None) + } + } + + impl ServiceContext<(NoopDelivery, NoopRegistration, CS)> { + /// Builds a context around a real store, stubbing other services. + pub(crate) fn for_test(name: &str, store: CS) -> Result { + let account = LogosAccount::new_test(name); + Ok(Self { + ds: NoopDelivery, + registry: NoopRegistration, + store, + mls_identity: MlsIdentityProvider::new(account), + mls_provider: MlsEphemeralPqProvider::new().map_err(ChatError::generic)?, + causal: CausalHistoryStore::new(), + identity: Identity::new(name), + }) + } + } +} diff --git a/core/conversations/src/service_traits.rs b/core/conversations/src/service_traits.rs index bf25162..1f3093c 100644 --- a/core/conversations/src/service_traits.rs +++ b/core/conversations/src/service_traits.rs @@ -10,7 +10,7 @@ use crate::types::{AccountId, AddressedEnvelope}; /// A Delivery service is responsible for payload transport. /// This interface allows Conversations to send payloads on the wire as well as /// register interest in delivery_addresses. Client implementations are responsible -/// for providing the inbound payloads to Context::handle_payload. +/// for providing the inbound payloads to Core::handle_payload. pub trait DeliveryService: Debug { type Error: Display + Debug; fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error>; diff --git a/core/integration_tests_core/tests/causal_history.rs b/core/integration_tests_core/tests/causal_history.rs index f648fe8..a6f3b65 100644 --- a/core/integration_tests_core/tests/causal_history.rs +++ b/core/integration_tests_core/tests/causal_history.rs @@ -7,21 +7,21 @@ use std::ops::{Deref, DerefMut}; use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; -use libchat::{Context, MissingMessage}; +use libchat::{Core, MissingMessage}; struct Client { - inner: Context, + inner: Core<(LocalBroadcaster, EphemeralRegistry, MemStore)>, } impl Client { - fn init(ctx: Context) -> Self { - Client { inner: ctx } + fn init(core: Core<(LocalBroadcaster, EphemeralRegistry, MemStore)>) -> Self { + Client { inner: core } } /// Poll every pending payload and feed it to the protocol. fn process_messages(&mut self) { let messages: Vec<_> = { - let mut ds = self.inner.ds(); + let ds = self.inner.ds(); std::iter::from_fn(|| ds.poll()).collect() }; for data in messages { @@ -32,13 +32,13 @@ impl Client { /// Poll every pending payload and discard it — simulates messages that /// never reach this client. fn drop_pending_messages(&mut self) { - let mut ds = self.inner.ds(); + let ds = self.inner.ds(); while ds.poll().is_some() {} } } impl Deref for Client { - type Target = Context; + type Target = Core<(LocalBroadcaster, EphemeralRegistry, MemStore)>; fn deref(&self) -> &Self::Target { &self.inner } @@ -56,29 +56,22 @@ fn missing_group_message_is_detected() { let rs = EphemeralRegistry::new(); let saro_ctx = - Context::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap(); + Core::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap(); - let raya_ctx = Context::new_with_name("raya", ds.clone(), rs.clone(), MemStore::new()).unwrap(); + let raya_ctx = Core::new_with_name("raya", ds.clone(), rs.clone(), MemStore::new()).unwrap(); let mut saro = Client::init(saro_ctx); let mut raya = Client::init(raya_ctx); // Saro creates a group with Raya. let raya_id = raya.account_id().clone(); - let convo_id = saro - .create_group_convo(&[&raya_id]) - .unwrap() - .id() - .to_string(); + let convo_id = saro.create_group_convo(&[&raya_id]).unwrap().to_string(); // Raya joins (processes the Welcome + commit). raya.process_messages(); // Message 1 is delivered normally. - saro.get_convo(convo_id.as_str()) - .unwrap() - .send_content(b"first") - .unwrap(); + saro.send_content(convo_id.as_str(), b"first").unwrap(); raya.process_messages(); assert!( raya.take_missing_messages().is_empty(), @@ -86,17 +79,11 @@ fn missing_group_message_is_detected() { ); // Message 2 is published but never reaches Raya. - saro.get_convo(convo_id.as_str()) - .unwrap() - .send_content(b"second") - .unwrap(); + saro.send_content(convo_id.as_str(), b"second").unwrap(); raya.drop_pending_messages(); // Message 3 is delivered; its causal history references the missing M2. - saro.get_convo(convo_id.as_str()) - .unwrap() - .send_content(b"third") - .unwrap(); + saro.send_content(convo_id.as_str(), b"third").unwrap(); raya.process_messages(); let missing: Vec = raya.take_missing_messages(); diff --git a/core/integration_tests_core/tests/mls_integration.rs b/core/integration_tests_core/tests/mls_integration.rs index 7330cf9..7cecf98 100644 --- a/core/integration_tests_core/tests/mls_integration.rs +++ b/core/integration_tests_core/tests/mls_integration.rs @@ -2,15 +2,14 @@ use std::ops::{Deref, DerefMut}; use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; use libchat::{ - Content, Context, ConversationClass, ConvoOutcome, GroupConvo, NewConversation, PayloadOutcome, - hex_trunc, + Content, ConversationClass, ConvoOutcome, Core, NewConversation, PayloadOutcome, hex_trunc, }; type ResultCallback = Box; // Simple client Functionality for testing struct Client { - inner: Context, + inner: Core<(LocalBroadcaster, EphemeralRegistry, MemStore)>, on_result: Option, new_conversations: Vec, received_messages: Vec<(libchat::ConversationId, Content)>, @@ -18,11 +17,11 @@ struct Client { impl Client { fn init( - ctx: Context, + core: Core<(LocalBroadcaster, EphemeralRegistry, MemStore)>, cb: Option, ) -> Self { Client { - inner: ctx, + inner: core, on_result: cb.map(|f| Box::new(f) as ResultCallback), new_conversations: Vec::new(), received_messages: Vec::new(), @@ -31,7 +30,7 @@ impl Client { fn process_messages(&mut self) { let payloads: Vec<_> = { - let mut ds = self.ds(); + let ds = self.ds(); std::iter::from_fn(|| ds.poll()).collect() }; @@ -58,18 +57,10 @@ impl Client { self.received_messages.push((outcome.convo_id, content)); } } - - fn convo( - &mut self, - convo_id: &str, - ) -> Box> { - // TODO: (P1) Convos are being copied somewhere, which means hanging on to a reference causes state desync - self.get_convo(convo_id).unwrap() - } } impl Deref for Client { - type Target = Context; + type Target = Core<(LocalBroadcaster, EphemeralRegistry, MemStore)>; fn deref(&self) -> &Self::Target { &self.inner @@ -121,8 +112,8 @@ fn create_group() { let rs = EphemeralRegistry::new(); let saro_ctx = - Context::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap(); - let raya_ctx = Context::new_with_name("raya", ds.clone(), rs.clone(), MemStore::new()).unwrap(); + Core::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap(); + let raya_ctx = Core::new_with_name("raya", ds.clone(), rs.clone(), MemStore::new()).unwrap(); let mut clients = vec![ Client::init(saro_ctx, Some(pretty_print(" Saro "))), @@ -133,17 +124,17 @@ fn create_group() { const RAYA: usize = 1; let raya_id = clients[RAYA].account_id().clone(); - let s_convo = clients[SARO].create_group_convo(&[&raya_id]).unwrap(); - - let convo_id = s_convo.id().to_string(); + let convo_id = clients[SARO] + .create_group_convo(&[&raya_id]) + .unwrap() + .to_string(); // Raya can read this message because // 1) It was sent after add_members was committed, and // 2) LocalBroadcaster provides historical messages. clients[SARO] - .convo(&convo_id) - .send_content(b"ok who broke the group chat again") + .send_content(&convo_id, b"ok who broke the group chat again") .unwrap(); process(&mut clients); @@ -161,20 +152,18 @@ fn create_group() { ); clients[RAYA] - .convo(&convo_id) - .send_content(b"it was literally working five minutes ago") + .send_content(&convo_id, b"it was literally working five minutes ago") .unwrap(); process(&mut clients); - let pax_ctx = Context::new_with_name("pax", ds, rs, MemStore::new()).unwrap(); + let pax_ctx = Core::new_with_name("pax", ds, rs, MemStore::new()).unwrap(); clients.push(Client::init(pax_ctx, Some(pretty_print(" Pax")))); const PAX: usize = 2; let pax_id = clients[PAX].account_id().clone(); clients[SARO] - .convo(&convo_id) - .add_member(&[&pax_id]) + .group_add_member(&convo_id, &[&pax_id]) .unwrap(); process(&mut clients); @@ -190,15 +179,13 @@ fn create_group() { ); clients[PAX] - .convo(&convo_id) - .send_content(b"ngl the key rotation is cooked") + .send_content(&convo_id, b"ngl the key rotation is cooked") .unwrap(); process(&mut clients); clients[SARO] - .convo(&convo_id) - .send_content(b"bro we literally just added you to the group ") + .send_content(&convo_id, b"bro we literally just added you to the group ") .unwrap(); process(&mut clients); diff --git a/core/integration_tests_core/tests/private_integration.rs b/core/integration_tests_core/tests/private_integration.rs index 3dd9f3f..d09f6ca 100644 --- a/core/integration_tests_core/tests/private_integration.rs +++ b/core/integration_tests_core/tests/private_integration.rs @@ -1,23 +1,44 @@ use chat_sqlite::{ChatStorage, StorageConfig}; -use libchat::{Context, ConversationClass, Introduction, PayloadOutcome}; +use libchat::{ConversationClass, Core, Introduction, PayloadOutcome}; use storage::{ConversationStore, IdentityStore}; use tempfile::tempdir; use components::{EphemeralRegistry, LocalBroadcaster}; +type PrivateCore = Core<(LocalBroadcaster, EphemeralRegistry, ChatStorage)>; + +/// Drains everything published to `receiver`'s delivery service and feeds each +/// payload back through `handle_payload`, returning the observed outcomes. +fn deliver(receiver: &mut PrivateCore) -> Vec { + let payloads: Vec<_> = { + let ds = receiver.ds(); + std::iter::from_fn(|| ds.poll()).collect() + }; + payloads + .iter() + .map(|data| receiver.handle_payload(data).unwrap()) + .collect() +} + +/// Delivers to `receiver`, asserting it observed exactly one outcome. +fn recv_one(receiver: &mut PrivateCore) -> PayloadOutcome { + let mut outcomes = deliver(receiver); + assert_eq!( + outcomes.len(), + 1, + "expected exactly one delivered outcome, got {outcomes:?}" + ); + outcomes.pop().unwrap() +} + fn send_and_verify( - sender: &mut Context, - receiver: &mut Context, + sender: &mut PrivateCore, + receiver: &mut PrivateCore, convo_id: &str, content: &[u8], ) { - let payloads = sender.send_content(convo_id, content).unwrap(); - let payload = payloads.first().unwrap(); - let result = receiver.handle_payload(&payload.data).unwrap(); - let PayloadOutcome::Convo(co) = result else { - panic!("steady-state send should yield PayloadOutcome::Convo, got {result:?}"); - }; - let content_out = co + sender.send_content(convo_id, content).unwrap(); + let content_out = expect_convo(recv_one(receiver)) .content .expect("steady-state send should yield one content"); assert_eq!(content, content_out.bytes.as_slice()); @@ -29,8 +50,8 @@ fn ctx_integration() { let rs = EphemeralRegistry::new(); let mut saro = - Context::new_with_name("saro", ds.clone(), rs.clone(), ChatStorage::in_memory()).unwrap(); - let mut raya = Context::new_with_name("raya", ds, rs, ChatStorage::in_memory()).unwrap(); + Core::new_with_name("saro", ds.clone(), rs.clone(), ChatStorage::in_memory()).unwrap(); + let mut raya = Core::new_with_name("raya", ds, rs, ChatStorage::in_memory()).unwrap(); // Raya creates intro bundle and sends to Saro let bundle = raya.create_intro_bundle().unwrap(); @@ -38,11 +59,10 @@ fn ctx_integration() { // Saro initiates conversation with Raya let mut content = vec![10]; - let (saro_convo_id, payloads) = saro.create_private_convo(&intro, &content).unwrap(); + let saro_convo_id = saro.create_private_convo(&intro, &content).unwrap(); // Raya receives the invite + initial message - let payload = payloads.first().unwrap(); - let initial = raya.handle_payload(&payload.data).unwrap(); + let initial = recv_one(&mut raya); let PayloadOutcome::Inbox(io) = initial else { panic!("invite must yield PayloadOutcome::Inbox, got {initial:?}"); }; @@ -73,7 +93,7 @@ fn identity_persistence() { let ds = LocalBroadcaster::new(); let rs = EphemeralRegistry::new(); let store1 = ChatStorage::new(StorageConfig::InMemory).unwrap(); - let ctx1 = Context::new_with_name("alice", ds, rs, store1).unwrap(); + let ctx1 = Core::new_with_name("alice", ds, rs, store1).unwrap(); let pubkey1 = ctx1.identity().public_key(); let name1 = ctx1.installation_name().to_string(); @@ -92,9 +112,9 @@ fn open_persists_new_identity() { let ds = LocalBroadcaster::new(); let rs = EphemeralRegistry::new(); let store = ChatStorage::new(StorageConfig::File(db_path.clone())).unwrap(); - let ctx = Context::new_from_store("alice", ds, rs, store).unwrap(); - let pubkey = ctx.identity().public_key(); - drop(ctx); + let core = Core::new_from_store("alice", ds, rs, store).unwrap(); + let pubkey = core.identity().public_key(); + drop(core); let store = ChatStorage::new(StorageConfig::File(db_path)).unwrap(); let persisted = store.load_identity().unwrap().unwrap(); @@ -108,15 +128,14 @@ fn conversation_metadata_persistence() { let ds = LocalBroadcaster::new(); let rs = EphemeralRegistry::new(); let mut alice = - Context::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory()).unwrap(); - let mut bob = Context::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap(); + Core::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory()).unwrap(); + let mut bob = Core::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap(); let bundle = alice.create_intro_bundle().unwrap(); let intro = Introduction::try_from(bundle.as_slice()).unwrap(); - let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap(); + bob.create_private_convo(&intro, b"hi").unwrap(); - let payload = payloads.first().unwrap(); - let result = alice.handle_payload(&payload.data).unwrap(); + let result = recv_one(&mut alice); let PayloadOutcome::Inbox(io) = result else { panic!("invite must yield PayloadOutcome::Inbox, got {result:?}"); }; @@ -135,33 +154,34 @@ fn conversation_full_flow() { let ds = LocalBroadcaster::new(); let rs = EphemeralRegistry::new(); let mut alice = - Context::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory()).unwrap(); - let mut bob = Context::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap(); + Core::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory()).unwrap(); + let mut bob = Core::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap(); let bundle = alice.create_intro_bundle().unwrap(); let intro = Introduction::try_from(bundle.as_slice()).unwrap(); - let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap(); + let bob_convo_id = bob.create_private_convo(&intro, b"hello").unwrap(); - let payload = payloads.first().unwrap(); - let result = alice.handle_payload(&payload.data).unwrap(); + let result = recv_one(&mut alice); let PayloadOutcome::Inbox(io) = result else { panic!("invite must yield PayloadOutcome::Inbox, got {result:?}"); }; let alice_convo_id = io.new_conversation.convo_id.clone(); - let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap(); - let payload = payloads.first().unwrap(); - let result = bob.handle_payload(&payload.data).unwrap(); + alice.send_content(&alice_convo_id, b"reply 1").unwrap(); assert_eq!( - expect_convo(result).content.expect("message content").bytes, + expect_convo(recv_one(&mut bob)) + .content + .expect("message content") + .bytes, b"reply 1" ); - let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap(); - let payload = payloads.first().unwrap(); - let result = alice.handle_payload(&payload.data).unwrap(); + bob.send_content(&bob_convo_id, b"reply 2").unwrap(); assert_eq!( - expect_convo(result).content.expect("message content").bytes, + expect_convo(recv_one(&mut alice)) + .content + .expect("message content") + .bytes, b"reply 2" ); @@ -170,20 +190,22 @@ fn conversation_full_flow() { assert_eq!(convo_ids.len(), 1); // Continue exchanging messages - let payloads = bob.send_content(&bob_convo_id, b"more messages").unwrap(); - let payload = payloads.first().unwrap(); - let result = alice.handle_payload(&payload.data).expect("should decrypt"); + bob.send_content(&bob_convo_id, b"more messages").unwrap(); assert_eq!( - expect_convo(result).content.expect("message content").bytes, + expect_convo(recv_one(&mut alice)) + .content + .expect("message content") + .bytes, b"more messages" ); // Alice can also send back - let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap(); - let payload = payloads.first().unwrap(); - let result = bob.handle_payload(&payload.data).unwrap(); + alice.send_content(&alice_convo_id, b"alice reply").unwrap(); assert_eq!( - expect_convo(result).content.expect("message content").bytes, + expect_convo(recv_one(&mut bob)) + .content + .expect("message content") + .bytes, b"alice reply" ); } diff --git a/crates/client-ffi/src/api.rs b/crates/client-ffi/src/api.rs index fc3ab54..6b4a25a 100644 --- a/crates/client-ffi/src/api.rs +++ b/crates/client-ffi/src/api.rs @@ -1,6 +1,7 @@ use safer_ffi::prelude::*; use crate::delivery::{CDelivery, DeliverFn}; +use libchat::ChatError; use logos_chat::{ChatClient, ClientError, ConversationClass, Event}; // --------------------------------------------------------------------------- @@ -245,12 +246,12 @@ fn client_create_conversation( error_code: ErrorCode::None as i32, convo_id: Some(convo_id), }, - Err(ClientError::Chat(_)) => CreateConvoResult { - error_code: ErrorCode::BadIntro as i32, + Err(ClientError::Chat(ChatError::Delivery(_))) => CreateConvoResult { + error_code: ErrorCode::DeliveryFail as i32, convo_id: None, }, - Err(ClientError::Delivery(_)) => CreateConvoResult { - error_code: ErrorCode::DeliveryFail as i32, + Err(ClientError::Chat(_)) => CreateConvoResult { + error_code: ErrorCode::BadIntro as i32, convo_id: None, }, }; @@ -291,7 +292,7 @@ fn client_send_message( }; match handle.0.send_message(id_str, content.as_slice()) { Ok(()) => ErrorCode::None, - Err(ClientError::Delivery(_)) => ErrorCode::DeliveryFail, + Err(ClientError::Chat(ChatError::Delivery(_))) => ErrorCode::DeliveryFail, Err(_) => ErrorCode::UnknownError, } } @@ -320,10 +321,6 @@ fn client_receive( error_code: ErrorCode::BadPayload as i32, events: Vec::new(), }, - Err(ClientError::Delivery(_)) => EventList { - error_code: ErrorCode::DeliveryFail as i32, - events: Vec::new(), - }, }; Box::new(result).into() } diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 5b66e4d..9cc018d 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1,9 +1,8 @@ use std::sync::Arc; use libchat::{ - AddressedEnvelope, ChatError, ChatStorage, Context, ConversationId, ConvoOutcome, - DeliveryService, InboxOutcome, Introduction, PayloadOutcome, RegistrationService, - StorageConfig, + ChatError, ChatStorage, ConversationId, ConvoOutcome, Core, DeliveryService, InboxOutcome, + Introduction, PayloadOutcome, RegistrationService, StorageConfig, }; use components::EphemeralRegistry; @@ -12,7 +11,7 @@ use crate::errors::ClientError; use crate::event::Event; pub struct ChatClient { - ctx: Context, + core: Core<(D, R, ChatStorage)>, } // ── Default-registry constructors ──────────────────────────────────────────── @@ -23,7 +22,7 @@ impl ChatClient { let registry = EphemeralRegistry::new(); let store = ChatStorage::in_memory(); Self { - ctx: Context::new_with_name(name, delivery, registry, store).unwrap(), + core: Core::new_with_name(name, delivery, registry, store).unwrap(), } } @@ -35,11 +34,11 @@ impl ChatClient { name: impl Into, config: StorageConfig, delivery: D, - ) -> Result> { + ) -> Result { let store = ChatStorage::new(config).map_err(ChatError::from)?; let registry = EphemeralRegistry::new(); - let ctx = Context::new_from_store(name, delivery, registry, store)?; - Ok(Self { ctx }) + let core = Core::new_from_store(name, delivery, registry, store)?; + Ok(Self { core }) } } @@ -63,68 +62,54 @@ where config: StorageConfig, delivery: D, registry: R, - ) -> Result> { + ) -> Result { let store = ChatStorage::new(config).map_err(ChatError::from)?; - let mut ctx = Context::new_from_store(name, delivery, registry, store)?; - ctx.register_keypackage()?; - Ok(Self { ctx }) + let mut core = Core::new_from_store(name, delivery, registry, store)?; + core.register_keypackage()?; + Ok(Self { core }) } /// Returns the installation name (identity label) of this client. pub fn installation_name(&self) -> &str { - self.ctx.installation_name() + self.core.installation_name() } /// Produce a serialised introduction bundle for sharing out-of-band. - pub fn create_intro_bundle(&mut self) -> Result, ClientError> { - self.ctx.create_intro_bundle().map_err(Into::into) + pub fn create_intro_bundle(&mut self) -> Result, ClientError> { + self.core.create_intro_bundle().map_err(Into::into) } - /// Parse intro bundle bytes, initiate a private conversation, and deliver - /// all outbound envelopes. Returns this side's conversation ID. + /// Parse intro bundle bytes and initiate a private conversation. Returns + /// this side's conversation ID. pub fn create_conversation( &mut self, intro_bundle: &[u8], initial_content: &[u8], - ) -> Result> { + ) -> Result { let intro = Introduction::try_from(intro_bundle)?; - let (convo_id, envelopes) = self.ctx.create_private_convo(&intro, initial_content)?; - self.dispatch_all(envelopes)?; - Ok(convo_id) + self.core + .create_private_convo(&intro, initial_content) + .map_err(Into::into) } /// List all conversation IDs known to this client. - pub fn list_conversations(&self) -> Result, ClientError> { - self.ctx.list_conversations().map_err(Into::into) + pub fn list_conversations(&self) -> Result, ClientError> { + self.core.list_conversations().map_err(Into::into) } - /// Encrypt `content` and dispatch all outbound envelopes. - pub fn send_message( - &mut self, - convo_id: &str, - content: &[u8], - ) -> Result<(), ClientError> { - let envelopes = self.ctx.send_content(convo_id, content)?; - self.dispatch_all(envelopes) + /// Encrypt and send `content` to an existing conversation. + pub fn send_message(&mut self, convo_id: &str, content: &[u8]) -> Result<(), ClientError> { + self.core + .send_content(convo_id, content) + .map_err(Into::into) } /// Decrypt an inbound payload. Returns the events the payload produced, /// in causal order. May be empty for protocol-only frames. - pub fn receive(&mut self, payload: &[u8]) -> Result, ClientError> { - let result = self.ctx.handle_payload(payload)?; + pub fn receive(&mut self, payload: &[u8]) -> Result, ClientError> { + let result = self.core.handle_payload(payload)?; Ok(events_from_inbound(result)) } - - fn dispatch_all( - &mut self, - envelopes: Vec, - ) -> Result<(), ClientError> { - for env in envelopes { - let mut delivery = self.ctx.ds(); - delivery.publish(env).map_err(ClientError::Delivery)?; - } - Ok(()) - } } /// Walk an [`PayloadOutcome`] in causal order and emit one `Event` per diff --git a/crates/client/src/errors.rs b/crates/client/src/errors.rs index 322104c..23f352a 100644 --- a/crates/client/src/errors.rs +++ b/crates/client/src/errors.rs @@ -1,11 +1,7 @@ use libchat::ChatError; #[derive(Debug, thiserror::Error)] -pub enum ClientError { +pub enum ClientError { #[error(transparent)] Chat(#[from] ChatError), - /// Crypto state advanced but at least one envelope failed delivery. - /// Caller decides whether to retry. - #[error("delivery failed: {0:?}")] - Delivery(D), }