feat: delegate DR storage oprations to dr crate

This commit is contained in:
kaichaosun 2026-02-05 14:52:04 +08:00
parent d27b439c2d
commit fa5e48e8de
No known key found for this signature in database
GPG Key ID: 223E0F992F4F03BF
5 changed files with 87 additions and 329 deletions

View File

@ -6,6 +6,8 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::rc::Rc; use std::rc::Rc;
use double_ratchets::storage::RatchetStorage;
use crate::{ use crate::{
common::{Chat, HasChatId, InboundMessageHandler}, common::{Chat, HasChatId, InboundMessageHandler},
dm::privatev1::PrivateV1Convo, dm::privatev1::PrivateV1Convo,
@ -67,7 +69,10 @@ pub struct ChatManager {
/// In-memory cache of active chats. Chats are loaded from storage on demand. /// In-memory cache of active chats. Chats are loaded from storage on demand.
chats: HashMap<String, PrivateV1Convo>, chats: HashMap<String, PrivateV1Convo>,
inbox: Inbox, inbox: Inbox,
/// Storage for chat metadata (identity, inbox keys, chat records).
storage: ChatStorage, storage: ChatStorage,
/// Storage for ratchet state (delegated to double-ratchets crate).
ratchet_storage: RatchetStorage,
} }
impl ChatManager { impl ChatManager {
@ -78,7 +83,10 @@ impl ChatManager {
/// ///
/// Inbox ephemeral keys are loaded lazily when handling incoming handshakes. /// Inbox ephemeral keys are loaded lazily when handling incoming handshakes.
pub fn open(config: StorageConfig) -> Result<Self, ChatManagerError> { pub fn open(config: StorageConfig) -> Result<Self, ChatManagerError> {
let mut storage = ChatStorage::new(config)?; let mut storage = ChatStorage::new(config.clone())?;
// Initialize ratchet storage (delegated to double-ratchets crate)
let ratchet_storage = RatchetStorage::with_config(config)?;
// Load or create identity // Load or create identity
let identity = if let Some(identity) = storage.load_identity()? { let identity = if let Some(identity) = storage.load_identity()? {
@ -100,6 +108,7 @@ impl ChatManager {
chats: HashMap::new(), chats: HashMap::new(),
inbox, inbox,
storage, storage,
ratchet_storage,
}) })
} }
@ -160,10 +169,8 @@ impl ChatManager {
); );
self.storage.save_chat(&chat_record)?; self.storage.save_chat(&chat_record)?;
// Persist ratchet state // Persist ratchet state (delegated to double-ratchets storage)
let (state, skipped_keys) = convo.to_storage(); self.ratchet_storage.save(&chat_id, convo.ratchet_state())?;
self.storage
.save_ratchet_state(&chat_id, &state, &skipped_keys)?;
// Store in memory cache // Store in memory cache
self.chats.insert(chat_id.clone(), convo); self.chats.insert(chat_id.clone(), convo);
@ -189,10 +196,8 @@ impl ChatManager {
let payloads = chat.send_message(content)?; let payloads = chat.send_message(content)?;
// Persist updated ratchet state // Persist updated ratchet state (delegated to double-ratchets storage)
let (state, skipped_keys) = chat.to_storage(); self.ratchet_storage.save(chat_id, chat.ratchet_state())?;
self.storage
.save_ratchet_state(chat_id, &state, &skipped_keys)?;
let remote_id = chat.remote_id(); let remote_id = chat.remote_id();
Ok(payloads Ok(payloads
@ -207,9 +212,10 @@ impl ChatManager {
return Ok(()); return Ok(());
} }
// Try to load from storage // Try to load ratchet state from double-ratchets storage
if let Some((state, skipped_keys)) = self.storage.load_ratchet_state(chat_id)? { if self.ratchet_storage.exists(chat_id)? {
let convo = PrivateV1Convo::from_storage(chat_id.to_string(), state, skipped_keys); let dr_state = self.ratchet_storage.load(chat_id)?;
let convo = PrivateV1Convo::from_state(chat_id.to_string(), dr_state);
self.chats.insert(chat_id.to_string(), convo); self.chats.insert(chat_id.to_string(), convo);
Ok(()) Ok(())
} else if self.storage.chat_exists(chat_id)? { } else if self.storage.chat_exists(chat_id)? {
@ -301,6 +307,8 @@ impl ChatManager {
pub fn delete_chat(&mut self, chat_id: &str) -> Result<(), ChatManagerError> { pub fn delete_chat(&mut self, chat_id: &str) -> Result<(), ChatManagerError> {
self.chats.remove(chat_id); self.chats.remove(chat_id);
self.storage.delete_chat(chat_id)?; self.storage.delete_chat(chat_id)?;
// Also delete ratchet state from double-ratchets storage
let _ = self.ratchet_storage.delete(chat_id);
Ok(()) Ok(())
} }
} }
@ -419,10 +427,9 @@ mod tests {
// Scope 1: Create chat and send messages // Scope 1: Create chat and send messages
{ {
let mut alice = ChatManager::open(StorageConfig::File( let mut alice =
db_path.to_str().unwrap().to_string(), ChatManager::open(StorageConfig::File(db_path.to_str().unwrap().to_string()))
)) .unwrap();
.unwrap();
let result = alice.start_private_chat(&bob_intro, "Message 1").unwrap(); let result = alice.start_private_chat(&bob_intro, "Message 1").unwrap();
chat_id = result.0; chat_id = result.0;
@ -438,10 +445,9 @@ mod tests {
// Scope 2: Reopen and verify chat is restored // Scope 2: Reopen and verify chat is restored
{ {
let mut alice2 = ChatManager::open(StorageConfig::File( let mut alice2 =
db_path.to_str().unwrap().to_string(), ChatManager::open(StorageConfig::File(db_path.to_str().unwrap().to_string()))
)) .unwrap();
.unwrap();
// Chat is in storage but not loaded yet // Chat is in storage but not loaded yet
assert!(alice2.list_stored_chats().unwrap().contains(&chat_id)); assert!(alice2.list_stored_chats().unwrap().contains(&chat_id));

View File

@ -12,7 +12,6 @@ use crate::{
common::{Chat, ChatId, HasChatId}, common::{Chat, ChatId, HasChatId},
errors::{ChatError, EncryptionError}, errors::{ChatError, EncryptionError},
proto, proto,
storage::types::{RatchetStateRecord, SkippedKeyRecord},
types::AddressedEncryptedPayload, types::AddressedEncryptedPayload,
utils::timestamp_millis, utils::timestamp_millis,
}; };
@ -34,7 +33,11 @@ impl PrivateV1Convo {
} }
} }
pub fn new_responder(chat_id: String, seed_key: SecretKey, dh_self: InstallationKeyPair) -> Self { pub fn new_responder(
chat_id: String,
seed_key: SecretKey,
dh_self: InstallationKeyPair,
) -> Self {
Self { Self {
chat_id, chat_id,
// TODO: Danger - Fix double-ratchets types to Accept SecretKey // TODO: Danger - Fix double-ratchets types to Accept SecretKey
@ -42,62 +45,14 @@ impl PrivateV1Convo {
} }
} }
/// Restore a conversation from stored ratchet state. /// Restore a conversation from a loaded RatchetState.
pub fn from_storage( pub fn from_state(chat_id: String, dr_state: RatchetState) -> Self {
chat_id: String,
state: RatchetStateRecord,
skipped_keys: Vec<SkippedKeyRecord>,
) -> Self {
use std::collections::HashMap;
let dh_self = InstallationKeyPair::from_secret_bytes(state.dh_self_secret);
let dh_remote = state.dh_remote.map(PublicKey::from);
let skipped: HashMap<(PublicKey, u32), [u8; 32]> = skipped_keys
.into_iter()
.map(|sk| ((PublicKey::from(sk.public_key), sk.msg_num), sk.message_key))
.collect();
let dr_state = RatchetState::from_parts(
state.root_key,
state.sending_chain,
state.receiving_chain,
dh_self,
dh_remote,
state.msg_send,
state.msg_recv,
state.prev_chain_len,
skipped,
);
Self { chat_id, dr_state } Self { chat_id, dr_state }
} }
/// Get the current ratchet state for storage. /// Get a reference to the ratchet state for storage.
pub fn to_storage(&self) -> (RatchetStateRecord, Vec<SkippedKeyRecord>) { pub fn ratchet_state(&self) -> &RatchetState {
let state = RatchetStateRecord { &self.dr_state
root_key: self.dr_state.root_key,
sending_chain: self.dr_state.sending_chain,
receiving_chain: self.dr_state.receiving_chain,
dh_self_secret: *self.dr_state.dh_self.secret_bytes(),
dh_remote: self.dr_state.dh_remote.map(|pk| pk.to_bytes()),
msg_send: self.dr_state.msg_send,
msg_recv: self.dr_state.msg_recv,
prev_chain_len: self.dr_state.prev_chain_len,
};
let skipped_keys: Vec<SkippedKeyRecord> = self
.dr_state
.skipped_keys
.iter()
.map(|((pk, msg_num), key)| SkippedKeyRecord {
public_key: pk.to_bytes(),
msg_num: *msg_num,
message_key: *key,
})
.collect();
(state, skipped_keys)
} }
fn encrypt(&mut self, frame: PrivateV1Frame) -> EncryptedPayload { fn encrypt(&mut self, frame: PrivateV1Frame) -> EncryptedPayload {

View File

@ -5,10 +5,11 @@ use std::collections::HashMap;
use storage::{RusqliteError, SqliteDb, StorageConfig, StorageError, params}; use storage::{RusqliteError, SqliteDb, StorageConfig, StorageError, params};
use x25519_dalek::StaticSecret; use x25519_dalek::StaticSecret;
use super::types::{ChatRecord, IdentityRecord, RatchetStateRecord, SkippedKeyRecord}; use super::types::{ChatRecord, IdentityRecord};
use crate::identity::Identity; use crate::identity::Identity;
/// Schema for chat storage tables. /// Schema for chat storage tables.
/// Note: Ratchet state is stored by double_ratchets::RatchetStorage separately.
const CHAT_SCHEMA: &str = " const CHAT_SCHEMA: &str = "
-- Identity table (single row) -- Identity table (single row)
CREATE TABLE IF NOT EXISTS identity ( CREATE TABLE IF NOT EXISTS identity (
@ -33,39 +34,14 @@ const CHAT_SCHEMA: &str = "
); );
CREATE INDEX IF NOT EXISTS idx_chats_type ON chats(chat_type); CREATE INDEX IF NOT EXISTS idx_chats_type ON chats(chat_type);
-- Ratchet state for each conversation
CREATE TABLE IF NOT EXISTS ratchet_state (
conversation_id TEXT PRIMARY KEY,
root_key BLOB NOT NULL,
sending_chain BLOB,
receiving_chain BLOB,
dh_self_secret BLOB NOT NULL,
dh_remote BLOB,
msg_send INTEGER NOT NULL,
msg_recv INTEGER NOT NULL,
prev_chain_len INTEGER NOT NULL
);
-- Skipped message keys (for out-of-order messages)
CREATE TABLE IF NOT EXISTS skipped_keys (
conversation_id TEXT NOT NULL,
public_key BLOB NOT NULL,
msg_num INTEGER NOT NULL,
message_key BLOB NOT NULL,
created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
PRIMARY KEY (conversation_id, public_key, msg_num),
FOREIGN KEY (conversation_id) REFERENCES ratchet_state(conversation_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_skipped_keys_conversation
ON skipped_keys(conversation_id);
"; ";
/// Chat-specific storage operations. /// Chat-specific storage operations.
/// ///
/// This struct wraps a `SqliteDb` and provides domain-specific /// This struct wraps a SqliteDb and provides domain-specific
/// storage operations for chat state. /// storage operations for chat state (identity, inbox keys, chat metadata).
///
/// Note: Ratchet state persistence is delegated to double_ratchets::RatchetStorage.
pub struct ChatStorage { pub struct ChatStorage {
db: SqliteDb, db: SqliteDb,
} }
@ -125,35 +101,6 @@ impl ChatStorage {
} }
} }
/// Saves a chat record.
pub fn save_chat(&mut self, chat: &ChatRecord) -> Result<(), StorageError> {
self.db.connection().execute(
"INSERT OR REPLACE INTO chats (chat_id, chat_type, remote_public_key, remote_address, created_at)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![
chat.chat_id,
chat.chat_type,
chat.remote_public_key.as_ref().map(|k| k.as_slice()),
chat.remote_address,
chat.created_at,
],
)?;
Ok(())
}
/// Lists all chat IDs.
pub fn list_chat_ids(&self) -> Result<Vec<String>, StorageError> {
let mut stmt = self.db.connection().prepare("SELECT chat_id FROM chats")?;
let rows = stmt.query_map([], |row| row.get(0))?;
let mut ids = Vec::new();
for row in rows {
ids.push(row?);
}
Ok(ids)
}
// ==================== Inbox Key Operations ==================== // ==================== Inbox Key Operations ====================
/// Saves an inbox ephemeral key. /// Saves an inbox ephemeral key.
@ -230,7 +177,23 @@ impl ChatStorage {
Ok(()) Ok(())
} }
// ==================== Chat Operations ==================== // ==================== Chat Metadata Operations ====================
/// Saves a chat record.
pub fn save_chat(&mut self, chat: &ChatRecord) -> Result<(), StorageError> {
self.db.connection().execute(
"INSERT OR REPLACE INTO chats (chat_id, chat_type, remote_public_key, remote_address, created_at)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![
chat.chat_id,
chat.chat_type,
chat.remote_public_key.as_ref().map(|k| k.as_slice()),
chat.remote_address,
chat.created_at,
],
)?;
Ok(())
}
/// Loads a chat record by ID. /// Loads a chat record by ID.
pub fn load_chat(&self, chat_id: &str) -> Result<Option<ChatRecord>, StorageError> { pub fn load_chat(&self, chat_id: &str) -> Result<Option<ChatRecord>, StorageError> {
@ -273,6 +236,19 @@ impl ChatStorage {
} }
} }
/// Lists all chat IDs.
pub fn list_chat_ids(&self) -> Result<Vec<String>, StorageError> {
let mut stmt = self.db.connection().prepare("SELECT chat_id FROM chats")?;
let rows = stmt.query_map([], |row| row.get(0))?;
let mut ids = Vec::new();
for row in rows {
ids.push(row?);
}
Ok(ids)
}
/// Checks if a chat exists in storage. /// Checks if a chat exists in storage.
pub fn chat_exists(&self, chat_id: &str) -> Result<bool, StorageError> { pub fn chat_exists(&self, chat_id: &str) -> Result<bool, StorageError> {
let mut stmt = self let mut stmt = self
@ -284,181 +260,14 @@ impl ChatStorage {
Ok(exists) Ok(exists)
} }
/// Deletes a chat record and its ratchet state. /// Deletes a chat record.
/// Note: Ratchet state must be deleted separately via RatchetStorage.
pub fn delete_chat(&mut self, chat_id: &str) -> Result<(), StorageError> { pub fn delete_chat(&mut self, chat_id: &str) -> Result<(), StorageError> {
let tx = self.db.transaction()?; self.db
// Delete skipped keys first (foreign key constraint) .connection()
tx.execute( .execute("DELETE FROM chats WHERE chat_id = ?1", params![chat_id])?;
"DELETE FROM skipped_keys WHERE conversation_id = ?1",
params![chat_id],
)?;
tx.execute(
"DELETE FROM ratchet_state WHERE conversation_id = ?1",
params![chat_id],
)?;
tx.execute("DELETE FROM chats WHERE chat_id = ?1", params![chat_id])?;
tx.commit()?;
Ok(()) Ok(())
} }
// ==================== Ratchet State Operations ====================
/// Saves the ratchet state for a conversation.
pub fn save_ratchet_state(
&mut self,
conversation_id: &str,
state: &RatchetStateRecord,
skipped_keys: &[SkippedKeyRecord],
) -> Result<(), StorageError> {
let tx = self.db.transaction()?;
// Upsert main state
tx.execute(
"
INSERT INTO ratchet_state (
conversation_id, root_key, sending_chain, receiving_chain,
dh_self_secret, dh_remote, msg_send, msg_recv, prev_chain_len
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
ON CONFLICT(conversation_id) DO UPDATE SET
root_key = excluded.root_key,
sending_chain = excluded.sending_chain,
receiving_chain = excluded.receiving_chain,
dh_self_secret = excluded.dh_self_secret,
dh_remote = excluded.dh_remote,
msg_send = excluded.msg_send,
msg_recv = excluded.msg_recv,
prev_chain_len = excluded.prev_chain_len
",
params![
conversation_id,
state.root_key.as_slice(),
state.sending_chain.as_ref().map(|c| c.as_slice()),
state.receiving_chain.as_ref().map(|c| c.as_slice()),
state.dh_self_secret.as_slice(),
state.dh_remote.as_ref().map(|c| c.as_slice()),
state.msg_send,
state.msg_recv,
state.prev_chain_len,
],
)?;
// Sync skipped keys: delete old ones and insert new
tx.execute(
"DELETE FROM skipped_keys WHERE conversation_id = ?1",
params![conversation_id],
)?;
for sk in skipped_keys {
tx.execute(
"INSERT INTO skipped_keys (conversation_id, public_key, msg_num, message_key)
VALUES (?1, ?2, ?3, ?4)",
params![
conversation_id,
sk.public_key.as_slice(),
sk.msg_num,
sk.message_key.as_slice(),
],
)?;
}
tx.commit()?;
Ok(())
}
/// Loads the ratchet state for a conversation.
pub fn load_ratchet_state(
&self,
conversation_id: &str,
) -> Result<Option<(RatchetStateRecord, Vec<SkippedKeyRecord>)>, StorageError> {
// Load main state
let state = self.load_ratchet_state_data(conversation_id)?;
let state = match state {
Some(s) => s,
None => return Ok(None),
};
// Load skipped keys
let skipped_keys = self.load_skipped_keys(conversation_id)?;
Ok(Some((state, skipped_keys)))
}
fn load_ratchet_state_data(
&self,
conversation_id: &str,
) -> Result<Option<RatchetStateRecord>, StorageError> {
let conn = self.db.connection();
let mut stmt = conn.prepare(
"
SELECT root_key, sending_chain, receiving_chain, dh_self_secret,
dh_remote, msg_send, msg_recv, prev_chain_len
FROM ratchet_state
WHERE conversation_id = ?1
",
)?;
let result = stmt.query_row(params![conversation_id], |row| {
Ok(RatchetStateRecord {
root_key: blob_to_array(row.get::<_, Vec<u8>>(0)?),
sending_chain: row.get::<_, Option<Vec<u8>>>(1)?.map(blob_to_array),
receiving_chain: row.get::<_, Option<Vec<u8>>>(2)?.map(blob_to_array),
dh_self_secret: blob_to_array(row.get::<_, Vec<u8>>(3)?),
dh_remote: row.get::<_, Option<Vec<u8>>>(4)?.map(blob_to_array),
msg_send: row.get(5)?,
msg_recv: row.get(6)?,
prev_chain_len: row.get(7)?,
})
});
match result {
Ok(record) => Ok(Some(record)),
Err(RusqliteError::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(StorageError::Database(e.to_string())),
}
}
fn load_skipped_keys(
&self,
conversation_id: &str,
) -> Result<Vec<SkippedKeyRecord>, StorageError> {
let conn = self.db.connection();
let mut stmt = conn.prepare(
"
SELECT public_key, msg_num, message_key
FROM skipped_keys
WHERE conversation_id = ?1
",
)?;
let rows = stmt.query_map(params![conversation_id], |row| {
Ok(SkippedKeyRecord {
public_key: blob_to_array(row.get::<_, Vec<u8>>(0)?),
msg_num: row.get(1)?,
message_key: blob_to_array(row.get::<_, Vec<u8>>(2)?),
})
})?;
rows.collect::<Result<Vec<_>, _>>()
.map_err(|e| StorageError::Database(e.to_string()))
}
/// Checks if a ratchet state exists for a conversation.
pub fn ratchet_state_exists(&self, conversation_id: &str) -> Result<bool, StorageError> {
let conn = self.db.connection();
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM ratchet_state WHERE conversation_id = ?1",
params![conversation_id],
|row| row.get(0),
)?;
Ok(count > 0)
}
}
/// Helper to convert a Vec<u8> to a fixed-size array.
fn blob_to_array(blob: Vec<u8>) -> [u8; 32] {
let mut arr = [0u8; 32];
arr.copy_from_slice(&blob);
arr
} }
#[cfg(test)] #[cfg(test)]

View File

@ -1,4 +1,7 @@
//! Storage record types for serialization/deserialization. //! Storage record types for serialization/deserialization.
//!
//! Note: Ratchet state types (RatchetStateRecord, SkippedKeyRecord) are in
//! double_ratchets::storage module and handled by RatchetStorage.
use x25519_dalek::{PublicKey, StaticSecret}; use x25519_dalek::{PublicKey, StaticSecret};
@ -27,7 +30,7 @@ impl From<IdentityRecord> for Identity {
} }
/// Record for storing chat metadata. /// Record for storing chat metadata.
/// Note: The actual double ratchet state is stored separately by the DR storage. /// Note: The actual double ratchet state is stored separately by RatchetStorage.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ChatRecord { pub struct ChatRecord {
/// Unique chat identifier. /// Unique chat identifier.
@ -57,24 +60,3 @@ impl ChatRecord {
} }
} }
} }
/// Raw ratchet state data for SQLite storage.
#[derive(Debug, Clone)]
pub struct RatchetStateRecord {
pub root_key: [u8; 32],
pub sending_chain: Option<[u8; 32]>,
pub receiving_chain: Option<[u8; 32]>,
pub dh_self_secret: [u8; 32],
pub dh_remote: Option<[u8; 32]>,
pub msg_send: u32,
pub msg_recv: u32,
pub prev_chain_len: u32,
}
/// Skipped key record for out-of-order message handling.
#[derive(Debug, Clone)]
pub struct SkippedKeyRecord {
pub public_key: [u8; 32],
pub msg_num: u32,
pub message_key: [u8; 32],
}

View File

@ -47,6 +47,12 @@ pub struct RatchetStorage {
} }
impl RatchetStorage { impl RatchetStorage {
/// Creates a new RatchetStorage with the given configuration.
pub fn with_config(config: storage::StorageConfig) -> Result<Self, StorageError> {
let db = SqliteDb::new(config)?;
Self::run_migration(db)
}
/// Opens an existing encrypted database file. /// Opens an existing encrypted database file.
pub fn new(path: &str, key: &str) -> Result<Self, StorageError> { pub fn new(path: &str, key: &str) -> Result<Self, StorageError> {
let db = SqliteDb::sqlcipher(path.to_string(), key.to_string())?; let db = SqliteDb::sqlcipher(path.to_string(), key.to_string())?;