From 77a670c66889db7c5f5b0ccd2334a517d1101f81 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 5 Feb 2026 16:58:30 +0800 Subject: [PATCH] chore: refactor empheral keys storage --- conversations/examples/persist_chat.rs | 2 +- conversations/src/chat.rs | 34 ++++++---- conversations/src/common.rs | 14 ---- conversations/src/inbox/inbox.rs | 88 ++++++++++---------------- conversations/src/storage/db.rs | 45 +++++++------ 5 files changed, 82 insertions(+), 101 deletions(-) diff --git a/conversations/examples/persist_chat.rs b/conversations/examples/persist_chat.rs index 82702fe..0e8f215 100644 --- a/conversations/examples/persist_chat.rs +++ b/conversations/examples/persist_chat.rs @@ -3,7 +3,7 @@ //! This example demonstrates the complete chat flow using ChatManager, //! which automatically handles all storage operations. //! -//! Run with: cargo run -p logos-chat --example chat_sesspersist_chat +//! Run with: cargo run -p logos-chat --example persist_chat use logos_chat::{ChatManager, StorageConfig}; use tempfile::TempDir; diff --git a/conversations/src/chat.rs b/conversations/src/chat.rs index 5a3887d..69b5829 100644 --- a/conversations/src/chat.rs +++ b/conversations/src/chat.rs @@ -9,7 +9,7 @@ use double_ratchets::storage::RatchetStorage; use prost::Message; use crate::{ - common::{Chat, HasChatId, InboundMessageHandler}, + common::{Chat, HasChatId}, dm::privatev1::PrivateV1Convo, errors::ChatError, identity::Identity, @@ -92,10 +92,7 @@ impl ChatManager { }; let identity = Rc::new(identity); - - // Load inbox ephemeral keys from storage - let inbox_keys = storage.load_all_inbox_keys()?; - let inbox = Inbox::with_keys(Rc::clone(&identity), inbox_keys); + let inbox = Inbox::new(Rc::clone(&identity)); Ok(Self { identity, @@ -109,7 +106,7 @@ impl ChatManager { /// /// Uses a shared in-memory SQLite database so that multiple storage /// instances within the same ChatManager share data. - /// + /// /// The `db_name` should be unique per ChatManager instance to avoid /// sharing data between different users. pub fn in_memory(db_name: &str) -> Result { @@ -244,9 +241,10 @@ impl ChatManager { return self.handle_inbox_handshake(chat_id, &envelope.payload); } - // Not a valid envelope - generate a new chat ID (for backwards compatibility) - let new_chat_id = crate::utils::generate_chat_id(); - self.handle_inbox_handshake(&new_chat_id, payload) + // Not a valid envelope format + Err(ChatManagerError::Chat(ChatError::Protocol( + "invalid envelope format".to_string(), + ))) } /// Handle an inbox handshake to establish a new chat. @@ -255,10 +253,17 @@ impl ChatManager { conversation_hint: &str, payload: &[u8], ) -> Result { + // Extract the ephemeral key hex from the payload + let key_hex = Inbox::extract_ephemeral_key_hex(payload)?; + + // Load the ephemeral key from storage + let ephemeral_key = self.storage.load_inbox_key(&key_hex)? + .ok_or_else(|| ChatManagerError::Chat(ChatError::UnknownEphemeralKey()))?; + let ratchet_storage = self.create_ratchet_storage()?; let result = self .inbox - .handle_frame(ratchet_storage, conversation_hint, payload)?; + .handle_frame(ratchet_storage, conversation_hint, payload, &ephemeral_key)?; let chat_id = result.convo.id().to_string(); @@ -272,6 +277,9 @@ impl ChatManager { }; self.storage.save_chat(&chat_record)?; + // Delete the ephemeral key from storage after successful handshake + self.storage.delete_inbox_key(&key_hex)?; + // Ratchet state is automatically persisted by RatchetSession // result.convo is dropped here - state already saved @@ -394,9 +402,9 @@ mod tests { let intro = manager.create_intro_bundle().unwrap(); let key_hex = hex::encode(intro.ephemeral_key.as_bytes()); - // Key should be persisted - let all_keys = manager.storage.load_all_inbox_keys().unwrap(); - assert!(all_keys.contains_key(&key_hex)); + // Key should be persisted - load it directly + let loaded_key = manager.storage.load_inbox_key(&key_hex).unwrap(); + assert!(loaded_key.is_some()); } #[test] diff --git a/conversations/src/common.rs b/conversations/src/common.rs index 6f0cfad..402389d 100644 --- a/conversations/src/common.rs +++ b/conversations/src/common.rs @@ -3,7 +3,6 @@ use std::fmt::Debug; use crate::dm::privatev1::PrivateV1Convo; pub use crate::errors::ChatError; use crate::types::AddressedEncryptedPayload; -use double_ratchets::storage::RatchetStorage; pub type ChatId<'a> = &'a str; @@ -21,19 +20,6 @@ pub struct InboxHandleResult { pub initial_content: Option>, } -pub trait InboundMessageHandler { - /// Handle an incoming inbox frame. - /// - /// `conversation_hint` is the sender's conversation ID from the envelope, - /// which should be used as the shared conversation ID for this chat. - fn handle_frame( - &mut self, - storage: RatchetStorage, - conversation_hint: &str, - encoded_payload: &[u8], - ) -> Result; -} - pub trait Chat: HasChatId + Debug { fn send_message(&mut self, content: &[u8]) -> Result, ChatError>; diff --git a/conversations/src/inbox/inbox.rs b/conversations/src/inbox/inbox.rs index 06969da..c027201 100644 --- a/conversations/src/inbox/inbox.rs +++ b/conversations/src/inbox/inbox.rs @@ -2,13 +2,12 @@ use hex; use prost::Message; use prost::bytes::Bytes; use rand_core::OsRng; -use std::collections::HashMap; use std::rc::Rc; use crypto::{PrekeyBundle, SecretKey}; use double_ratchets::storage::RatchetStorage; -use crate::common::{Chat, ChatId, HasChatId, InboundMessageHandler, InboxHandleResult}; +use crate::common::{Chat, ChatId, HasChatId, InboxHandleResult}; use crate::dm::privatev1::PrivateV1Convo; use crate::errors::ChatError; use crate::identity::Identity; @@ -29,7 +28,6 @@ fn delivery_address_for_installation(_: PublicKey) -> String { pub struct Inbox { ident: Rc, local_convo_id: String, - ephemeral_keys: HashMap, } impl<'a> std::fmt::Debug for Inbox { @@ -37,10 +35,6 @@ impl<'a> std::fmt::Debug for Inbox { f.debug_struct("Inbox") .field("ident", &self.ident) .field("convo_id", &self.local_convo_id) - .field( - "ephemeral_keys", - &format!("<{} keys>", self.ephemeral_keys.len()), - ) .finish() } } @@ -51,29 +45,14 @@ impl Inbox { Self { ident, local_convo_id, - ephemeral_keys: HashMap::::new(), - } - } - - /// Creates a new Inbox with pre-loaded ephemeral keys (for restoring from storage). - pub fn with_keys(ident: Rc, keys: HashMap) -> Self { - let local_convo_id = ident.address(); - Self { - ident, - local_convo_id, - ephemeral_keys: keys, } } /// Creates a prekey bundle and returns both the bundle and the ephemeral secret. - /// The caller is responsible for persisting the secret. - pub fn create_bundle(&mut self) -> (PrekeyBundle, StaticSecret) { + /// The caller is responsible for persisting the secret to storage. + pub fn create_bundle(&self) -> (PrekeyBundle, StaticSecret) { let ephemeral = StaticSecret::random(); let signed_prekey = PublicKey::from(&ephemeral); - - // Store in memory - self.ephemeral_keys - .insert(hex::encode(signed_prekey.as_bytes()), ephemeral.clone()); let bundle = PrekeyBundle { identity_key: self.ident.public_key(), @@ -85,12 +64,6 @@ impl Inbox { (bundle, ephemeral) } - /// Removes an ephemeral key after it has been used in a handshake. - /// Returns the public key hex for the caller to delete from storage. - pub fn consume_ephemeral_key(&mut self, public_key_hex: &str) -> Option { - self.ephemeral_keys.remove(public_key_hex).map(|_| public_key_hex.to_string()) - } - pub fn invite_to_private_convo( &self, storage: RatchetStorage, @@ -214,25 +187,30 @@ impl Inbox { Ok(frame) } - fn lookup_ephemeral_key(&self, key: &str) -> Result<&StaticSecret, ChatError> { - self.ephemeral_keys - .get(key) - .ok_or_else(|| return ChatError::UnknownEphemeralKey()) - } -} + /// Extracts the ephemeral public key hex from an incoming handshake message. + /// Returns the key hex that should be used to look up the secret from storage. + pub fn extract_ephemeral_key_hex(message: &[u8]) -> Result { + if message.is_empty() { + return Err(ChatError::Protocol("empty message".into())); + } -impl HasChatId for Inbox { - fn id(&self) -> ChatId<'_> { - &self.local_convo_id - } -} + let handshake = Self::extract_payload(proto::EncryptedPayload::decode(message)?)?; + let header = handshake + .header + .ok_or(ChatError::UnexpectedPayload("InboxV1Header".into()))?; -impl InboundMessageHandler for Inbox { - fn handle_frame( - &mut self, + Ok(hex::encode(header.responder_ephemeral.as_ref())) + } + + /// Handle an incoming inbox handshake frame. + /// + /// The ephemeral_key must be provided by the caller (loaded from storage). + pub fn handle_frame( + &self, storage: RatchetStorage, conversation_hint: &str, message: &[u8], + ephemeral_key: &StaticSecret, ) -> Result { if message.is_empty() { return Err(ChatError::Protocol("empty message".into())); @@ -251,10 +229,6 @@ impl InboundMessageHandler for Inbox { .try_into() .map_err(|_| ChatError::InvalidKeyLength)?; - // Get Ephemeral key used by the initiator - let key_index = hex::encode(header.responder_ephemeral.as_ref()); - let ephemeral_key = self.lookup_ephemeral_key(&key_index)?; - // Perform handshake and decrypt frame let (seed_key, frame) = self.perform_handshake(ephemeral_key, header, handshake.payload)?; @@ -279,9 +253,6 @@ impl InboundMessageHandler for Inbox { None }; - // Consume the ephemeral key after successful handshake - self.consume_ephemeral_key(&key_index); - Ok(InboxHandleResult { convo, remote_public_key, @@ -292,6 +263,12 @@ impl InboundMessageHandler for Inbox { } } +impl HasChatId for Inbox { + fn id(&self) -> ChatId<'_> { + &self.local_convo_id + } +} + #[cfg(test)] mod tests { use super::*; @@ -302,13 +279,14 @@ mod tests { let saro_inbox = Inbox::new(saro_ident.into()); let raya_ident = Identity::new(); - let mut raya_inbox = Inbox::new(raya_ident.into()); + let raya_inbox = Inbox::new(raya_ident.into()); // Create in-memory storage for both parties let storage_sender = RatchetStorage::in_memory().unwrap(); let storage_receiver = RatchetStorage::in_memory().unwrap(); - let (bundle, _secret) = raya_inbox.create_bundle(); + // Create bundle - keep the secret for later use + let (bundle, ephemeral_secret) = raya_inbox.create_bundle(); let (saro_convo, payloads) = saro_inbox .invite_to_private_convo(storage_sender, &bundle.into(), "hello".into()) .unwrap(); @@ -323,8 +301,8 @@ mod tests { let mut buf = Vec::new(); payload.data.encode(&mut buf).unwrap(); - // Test handle_frame with valid payload - let result = raya_inbox.handle_frame(storage_receiver, &conversation_hint, &buf); + // Test handle_frame with valid payload - pass the ephemeral key directly + let result = raya_inbox.handle_frame(storage_receiver, &conversation_hint, &buf, &ephemeral_secret); assert!( result.is_ok(), diff --git a/conversations/src/storage/db.rs b/conversations/src/storage/db.rs index dd8652f..3bd0085 100644 --- a/conversations/src/storage/db.rs +++ b/conversations/src/storage/db.rs @@ -1,7 +1,5 @@ //! Chat-specific storage implementation. -use std::collections::HashMap; - use storage::{RusqliteError, SqliteDb, StorageConfig, StorageError, params}; use x25519_dalek::StaticSecret; @@ -111,29 +109,40 @@ impl ChatStorage { Ok(()) } - /// Loads all inbox ephemeral keys. - pub fn load_all_inbox_keys(&self) -> Result, StorageError> { + /// Loads a single inbox ephemeral key by public key hex. + pub fn load_inbox_key( + &self, + public_key_hex: &str, + ) -> Result, StorageError> { let mut stmt = self .db .connection() - .prepare("SELECT public_key_hex, secret_key FROM inbox_keys")?; + .prepare("SELECT secret_key FROM inbox_keys WHERE public_key_hex = ?1")?; - let rows = stmt.query_map([], |row| { - let public_key_hex: String = row.get(0)?; - let secret_key: Vec = row.get(1)?; - Ok((public_key_hex, secret_key)) - })?; + let result = stmt.query_row(params![public_key_hex], |row| { + let secret_key: Vec = row.get(0)?; + Ok(secret_key) + }); - let mut keys = HashMap::new(); - for row in rows { - let (public_key_hex, secret_key) = row?; - let bytes: [u8; 32] = secret_key - .try_into() - .map_err(|_| StorageError::InvalidData("Invalid secret key length".into()))?; - keys.insert(public_key_hex, StaticSecret::from(bytes)); + match result { + Ok(secret_key) => { + let bytes: [u8; 32] = secret_key + .try_into() + .map_err(|_| StorageError::InvalidData("Invalid secret key length".into()))?; + Ok(Some(StaticSecret::from(bytes))) + } + Err(RusqliteError::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(e.into()), } + } - Ok(keys) + /// Deletes an inbox ephemeral key after it has been used. + pub fn delete_inbox_key(&mut self, public_key_hex: &str) -> Result<(), StorageError> { + self.db.connection().execute( + "DELETE FROM inbox_keys WHERE public_key_hex = ?1", + params![public_key_hex], + )?; + Ok(()) } // ==================== Chat Metadata Operations ====================