feat: wire with the double ratchet storage

This commit is contained in:
kaichaosun 2026-03-13 10:30:45 +08:00
parent 330059fd2d
commit bc9397d2e0
No known key found for this signature in database
GPG Key ID: 223E0F992F4F03BF
4 changed files with 132 additions and 29 deletions

View File

@ -1,9 +1,10 @@
use std::rc::Rc; use std::rc::Rc;
use double_ratchets::{RatchetState, RatchetStorage};
use storage::StorageConfig; use storage::StorageConfig;
use crate::{ use crate::{
conversation::{ConversationId, ConversationStore, Convo, Id}, conversation::{ConversationId, ConversationStore, Convo, Id, PrivateV1Convo},
errors::ChatError, errors::ChatError,
identity::Identity, identity::Identity,
inbox::Inbox, inbox::Inbox,
@ -22,6 +23,7 @@ pub struct Context {
store: ConversationStore, store: ConversationStore,
inbox: Inbox, inbox: Inbox,
storage: ChatStorage, storage: ChatStorage,
ratchet_storage: RatchetStorage,
} }
impl Context { impl Context {
@ -30,7 +32,8 @@ impl Context {
/// If an identity exists in storage, it will be restored. /// If an identity exists in storage, it will be restored.
/// Otherwise, a new identity will be created with the given name and saved. /// Otherwise, a new identity will be created with the given name and saved.
pub fn open(name: impl Into<String>, config: StorageConfig) -> Result<Self, ChatError> { pub fn open(name: impl Into<String>, config: StorageConfig) -> Result<Self, ChatError> {
let mut storage = ChatStorage::new(config)?; let mut storage = ChatStorage::new(config.clone())?;
let ratchet_storage = RatchetStorage::from_config(config)?;
let name = name.into(); let name = name.into();
// Load or create identity // Load or create identity
@ -45,11 +48,31 @@ impl Context {
let identity = Rc::new(identity); let identity = Rc::new(identity);
let inbox = Inbox::new(Rc::clone(&identity)); let inbox = Inbox::new(Rc::clone(&identity));
// Restore persisted conversations
let mut store = ConversationStore::new();
let conversation_records = storage.load_conversations()?;
for record in conversation_records {
let convo: Box<dyn Convo> = match record.convo_type.as_str() {
"private_v1" => {
let dr_state: RatchetState =
ratchet_storage.load(&record.local_convo_id)?;
Box::new(PrivateV1Convo::from_stored(
record.local_convo_id,
record.remote_convo_id,
dr_state,
))
}
_ => continue, // Skip unknown conversation types
};
store.insert_convo(convo);
}
Ok(Self { Ok(Self {
_identity: identity, _identity: identity,
store: ConversationStore::new(), store,
inbox, inbox,
storage, storage,
ratchet_storage,
}) })
} }
@ -93,16 +116,18 @@ impl Context {
convo_id: ConversationId, convo_id: ConversationId,
content: &[u8], content: &[u8],
) -> Result<Vec<AddressedEnvelope>, ChatError> { ) -> Result<Vec<AddressedEnvelope>, ChatError> {
// Lookup convo by id let convo = self
let convo = self.get_convo_mut(convo_id)?; .store
.get_mut(convo_id)
.ok_or_else(|| ChatError::NoConvo(convo_id.into()))?;
// Generate encrypted payloads
let payloads = convo.send_message(content)?; let payloads = convo.send_message(content)?;
let remote_id = convo.remote_id();
convo.save_ratchet_state(&mut self.ratchet_storage)?;
// Attach conversation_ids to Envelopes
Ok(payloads Ok(payloads
.into_iter() .into_iter()
.map(|p| p.into_envelope(convo.remote_id())) .map(|p| p.into_envelope(remote_id.clone()))
.collect()) .collect())
} }
@ -151,7 +176,12 @@ impl Context {
return Err(ChatError::Protocol("convo id not found".into())); return Err(ChatError::Protocol("convo id not found".into()));
}; };
convo.handle_frame(enc_payload) let result = convo.handle_frame(enc_payload)?;
// Persist updated ratchet state
convo.save_ratchet_state(&mut self.ratchet_storage)?;
Ok(result)
} }
pub fn create_intro_bundle(&mut self) -> Result<Vec<u8>, ChatError> { pub fn create_intro_bundle(&mut self) -> Result<Vec<u8>, ChatError> {
@ -162,21 +192,16 @@ impl Context {
} }
fn add_convo(&mut self, convo: Box<dyn Convo>) -> ConversationIdOwned { fn add_convo(&mut self, convo: Box<dyn Convo>) -> ConversationIdOwned {
// Persist conversation metadata to storage // Persist conversation metadata and ratchet state
let _ = self.storage.save_conversation( let _ = self.storage.save_conversation(
convo.id(), convo.id(),
&convo.remote_id(), &convo.remote_id(),
convo.convo_type(), convo.convo_type(),
); );
let _ = convo.save_ratchet_state(&mut self.ratchet_storage);
self.store.insert_convo(convo) self.store.insert_convo(convo)
} }
// Returns a mutable reference to a Convo for a given ConvoId
fn get_convo_mut(&mut self, convo_id: ConversationId) -> Result<&mut dyn Convo, ChatError> {
self.store
.get_mut(convo_id)
.ok_or_else(|| ChatError::NoConvo(convo_id.into()))
}
} }
#[cfg(test)] #[cfg(test)]
@ -247,7 +272,6 @@ mod tests {
#[test] #[test]
fn identity_persistence() { fn identity_persistence() {
// Use file-based storage to test real persistence
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let db_path = dir let db_path = dir
.path() .path()
@ -256,18 +280,15 @@ mod tests {
.to_string(); .to_string();
let config = StorageConfig::File(db_path); let config = StorageConfig::File(db_path);
// Create context - this should create and save a new identity
let ctx1 = Context::open("alice", config.clone()).unwrap(); let ctx1 = Context::open("alice", config.clone()).unwrap();
let pubkey1 = ctx1._identity.public_key(); let pubkey1 = ctx1._identity.public_key();
let name1 = ctx1.installation_name().to_string(); let name1 = ctx1.installation_name().to_string();
// Drop and reopen - should load the same identity
drop(ctx1); drop(ctx1);
let ctx2 = Context::open("alice", config).unwrap(); let ctx2 = Context::open("alice", config).unwrap();
let pubkey2 = ctx2._identity.public_key(); let pubkey2 = ctx2._identity.public_key();
let name2 = ctx2.installation_name().to_string(); let name2 = ctx2.installation_name().to_string();
// Identity should be the same
assert_eq!(pubkey1, pubkey2, "public key should persist"); assert_eq!(pubkey1, pubkey2, "public key should persist");
assert_eq!(name1, name2, "name should persist"); assert_eq!(name1, name2, "name should persist");
} }
@ -282,20 +303,16 @@ mod tests {
.to_string(); .to_string();
let config = StorageConfig::File(db_path); let config = StorageConfig::File(db_path);
// Create context and generate an intro bundle (creates ephemeral key)
let mut ctx1 = Context::open("alice", config.clone()).unwrap(); let mut ctx1 = Context::open("alice", config.clone()).unwrap();
let bundle1 = ctx1.create_intro_bundle().unwrap(); let bundle1 = ctx1.create_intro_bundle().unwrap();
// Drop and reopen - ephemeral keys should be restored from db
drop(ctx1); drop(ctx1);
let mut ctx2 = Context::open("alice", config.clone()).unwrap(); let mut ctx2 = Context::open("alice", config.clone()).unwrap();
// Use the intro bundle from before restart to start a conversation
let intro = Introduction::try_from(bundle1.as_slice()).unwrap(); let intro = Introduction::try_from(bundle1.as_slice()).unwrap();
let mut bob = Context::new_with_name("bob"); let mut bob = Context::new_with_name("bob");
let (_, payloads) = bob.create_private_convo(&intro, b"hello after restart"); let (_, payloads) = bob.create_private_convo(&intro, b"hello after restart");
// Alice (ctx2) should be able to handle the payload using the persisted ephemeral key
let payload = payloads.first().unwrap(); let payload = payloads.first().unwrap();
let content = ctx2 let content = ctx2
.handle_payload(&payload.data) .handle_payload(&payload.data)
@ -315,7 +332,6 @@ mod tests {
.to_string(); .to_string();
let config = StorageConfig::File(db_path); let config = StorageConfig::File(db_path);
// Create context, establish a conversation
let mut alice = Context::open("alice", config.clone()).unwrap(); let mut alice = Context::open("alice", config.clone()).unwrap();
let mut bob = Context::new_with_name("bob"); let mut bob = Context::new_with_name("bob");
@ -327,17 +343,73 @@ mod tests {
let content = alice.handle_payload(&payload.data).unwrap().unwrap(); let content = alice.handle_payload(&payload.data).unwrap().unwrap();
assert!(content.is_new_convo); assert!(content.is_new_convo);
// Verify conversation metadata was persisted
let convos = alice.storage.load_conversations().unwrap(); let convos = alice.storage.load_conversations().unwrap();
assert_eq!(convos.len(), 1); assert_eq!(convos.len(), 1);
assert_eq!(convos[0].convo_type, "private_v1"); assert_eq!(convos[0].convo_type, "private_v1");
assert!(!convos[0].local_convo_id.is_empty());
assert!(!convos[0].remote_convo_id.is_empty());
// Drop and reopen - metadata should still be there
drop(alice); drop(alice);
let alice2 = Context::open("alice", config).unwrap(); let alice2 = Context::open("alice", config).unwrap();
let convos = alice2.storage.load_conversations().unwrap(); let convos = alice2.storage.load_conversations().unwrap();
assert_eq!(convos.len(), 1, "conversation metadata should persist"); assert_eq!(convos.len(), 1, "conversation metadata should persist");
} }
#[test]
fn conversation_full_persistence() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir
.path()
.join("test_full_persist.db")
.to_string_lossy()
.to_string();
let config = StorageConfig::File(db_path);
// Alice and Bob establish a conversation
let mut alice = Context::open("alice", config.clone()).unwrap();
let mut bob = Context::new_with_name("bob");
let bundle = alice.create_intro_bundle().unwrap();
let intro = Introduction::try_from(bundle.as_slice()).unwrap();
let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello");
let payload = payloads.first().unwrap();
let content = alice.handle_payload(&payload.data).unwrap().unwrap();
let alice_convo_id = content.conversation_id;
// Exchange a few messages to advance ratchet state
let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap();
let payload = payloads.first().unwrap();
bob.handle_payload(&payload.data).unwrap().unwrap();
let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap();
let payload = payloads.first().unwrap();
alice.handle_payload(&payload.data).unwrap().unwrap();
// Drop Alice and reopen - conversation should survive
drop(alice);
let mut alice2 = Context::open("alice", config).unwrap();
// Verify conversation was restored
let convo_ids = alice2.list_conversations().unwrap();
assert_eq!(convo_ids.len(), 1);
// Bob sends a new message - Alice should be able to decrypt after restart
let payloads = bob.send_content(&bob_convo_id, b"after restart").unwrap();
let payload = payloads.first().unwrap();
let content = alice2
.handle_payload(&payload.data)
.expect("should decrypt after restart")
.expect("should have content");
assert_eq!(content.data, b"after restart");
// Alice can also send back
let payloads = alice2
.send_content(&alice_convo_id, b"alice after restart")
.unwrap();
let payload = payloads.first().unwrap();
let content = bob
.handle_payload(&payload.data)
.unwrap()
.expect("bob should receive");
assert_eq!(content.data, b"alice after restart");
}
} }

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
pub use crate::errors::ChatError; pub use crate::errors::ChatError;
use crate::types::{AddressedEncryptedPayload, ContentData}; use crate::types::{AddressedEncryptedPayload, ContentData};
use double_ratchets::RatchetStorage;
pub type ConversationId<'a> = &'a str; pub type ConversationId<'a> = &'a str;
pub type ConversationIdOwned = Arc<str>; pub type ConversationIdOwned = Arc<str>;
@ -30,6 +31,11 @@ pub trait Convo: Id + Debug {
/// Returns the conversation type identifier for storage. /// Returns the conversation type identifier for storage.
fn convo_type(&self) -> &str; fn convo_type(&self) -> &str;
/// Persists ratchet state to storage. Default is no-op.
fn save_ratchet_state(&self, _storage: &mut RatchetStorage) -> Result<(), ChatError> {
Ok(())
}
} }
pub struct ConversationStore { pub struct ConversationStore {

View File

@ -18,6 +18,7 @@ use crate::{
types::{AddressedEncryptedPayload, ContentData}, types::{AddressedEncryptedPayload, ContentData},
utils::timestamp_millis, utils::timestamp_millis,
}; };
use double_ratchets::RatchetStorage;
// Represents the potential participant roles in this Conversation // Represents the potential participant roles in this Conversation
enum Role { enum Role {
@ -77,6 +78,19 @@ impl PrivateV1Convo {
} }
} }
/// Reconstructs a PrivateV1Convo from persisted metadata and ratchet state.
pub fn from_stored(
local_convo_id: String,
remote_convo_id: String,
dr_state: RatchetState,
) -> Self {
Self {
local_convo_id,
remote_convo_id,
dr_state,
}
}
pub fn new_responder(seed_key: SymmetricKey32, dh_self: &PrivateKey) -> Self { pub fn new_responder(seed_key: SymmetricKey32, dh_self: &PrivateKey) -> Self {
let base_convo_id = BaseConvoId::new(&seed_key); let base_convo_id = BaseConvoId::new(&seed_key);
let local_convo_id = base_convo_id.id_for_participant(Role::Responder); let local_convo_id = base_convo_id.id_for_participant(Role::Responder);
@ -213,6 +227,11 @@ impl Convo for PrivateV1Convo {
fn convo_type(&self) -> &str { fn convo_type(&self) -> &str {
"private_v1" "private_v1"
} }
fn save_ratchet_state(&self, storage: &mut RatchetStorage) -> Result<(), ChatError> {
storage.save(&self.local_convo_id, &self.dr_state)?;
Ok(())
}
} }
impl Debug for PrivateV1Convo { impl Debug for PrivateV1Convo {

View File

@ -59,6 +59,12 @@ impl RatchetStorage {
Self::run_migration(db) Self::run_migration(db)
} }
/// Creates a ratchet storage from a generic storage configuration.
pub fn from_config(config: storage::StorageConfig) -> Result<Self, StorageError> {
let db = SqliteDb::new(config)?;
Self::run_migration(db)
}
/// Creates a new ratchet storage with the given database. /// Creates a new ratchet storage with the given database.
fn run_migration(db: SqliteDb) -> Result<Self, StorageError> { fn run_migration(db: SqliteDb) -> Result<Self, StorageError> {
// Initialize schema // Initialize schema