From bc9397d2e0fba7da1fa24b2557927c7193939201 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Fri, 13 Mar 2026 10:30:45 +0800 Subject: [PATCH] feat: wire with the double ratchet storage --- conversations/src/context.rs | 130 +++++++++++++++----- conversations/src/conversation.rs | 6 + conversations/src/conversation/privatev1.rs | 19 +++ double-ratchets/src/storage/db.rs | 6 + 4 files changed, 132 insertions(+), 29 deletions(-) diff --git a/conversations/src/context.rs b/conversations/src/context.rs index 46378f6..f6e9cdd 100644 --- a/conversations/src/context.rs +++ b/conversations/src/context.rs @@ -1,9 +1,10 @@ use std::rc::Rc; +use double_ratchets::{RatchetState, RatchetStorage}; use storage::StorageConfig; use crate::{ - conversation::{ConversationId, ConversationStore, Convo, Id}, + conversation::{ConversationId, ConversationStore, Convo, Id, PrivateV1Convo}, errors::ChatError, identity::Identity, inbox::Inbox, @@ -22,6 +23,7 @@ pub struct Context { store: ConversationStore, inbox: Inbox, storage: ChatStorage, + ratchet_storage: RatchetStorage, } impl Context { @@ -30,7 +32,8 @@ impl Context { /// If an identity exists in storage, it will be restored. /// Otherwise, a new identity will be created with the given name and saved. pub fn open(name: impl Into, config: StorageConfig) -> Result { - let mut storage = ChatStorage::new(config)?; + let mut storage = ChatStorage::new(config.clone())?; + let ratchet_storage = RatchetStorage::from_config(config)?; let name = name.into(); // Load or create identity @@ -45,11 +48,31 @@ impl Context { let identity = Rc::new(identity); let inbox = Inbox::new(Rc::clone(&identity)); + // Restore persisted conversations + let mut store = ConversationStore::new(); + let conversation_records = storage.load_conversations()?; + for record in conversation_records { + let convo: Box = match record.convo_type.as_str() { + "private_v1" => { + let dr_state: RatchetState = + ratchet_storage.load(&record.local_convo_id)?; + Box::new(PrivateV1Convo::from_stored( + record.local_convo_id, + record.remote_convo_id, + dr_state, + )) + } + _ => continue, // Skip unknown conversation types + }; + store.insert_convo(convo); + } + Ok(Self { _identity: identity, - store: ConversationStore::new(), + store, inbox, storage, + ratchet_storage, }) } @@ -93,16 +116,18 @@ impl Context { convo_id: ConversationId, content: &[u8], ) -> Result, ChatError> { - // Lookup convo by id - let convo = self.get_convo_mut(convo_id)?; + let convo = self + .store + .get_mut(convo_id) + .ok_or_else(|| ChatError::NoConvo(convo_id.into()))?; - // Generate encrypted payloads let payloads = convo.send_message(content)?; + let remote_id = convo.remote_id(); + convo.save_ratchet_state(&mut self.ratchet_storage)?; - // Attach conversation_ids to Envelopes Ok(payloads .into_iter() - .map(|p| p.into_envelope(convo.remote_id())) + .map(|p| p.into_envelope(remote_id.clone())) .collect()) } @@ -151,7 +176,12 @@ impl Context { return Err(ChatError::Protocol("convo id not found".into())); }; - convo.handle_frame(enc_payload) + let result = convo.handle_frame(enc_payload)?; + + // Persist updated ratchet state + convo.save_ratchet_state(&mut self.ratchet_storage)?; + + Ok(result) } pub fn create_intro_bundle(&mut self) -> Result, ChatError> { @@ -162,21 +192,16 @@ impl Context { } fn add_convo(&mut self, convo: Box) -> ConversationIdOwned { - // Persist conversation metadata to storage + // Persist conversation metadata and ratchet state let _ = self.storage.save_conversation( convo.id(), &convo.remote_id(), convo.convo_type(), ); + let _ = convo.save_ratchet_state(&mut self.ratchet_storage); self.store.insert_convo(convo) } - // Returns a mutable reference to a Convo for a given ConvoId - fn get_convo_mut(&mut self, convo_id: ConversationId) -> Result<&mut dyn Convo, ChatError> { - self.store - .get_mut(convo_id) - .ok_or_else(|| ChatError::NoConvo(convo_id.into())) - } } #[cfg(test)] @@ -247,7 +272,6 @@ mod tests { #[test] fn identity_persistence() { - // Use file-based storage to test real persistence let dir = tempfile::tempdir().unwrap(); let db_path = dir .path() @@ -256,18 +280,15 @@ mod tests { .to_string(); let config = StorageConfig::File(db_path); - // Create context - this should create and save a new identity let ctx1 = Context::open("alice", config.clone()).unwrap(); let pubkey1 = ctx1._identity.public_key(); let name1 = ctx1.installation_name().to_string(); - // Drop and reopen - should load the same identity drop(ctx1); let ctx2 = Context::open("alice", config).unwrap(); let pubkey2 = ctx2._identity.public_key(); let name2 = ctx2.installation_name().to_string(); - // Identity should be the same assert_eq!(pubkey1, pubkey2, "public key should persist"); assert_eq!(name1, name2, "name should persist"); } @@ -282,20 +303,16 @@ mod tests { .to_string(); let config = StorageConfig::File(db_path); - // Create context and generate an intro bundle (creates ephemeral key) let mut ctx1 = Context::open("alice", config.clone()).unwrap(); let bundle1 = ctx1.create_intro_bundle().unwrap(); - // Drop and reopen - ephemeral keys should be restored from db drop(ctx1); let mut ctx2 = Context::open("alice", config.clone()).unwrap(); - // Use the intro bundle from before restart to start a conversation let intro = Introduction::try_from(bundle1.as_slice()).unwrap(); let mut bob = Context::new_with_name("bob"); let (_, payloads) = bob.create_private_convo(&intro, b"hello after restart"); - // Alice (ctx2) should be able to handle the payload using the persisted ephemeral key let payload = payloads.first().unwrap(); let content = ctx2 .handle_payload(&payload.data) @@ -315,7 +332,6 @@ mod tests { .to_string(); let config = StorageConfig::File(db_path); - // Create context, establish a conversation let mut alice = Context::open("alice", config.clone()).unwrap(); let mut bob = Context::new_with_name("bob"); @@ -327,17 +343,73 @@ mod tests { let content = alice.handle_payload(&payload.data).unwrap().unwrap(); assert!(content.is_new_convo); - // Verify conversation metadata was persisted let convos = alice.storage.load_conversations().unwrap(); assert_eq!(convos.len(), 1); assert_eq!(convos[0].convo_type, "private_v1"); - assert!(!convos[0].local_convo_id.is_empty()); - assert!(!convos[0].remote_convo_id.is_empty()); - // Drop and reopen - metadata should still be there drop(alice); let alice2 = Context::open("alice", config).unwrap(); let convos = alice2.storage.load_conversations().unwrap(); assert_eq!(convos.len(), 1, "conversation metadata should persist"); } + + #[test] + fn conversation_full_persistence() { + let dir = tempfile::tempdir().unwrap(); + let db_path = dir + .path() + .join("test_full_persist.db") + .to_string_lossy() + .to_string(); + let config = StorageConfig::File(db_path); + + // Alice and Bob establish a conversation + let mut alice = Context::open("alice", config.clone()).unwrap(); + let mut bob = Context::new_with_name("bob"); + + let bundle = alice.create_intro_bundle().unwrap(); + let intro = Introduction::try_from(bundle.as_slice()).unwrap(); + let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello"); + + let payload = payloads.first().unwrap(); + let content = alice.handle_payload(&payload.data).unwrap().unwrap(); + let alice_convo_id = content.conversation_id; + + // Exchange a few messages to advance ratchet state + let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap(); + let payload = payloads.first().unwrap(); + bob.handle_payload(&payload.data).unwrap().unwrap(); + + let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap(); + let payload = payloads.first().unwrap(); + alice.handle_payload(&payload.data).unwrap().unwrap(); + + // Drop Alice and reopen - conversation should survive + drop(alice); + let mut alice2 = Context::open("alice", config).unwrap(); + + // Verify conversation was restored + let convo_ids = alice2.list_conversations().unwrap(); + assert_eq!(convo_ids.len(), 1); + + // Bob sends a new message - Alice should be able to decrypt after restart + let payloads = bob.send_content(&bob_convo_id, b"after restart").unwrap(); + let payload = payloads.first().unwrap(); + let content = alice2 + .handle_payload(&payload.data) + .expect("should decrypt after restart") + .expect("should have content"); + assert_eq!(content.data, b"after restart"); + + // Alice can also send back + let payloads = alice2 + .send_content(&alice_convo_id, b"alice after restart") + .unwrap(); + let payload = payloads.first().unwrap(); + let content = bob + .handle_payload(&payload.data) + .unwrap() + .expect("bob should receive"); + assert_eq!(content.data, b"alice after restart"); + } } diff --git a/conversations/src/conversation.rs b/conversations/src/conversation.rs index 329993b..db4e76d 100644 --- a/conversations/src/conversation.rs +++ b/conversations/src/conversation.rs @@ -4,6 +4,7 @@ use std::sync::Arc; pub use crate::errors::ChatError; use crate::types::{AddressedEncryptedPayload, ContentData}; +use double_ratchets::RatchetStorage; pub type ConversationId<'a> = &'a str; pub type ConversationIdOwned = Arc; @@ -30,6 +31,11 @@ pub trait Convo: Id + Debug { /// Returns the conversation type identifier for storage. fn convo_type(&self) -> &str; + + /// Persists ratchet state to storage. Default is no-op. + fn save_ratchet_state(&self, _storage: &mut RatchetStorage) -> Result<(), ChatError> { + Ok(()) + } } pub struct ConversationStore { diff --git a/conversations/src/conversation/privatev1.rs b/conversations/src/conversation/privatev1.rs index 5273bf1..3cd506d 100644 --- a/conversations/src/conversation/privatev1.rs +++ b/conversations/src/conversation/privatev1.rs @@ -18,6 +18,7 @@ use crate::{ types::{AddressedEncryptedPayload, ContentData}, utils::timestamp_millis, }; +use double_ratchets::RatchetStorage; // Represents the potential participant roles in this Conversation enum Role { @@ -77,6 +78,19 @@ impl PrivateV1Convo { } } + /// Reconstructs a PrivateV1Convo from persisted metadata and ratchet state. + pub fn from_stored( + local_convo_id: String, + remote_convo_id: String, + dr_state: RatchetState, + ) -> Self { + Self { + local_convo_id, + remote_convo_id, + dr_state, + } + } + pub fn new_responder(seed_key: SymmetricKey32, dh_self: &PrivateKey) -> Self { let base_convo_id = BaseConvoId::new(&seed_key); let local_convo_id = base_convo_id.id_for_participant(Role::Responder); @@ -213,6 +227,11 @@ impl Convo for PrivateV1Convo { fn convo_type(&self) -> &str { "private_v1" } + + fn save_ratchet_state(&self, storage: &mut RatchetStorage) -> Result<(), ChatError> { + storage.save(&self.local_convo_id, &self.dr_state)?; + Ok(()) + } } impl Debug for PrivateV1Convo { diff --git a/double-ratchets/src/storage/db.rs b/double-ratchets/src/storage/db.rs index 43b3f4b..c69d813 100644 --- a/double-ratchets/src/storage/db.rs +++ b/double-ratchets/src/storage/db.rs @@ -59,6 +59,12 @@ impl RatchetStorage { Self::run_migration(db) } + /// Creates a ratchet storage from a generic storage configuration. + pub fn from_config(config: storage::StorageConfig) -> Result { + let db = SqliteDb::new(config)?; + Self::run_migration(db) + } + /// Creates a new ratchet storage with the given database. fn run_migration(db: SqliteDb) -> Result { // Initialize schema