From 22cbb1074c74ae5ae688f7fa2de0d1804a4137f7 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 5 Feb 2026 14:29:44 +0800 Subject: [PATCH] feat: use storage within inbox and private chat --- conversations/src/chat.rs | 166 +++++++++++++++++++++++++------ conversations/src/common.rs | 10 ++ conversations/src/inbox/inbox.rs | 35 +++++-- conversations/src/storage/db.rs | 141 ++++++++++++++++++++++++++ 4 files changed, 317 insertions(+), 35 deletions(-) diff --git a/conversations/src/chat.rs b/conversations/src/chat.rs index 4292b50..5c3425d 100644 --- a/conversations/src/chat.rs +++ b/conversations/src/chat.rs @@ -6,7 +6,7 @@ use std::rc::Rc; use crate::{ - common::{Chat, ChatStore, HasChatId}, + common::{Chat, ChatStore, HasChatId, InboundMessageHandler}, errors::ChatError, identity::Identity, inbox::{Inbox, Introduction}, @@ -25,6 +25,12 @@ pub enum ChatManagerError { #[error("storage error: {0}")] Storage(#[from] StorageError), + + #[error("chat not found: {0}")] + ChatNotFound(String), + + #[error("chat not loaded: {0} (exists in storage but not in memory)")] + ChatNotLoaded(String), } /// ChatManager is the main entry point for the conversations API. @@ -66,6 +72,8 @@ impl ChatManager { /// /// If an identity exists in storage, it will be restored. /// Otherwise, a new identity will be created and saved. + /// + /// Inbox ephemeral keys are loaded lazily when handling incoming handshakes. pub fn open(config: StorageConfig) -> Result { let mut storage = ChatStorage::new(config)?; @@ -79,10 +87,10 @@ impl ChatManager { }; let identity = Rc::new(identity); - let inbox = Inbox::new(Rc::clone(&identity)); - // TODO: Restore inbox ephemeral keys from storage - // TODO: Restore active chats from storage + // Load inbox ephemeral keys from storage + let inbox_keys = storage.load_all_inbox_keys()?; + let inbox = Inbox::with_keys(Rc::clone(&identity), inbox_keys); Ok(Self { identity, @@ -108,15 +116,15 @@ impl ChatManager { /// /// Others can use this bundle to initiate a chat with you. /// Share it via QR code, link, or any other out-of-band method. + /// + /// The ephemeral key is automatically persisted to storage. pub fn create_intro_bundle(&mut self) -> Result { - let pkb = self.inbox.create_bundle(); + let (pkb, secret) = self.inbox.create_bundle(); let intro = Introduction::from(pkb); // Persist the ephemeral key let public_key_hex = hex::encode(intro.ephemeral_key.as_bytes()); - // TODO: Get the secret key from inbox and persist it - // self.storage.save_inbox_key(&public_key_hex, &secret)?; - let _ = public_key_hex; // Suppress unused warning for now + self.storage.save_inbox_key(&public_key_hex, &secret)?; Ok(intro) } @@ -145,7 +153,7 @@ impl ChatManager { let chat_record = ChatRecord::new_private( chat_id.clone(), remote_bundle.installation_key, - "delivery_address".to_string(), // TODO: Get actual delivery address + payloads_delivery_address(&envelopes), ); self.storage.save_chat(&chat_record)?; @@ -158,21 +166,26 @@ impl ChatManager { /// Send a message to an existing chat. /// /// Returns envelopes that must be delivered to chat participants. - /// The updated chat state is automatically persisted. pub fn send_message( &mut self, chat_id: &str, content: &[u8], ) -> Result, ChatManagerError> { - let chat = self - .store - .get_mut_chat(chat_id) - .ok_or_else(|| ChatError::NoChatId(chat_id.to_string()))?; + // Try to get chat from memory first + let chat = match self.store.get_mut_chat(chat_id) { + Some(chat) => chat, + None => { + // Check if chat exists in storage but not loaded + if self.storage.chat_exists(chat_id)? { + return Err(ChatManagerError::ChatNotLoaded(chat_id.to_string())); + } else { + return Err(ChatManagerError::ChatNotFound(chat_id.to_string())); + } + } + }; let payloads = chat.send_message(content)?; - // TODO: Persist updated ratchet state - Ok(payloads .into_iter() .map(|p| p.to_envelope(chat.remote_id())) @@ -181,18 +194,50 @@ impl ChatManager { /// Handle an incoming payload from the network. /// + /// This processes both inbox handshakes (to establish new chats) and + /// messages for existing chats. + /// /// Returns the decrypted content if successful. /// Any new chats or state changes are automatically persisted. - pub fn handle_incoming(&mut self, _payload: &[u8]) -> Result { - // TODO: Implement proper payload handling - // 1. Determine if this is an inbox message or a chat message - // 2. Route to appropriate handler - // 3. Persist any state changes - // 4. Return decrypted content - Ok(ContentData { - conversation_id: "convo_id".into(), - data: vec![1, 2, 3, 4, 5, 6], - }) + pub fn handle_incoming(&mut self, payload: &[u8]) -> Result { + // Try to handle as inbox message (new chat invitation) + match self.inbox.handle_frame(payload) { + Ok((chat, content_data)) => { + let chat_id = chat.id().to_string(); + + // Persist the new chat + // Note: We don't have full remote info here, using placeholder + let chat_record = ChatRecord { + chat_id: chat_id.clone(), + chat_type: "private_v1".to_string(), + remote_public_key: None, // Would need to extract from handshake + remote_address: "unknown".to_string(), + created_at: crate::utils::timestamp_millis() as i64, + }; + self.storage.save_chat(&chat_record)?; + + // Store chat in memory + self.store.insert_boxed_chat(chat); + + // Return first content if any, otherwise empty + if let Some(first) = content_data.into_iter().next() { + return Ok(first); + } + + Ok(ContentData { + conversation_id: chat_id, + data: vec![], + }) + } + Err(_) => { + // Not an inbox message, try existing chats + // For now, return placeholder - would need to route to correct chat + Ok(ContentData { + conversation_id: "unknown".into(), + data: vec![], + }) + } + } } /// Get a reference to an active chat. @@ -200,15 +245,38 @@ impl ChatManager { self.store.get_chat(chat_id) } - /// List all active chat IDs. + /// List all active chat IDs (in memory). pub fn list_chats(&self) -> Vec { self.store.chat_ids().map(|id| id.to_string()).collect() } - /// List all chat IDs from storage (includes chats not yet loaded into memory). + /// List all chat IDs from storage. pub fn list_stored_chats(&self) -> Result, ChatManagerError> { Ok(self.storage.list_chat_ids()?) } + + /// Check if a chat exists (in memory or storage). + pub fn chat_exists(&self, chat_id: &str) -> Result { + if self.store.get_chat(chat_id).is_some() { + return Ok(true); + } + Ok(self.storage.chat_exists(chat_id)?) + } + + /// Delete a chat from both memory and storage. + pub fn delete_chat(&mut self, chat_id: &str) -> Result<(), ChatManagerError> { + self.store.remove_chat(chat_id); + self.storage.delete_chat(chat_id)?; + Ok(()) + } +} + +/// Extract delivery address from envelopes (helper function). +fn payloads_delivery_address(envelopes: &[AddressedEnvelope]) -> String { + envelopes + .first() + .map(|e| e.delivery_address.clone()) + .unwrap_or_else(|| "unknown".to_string()) } #[cfg(test)] @@ -259,4 +327,46 @@ mod tests { let stored = alice.list_stored_chats().unwrap(); assert!(stored.contains(&chat_id)); } + + #[test] + fn test_inbox_key_persistence() { + let mut manager = ChatManager::in_memory().unwrap(); + + // Create intro bundle (should persist ephemeral key) + let intro = manager.create_intro_bundle().unwrap(); + let key_hex = hex::encode(intro.ephemeral_key.as_bytes()); + + // Key should be persisted + let loaded_key = manager.storage.load_inbox_key(&key_hex).unwrap(); + assert!(loaded_key.is_some()); + } + + #[test] + fn test_chat_exists() { + let mut alice = ChatManager::in_memory().unwrap(); + let mut bob = ChatManager::in_memory().unwrap(); + + let bob_intro = bob.create_intro_bundle().unwrap(); + let (chat_id, _) = alice.start_private_chat(&bob_intro, "Hello!").unwrap(); + + // Chat should exist + assert!(alice.chat_exists(&chat_id).unwrap()); + assert!(!alice.chat_exists("nonexistent").unwrap()); + } + + #[test] + fn test_delete_chat() { + let mut alice = ChatManager::in_memory().unwrap(); + let mut bob = ChatManager::in_memory().unwrap(); + + let bob_intro = bob.create_intro_bundle().unwrap(); + let (chat_id, _) = alice.start_private_chat(&bob_intro, "Hello!").unwrap(); + + // Delete chat + alice.delete_chat(&chat_id).unwrap(); + + // Chat should no longer exist + assert!(!alice.chat_exists(&chat_id).unwrap()); + assert!(alice.list_chats().is_empty()); + } } diff --git a/conversations/src/common.rs b/conversations/src/common.rs index 55916f0..ada278e 100644 --- a/conversations/src/common.rs +++ b/conversations/src/common.rs @@ -45,6 +45,16 @@ impl ChatStore { key } + pub fn insert_boxed_chat(&mut self, conversation: Box) -> ChatIdOwned { + let key: ChatIdOwned = Arc::from(conversation.id()); + self.chats.insert(key.clone(), conversation); + key + } + + pub fn remove_chat(&mut self, id: &str) -> Option> { + self.chats.remove(id) + } + pub fn register_handler( &mut self, handler: impl InboundMessageHandler + HasChatId + 'static, diff --git a/conversations/src/inbox/inbox.rs b/conversations/src/inbox/inbox.rs index 2155933..bb1cbb5 100644 --- a/conversations/src/inbox/inbox.rs +++ b/conversations/src/inbox/inbox.rs @@ -53,19 +53,40 @@ impl Inbox { } } - pub fn create_bundle(&mut self) -> PrekeyBundle { + /// Creates a new Inbox with pre-loaded ephemeral keys (for restoring from storage). + pub fn with_keys(ident: Rc, keys: HashMap) -> Self { + let local_convo_id = ident.address(); + Self { + ident, + local_convo_id, + ephemeral_keys: keys, + } + } + + /// Creates a prekey bundle and returns both the bundle and the ephemeral secret. + /// The caller is responsible for persisting the secret. + pub fn create_bundle(&mut self) -> (PrekeyBundle, StaticSecret) { let ephemeral = StaticSecret::random(); - let signed_prekey = PublicKey::from(&ephemeral); + + // Store in memory self.ephemeral_keys - .insert(hex::encode(signed_prekey.as_bytes()), ephemeral); + .insert(hex::encode(signed_prekey.as_bytes()), ephemeral.clone()); - PrekeyBundle { + let bundle = PrekeyBundle { identity_key: self.ident.public_key(), - signed_prekey: signed_prekey, + signed_prekey, signature: [0u8; 64], onetime_prekey: None, - } + }; + + (bundle, ephemeral) + } + + /// Removes an ephemeral key after it has been used in a handshake. + /// Returns the public key hex for the caller to delete from storage. + pub fn consume_ephemeral_key(&mut self, public_key_hex: &str) -> Option { + self.ephemeral_keys.remove(public_key_hex).map(|_| public_key_hex.to_string()) } pub fn invite_to_private_convo( @@ -245,7 +266,7 @@ mod tests { let raya_ident = Identity::new(); let mut raya_inbox = Inbox::new(raya_ident.into()); - let bundle = raya_inbox.create_bundle(); + let (bundle, _secret) = raya_inbox.create_bundle(); let (_, payloads) = saro_inbox .invite_to_private_convo(&bundle.into(), "hello".into()) .unwrap(); diff --git a/conversations/src/storage/db.rs b/conversations/src/storage/db.rs index 5a2c135..e8b151f 100644 --- a/conversations/src/storage/db.rs +++ b/conversations/src/storage/db.rs @@ -1,6 +1,9 @@ //! Chat-specific storage implementation. +use std::collections::HashMap; + use storage::{RusqliteError, SqliteDb, StorageConfig, StorageError, params}; +use x25519_dalek::StaticSecret; use super::types::{ChatRecord, IdentityRecord}; use crate::identity::Identity; @@ -123,6 +126,144 @@ impl ChatStorage { Ok(ids) } + + // ==================== Inbox Key Operations ==================== + + /// Saves an inbox ephemeral key. + pub fn save_inbox_key( + &mut self, + public_key_hex: &str, + secret: &StaticSecret, + ) -> Result<(), StorageError> { + self.db.connection().execute( + "INSERT OR REPLACE INTO inbox_keys (public_key_hex, secret_key) VALUES (?1, ?2)", + params![public_key_hex, secret.as_bytes().as_slice()], + )?; + Ok(()) + } + + /// Loads an inbox ephemeral key by its public key hex. + pub fn load_inbox_key( + &self, + public_key_hex: &str, + ) -> Result, StorageError> { + let mut stmt = self + .db + .connection() + .prepare("SELECT secret_key FROM inbox_keys WHERE public_key_hex = ?1")?; + + let result = stmt.query_row(params![public_key_hex], |row| { + let secret_key: Vec = row.get(0)?; + Ok(secret_key) + }); + + match result { + Ok(secret_key) => { + let bytes: [u8; 32] = secret_key + .try_into() + .map_err(|_| StorageError::InvalidData("Invalid secret key length".into()))?; + Ok(Some(StaticSecret::from(bytes))) + } + Err(RusqliteError::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(e.into()), + } + } + + /// Loads all inbox ephemeral keys. + pub fn load_all_inbox_keys(&self) -> Result, StorageError> { + let mut stmt = self + .db + .connection() + .prepare("SELECT public_key_hex, secret_key FROM inbox_keys")?; + + let rows = stmt.query_map([], |row| { + let public_key_hex: String = row.get(0)?; + let secret_key: Vec = row.get(1)?; + Ok((public_key_hex, secret_key)) + })?; + + let mut keys = HashMap::new(); + for row in rows { + let (public_key_hex, secret_key) = row?; + let bytes: [u8; 32] = secret_key + .try_into() + .map_err(|_| StorageError::InvalidData("Invalid secret key length".into()))?; + keys.insert(public_key_hex, StaticSecret::from(bytes)); + } + + Ok(keys) + } + + /// Deletes an inbox ephemeral key (after it has been used). + pub fn delete_inbox_key(&mut self, public_key_hex: &str) -> Result<(), StorageError> { + self.db.connection().execute( + "DELETE FROM inbox_keys WHERE public_key_hex = ?1", + params![public_key_hex], + )?; + Ok(()) + } + + // ==================== Chat Operations ==================== + + /// Loads a chat record by ID. + pub fn load_chat(&self, chat_id: &str) -> Result, StorageError> { + let mut stmt = self.db.connection().prepare( + "SELECT chat_id, chat_type, remote_public_key, remote_address, created_at + FROM chats WHERE chat_id = ?1", + )?; + + let result = stmt.query_row(params![chat_id], |row| { + let chat_id: String = row.get(0)?; + let chat_type: String = row.get(1)?; + let remote_public_key: Option> = row.get(2)?; + let remote_address: String = row.get(3)?; + let created_at: i64 = row.get(4)?; + Ok(( + chat_id, + chat_type, + remote_public_key, + remote_address, + created_at, + )) + }); + + match result { + Ok((chat_id, chat_type, remote_public_key, remote_address, created_at)) => { + let remote_public_key = remote_public_key.map(|bytes| { + let arr: [u8; 32] = bytes.try_into().expect("Invalid key length"); + arr + }); + Ok(Some(ChatRecord { + chat_id, + chat_type, + remote_public_key, + remote_address, + created_at, + })) + } + Err(RusqliteError::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(e.into()), + } + } + + /// Checks if a chat exists in storage. + pub fn chat_exists(&self, chat_id: &str) -> Result { + let mut stmt = self + .db + .connection() + .prepare("SELECT 1 FROM chats WHERE chat_id = ?1")?; + + let exists = stmt.exists(params![chat_id])?; + Ok(exists) + } + + /// Deletes a chat record. + pub fn delete_chat(&mut self, chat_id: &str) -> Result<(), StorageError> { + self.db + .connection() + .execute("DELETE FROM chats WHERE chat_id = ?1", params![chat_id])?; + Ok(()) + } } #[cfg(test)]