mirror of
https://github.com/logos-messaging/libchat.git
synced 2026-02-10 08:53:08 +00:00
feat: integrate double ratchet
This commit is contained in:
parent
22cbb1074c
commit
d27b439c2d
@ -3,10 +3,12 @@
|
||||
//! This is the main entry point for the conversations API. It handles all
|
||||
//! storage operations internally - users don't need to interact with storage directly.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::rc::Rc;
|
||||
|
||||
use crate::{
|
||||
common::{Chat, ChatStore, HasChatId, InboundMessageHandler},
|
||||
common::{Chat, HasChatId, InboundMessageHandler},
|
||||
dm::privatev1::PrivateV1Convo,
|
||||
errors::ChatError,
|
||||
identity::Identity,
|
||||
inbox::{Inbox, Introduction},
|
||||
@ -62,7 +64,8 @@ pub enum ChatManagerError {
|
||||
/// ```
|
||||
pub struct ChatManager {
|
||||
identity: Rc<Identity>,
|
||||
store: ChatStore,
|
||||
/// In-memory cache of active chats. Chats are loaded from storage on demand.
|
||||
chats: HashMap<String, PrivateV1Convo>,
|
||||
inbox: Inbox,
|
||||
storage: ChatStorage,
|
||||
}
|
||||
@ -94,7 +97,7 @@ impl ChatManager {
|
||||
|
||||
Ok(Self {
|
||||
identity,
|
||||
store: ChatStore::new(),
|
||||
chats: HashMap::new(),
|
||||
inbox,
|
||||
storage,
|
||||
})
|
||||
@ -157,8 +160,13 @@ impl ChatManager {
|
||||
);
|
||||
self.storage.save_chat(&chat_record)?;
|
||||
|
||||
// Store in memory
|
||||
self.store.insert_chat(convo);
|
||||
// Persist ratchet state
|
||||
let (state, skipped_keys) = convo.to_storage();
|
||||
self.storage
|
||||
.save_ratchet_state(&chat_id, &state, &skipped_keys)?;
|
||||
|
||||
// Store in memory cache
|
||||
self.chats.insert(chat_id.clone(), convo);
|
||||
|
||||
Ok((chat_id, envelopes))
|
||||
}
|
||||
@ -171,27 +179,50 @@ impl ChatManager {
|
||||
chat_id: &str,
|
||||
content: &[u8],
|
||||
) -> Result<Vec<AddressedEnvelope>, ChatManagerError> {
|
||||
// Try to get chat from memory first
|
||||
let chat = match self.store.get_mut_chat(chat_id) {
|
||||
Some(chat) => chat,
|
||||
None => {
|
||||
// Check if chat exists in storage but not loaded
|
||||
if self.storage.chat_exists(chat_id)? {
|
||||
return Err(ChatManagerError::ChatNotLoaded(chat_id.to_string()));
|
||||
} else {
|
||||
return Err(ChatManagerError::ChatNotFound(chat_id.to_string()));
|
||||
}
|
||||
}
|
||||
};
|
||||
// Try to load chat from storage if not in memory
|
||||
self.ensure_chat_loaded(chat_id)?;
|
||||
|
||||
let chat = self
|
||||
.chats
|
||||
.get_mut(chat_id)
|
||||
.ok_or_else(|| ChatManagerError::ChatNotFound(chat_id.to_string()))?;
|
||||
|
||||
let payloads = chat.send_message(content)?;
|
||||
|
||||
// Persist updated ratchet state
|
||||
let (state, skipped_keys) = chat.to_storage();
|
||||
self.storage
|
||||
.save_ratchet_state(chat_id, &state, &skipped_keys)?;
|
||||
|
||||
let remote_id = chat.remote_id();
|
||||
Ok(payloads
|
||||
.into_iter()
|
||||
.map(|p| p.to_envelope(chat.remote_id()))
|
||||
.map(|p| p.to_envelope(remote_id.clone()))
|
||||
.collect())
|
||||
}
|
||||
|
||||
/// Ensure a chat is loaded into memory. Loads from storage if needed.
|
||||
fn ensure_chat_loaded(&mut self, chat_id: &str) -> Result<(), ChatManagerError> {
|
||||
if self.chats.contains_key(chat_id) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Try to load from storage
|
||||
if let Some((state, skipped_keys)) = self.storage.load_ratchet_state(chat_id)? {
|
||||
let convo = PrivateV1Convo::from_storage(chat_id.to_string(), state, skipped_keys);
|
||||
self.chats.insert(chat_id.to_string(), convo);
|
||||
Ok(())
|
||||
} else if self.storage.chat_exists(chat_id)? {
|
||||
// Chat metadata exists but no ratchet state - this is a data inconsistency
|
||||
Err(ChatManagerError::ChatNotFound(format!(
|
||||
"{} (corrupted: missing ratchet state)",
|
||||
chat_id
|
||||
)))
|
||||
} else {
|
||||
Err(ChatManagerError::ChatNotFound(chat_id.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle an incoming payload from the network.
|
||||
///
|
||||
/// This processes both inbox handshakes (to establish new chats) and
|
||||
@ -205,8 +236,7 @@ impl ChatManager {
|
||||
Ok((chat, content_data)) => {
|
||||
let chat_id = chat.id().to_string();
|
||||
|
||||
// Persist the new chat
|
||||
// Note: We don't have full remote info here, using placeholder
|
||||
// Persist the new chat metadata
|
||||
let chat_record = ChatRecord {
|
||||
chat_id: chat_id.clone(),
|
||||
chat_type: "private_v1".to_string(),
|
||||
@ -216,8 +246,10 @@ impl ChatManager {
|
||||
};
|
||||
self.storage.save_chat(&chat_record)?;
|
||||
|
||||
// Store chat in memory
|
||||
self.store.insert_boxed_chat(chat);
|
||||
// TODO: Persist ratchet state for incoming chats
|
||||
// This requires modifying InboundMessageHandler to return PrivateV1Convo
|
||||
// or adding downcast support. For now, new chats from inbox won't persist
|
||||
// their ratchet state until next send_message call.
|
||||
|
||||
// Return first content if any, otherwise empty
|
||||
if let Some(first) = content_data.into_iter().next() {
|
||||
@ -241,13 +273,15 @@ impl ChatManager {
|
||||
}
|
||||
|
||||
/// Get a reference to an active chat.
|
||||
pub fn get_chat(&self, chat_id: &str) -> Option<&dyn Chat> {
|
||||
self.store.get_chat(chat_id)
|
||||
pub fn get_chat(&mut self, chat_id: &str) -> Option<&PrivateV1Convo> {
|
||||
// Try to load from storage if not in memory
|
||||
let _ = self.ensure_chat_loaded(chat_id);
|
||||
self.chats.get(chat_id)
|
||||
}
|
||||
|
||||
/// List all active chat IDs (in memory).
|
||||
pub fn list_chats(&self) -> Vec<String> {
|
||||
self.store.chat_ids().map(|id| id.to_string()).collect()
|
||||
self.chats.keys().cloned().collect()
|
||||
}
|
||||
|
||||
/// List all chat IDs from storage.
|
||||
@ -257,7 +291,7 @@ impl ChatManager {
|
||||
|
||||
/// Check if a chat exists (in memory or storage).
|
||||
pub fn chat_exists(&self, chat_id: &str) -> Result<bool, ChatManagerError> {
|
||||
if self.store.get_chat(chat_id).is_some() {
|
||||
if self.chats.contains_key(chat_id) {
|
||||
return Ok(true);
|
||||
}
|
||||
Ok(self.storage.chat_exists(chat_id)?)
|
||||
@ -265,7 +299,7 @@ impl ChatManager {
|
||||
|
||||
/// Delete a chat from both memory and storage.
|
||||
pub fn delete_chat(&mut self, chat_id: &str) -> Result<(), ChatManagerError> {
|
||||
self.store.remove_chat(chat_id);
|
||||
self.chats.remove(chat_id);
|
||||
self.storage.delete_chat(chat_id)?;
|
||||
Ok(())
|
||||
}
|
||||
@ -369,4 +403,56 @@ mod tests {
|
||||
assert!(!alice.chat_exists(&chat_id).unwrap());
|
||||
assert!(alice.list_chats().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ratchet_state_persistence() {
|
||||
use tempfile::tempdir;
|
||||
|
||||
// Create a temporary directory for the database
|
||||
let dir = tempdir().unwrap();
|
||||
let db_path = dir.path().join("test.db");
|
||||
|
||||
let mut bob = ChatManager::in_memory().unwrap();
|
||||
let bob_intro = bob.create_intro_bundle().unwrap();
|
||||
|
||||
let chat_id;
|
||||
|
||||
// Scope 1: Create chat and send messages
|
||||
{
|
||||
let mut alice = ChatManager::open(StorageConfig::File(
|
||||
db_path.to_str().unwrap().to_string(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let result = alice.start_private_chat(&bob_intro, "Message 1").unwrap();
|
||||
chat_id = result.0;
|
||||
|
||||
// Send more messages - this advances the ratchet
|
||||
alice.send_message(&chat_id, b"Message 2").unwrap();
|
||||
alice.send_message(&chat_id, b"Message 3").unwrap();
|
||||
|
||||
// Chat should be in memory
|
||||
assert!(alice.chats.contains_key(&chat_id));
|
||||
}
|
||||
// alice is dropped here, simulating app close
|
||||
|
||||
// Scope 2: Reopen and verify chat is restored
|
||||
{
|
||||
let mut alice2 = ChatManager::open(StorageConfig::File(
|
||||
db_path.to_str().unwrap().to_string(),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
// Chat is in storage but not loaded yet
|
||||
assert!(alice2.list_stored_chats().unwrap().contains(&chat_id));
|
||||
assert!(!alice2.chats.contains_key(&chat_id));
|
||||
|
||||
// Send another message - this will load the chat and advance ratchet
|
||||
let result = alice2.send_message(&chat_id, b"Message 4");
|
||||
assert!(result.is_ok(), "Should be able to send after restore");
|
||||
|
||||
// Chat should now be in memory
|
||||
assert!(alice2.chats.contains_key(&chat_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,12 +1,9 @@
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use crate::errors::ChatError;
|
||||
use crate::types::{AddressedEncryptedPayload, ContentData};
|
||||
|
||||
pub type ChatId<'a> = &'a str;
|
||||
pub type ChatIdOwned = Arc<str>;
|
||||
|
||||
pub trait HasChatId: Debug {
|
||||
fn id(&self) -> ChatId<'_>;
|
||||
@ -21,66 +18,7 @@ pub trait InboundMessageHandler {
|
||||
|
||||
pub trait Chat: HasChatId + Debug {
|
||||
fn send_message(&mut self, content: &[u8])
|
||||
-> Result<Vec<AddressedEncryptedPayload>, ChatError>;
|
||||
-> Result<Vec<AddressedEncryptedPayload>, ChatError>;
|
||||
|
||||
fn remote_id(&self) -> String;
|
||||
}
|
||||
|
||||
pub struct ChatStore {
|
||||
chats: HashMap<Arc<str>, Box<dyn Chat>>,
|
||||
handlers: HashMap<Arc<str>, Box<dyn InboundMessageHandler>>,
|
||||
}
|
||||
|
||||
impl ChatStore {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
chats: HashMap::new(),
|
||||
handlers: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_chat(&mut self, conversation: impl Chat + HasChatId + 'static) -> ChatIdOwned {
|
||||
let key: ChatIdOwned = Arc::from(conversation.id());
|
||||
self.chats.insert(key.clone(), Box::new(conversation));
|
||||
key
|
||||
}
|
||||
|
||||
pub fn insert_boxed_chat(&mut self, conversation: Box<dyn Chat>) -> ChatIdOwned {
|
||||
let key: ChatIdOwned = Arc::from(conversation.id());
|
||||
self.chats.insert(key.clone(), conversation);
|
||||
key
|
||||
}
|
||||
|
||||
pub fn remove_chat(&mut self, id: &str) -> Option<Box<dyn Chat>> {
|
||||
self.chats.remove(id)
|
||||
}
|
||||
|
||||
pub fn register_handler(
|
||||
&mut self,
|
||||
handler: impl InboundMessageHandler + HasChatId + 'static,
|
||||
) -> ChatIdOwned {
|
||||
let key: ChatIdOwned = Arc::from(handler.id());
|
||||
self.handlers.insert(key.clone(), Box::new(handler));
|
||||
key
|
||||
}
|
||||
|
||||
pub fn get_chat(&self, id: ChatId) -> Option<&(dyn Chat + '_)> {
|
||||
self.chats.get(id).map(|c| c.as_ref())
|
||||
}
|
||||
|
||||
pub fn get_mut_chat(&mut self, id: &str) -> Option<&mut (dyn Chat + '_)> {
|
||||
Some(self.chats.get_mut(id)?.as_mut())
|
||||
}
|
||||
|
||||
pub fn get_handler(&mut self, id: ChatId) -> Option<&mut (dyn InboundMessageHandler + '_)> {
|
||||
Some(self.handlers.get_mut(id)?.as_mut())
|
||||
}
|
||||
|
||||
pub fn chat_ids(&self) -> impl Iterator<Item = ChatIdOwned> + '_ {
|
||||
self.chats.keys().cloned()
|
||||
}
|
||||
|
||||
pub fn handler_ids(&self) -> impl Iterator<Item = ChatIdOwned> + '_ {
|
||||
self.handlers.keys().cloned()
|
||||
}
|
||||
}
|
||||
|
||||
@ -12,32 +12,94 @@ use crate::{
|
||||
common::{Chat, ChatId, HasChatId},
|
||||
errors::{ChatError, EncryptionError},
|
||||
proto,
|
||||
storage::types::{RatchetStateRecord, SkippedKeyRecord},
|
||||
types::AddressedEncryptedPayload,
|
||||
utils::timestamp_millis,
|
||||
};
|
||||
|
||||
pub struct PrivateV1Convo {
|
||||
chat_id: String,
|
||||
dr_state: RatchetState,
|
||||
}
|
||||
|
||||
impl PrivateV1Convo {
|
||||
pub fn new_initiator(seed_key: SecretKey, remote: PublicKey) -> Self {
|
||||
pub fn new_initiator(chat_id: String, seed_key: SecretKey, remote: PublicKey) -> Self {
|
||||
// TODO: Danger - Fix double-ratchets types to Accept SecretKey
|
||||
// perhaps update the DH to work with cryptocrate.
|
||||
// init_sender doesn't take ownership of the key so a reference can be used.
|
||||
let shared_secret: [u8; 32] = seed_key.as_bytes().to_vec().try_into().unwrap();
|
||||
Self {
|
||||
chat_id,
|
||||
dr_state: RatchetState::init_sender(shared_secret, remote),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_responder(seed_key: SecretKey, dh_self: InstallationKeyPair) -> Self {
|
||||
pub fn new_responder(chat_id: String, seed_key: SecretKey, dh_self: InstallationKeyPair) -> Self {
|
||||
Self {
|
||||
chat_id,
|
||||
// TODO: Danger - Fix double-ratchets types to Accept SecretKey
|
||||
dr_state: RatchetState::init_receiver(seed_key.as_bytes().to_owned(), dh_self),
|
||||
}
|
||||
}
|
||||
|
||||
/// Restore a conversation from stored ratchet state.
|
||||
pub fn from_storage(
|
||||
chat_id: String,
|
||||
state: RatchetStateRecord,
|
||||
skipped_keys: Vec<SkippedKeyRecord>,
|
||||
) -> Self {
|
||||
use std::collections::HashMap;
|
||||
|
||||
let dh_self = InstallationKeyPair::from_secret_bytes(state.dh_self_secret);
|
||||
let dh_remote = state.dh_remote.map(PublicKey::from);
|
||||
|
||||
let skipped: HashMap<(PublicKey, u32), [u8; 32]> = skipped_keys
|
||||
.into_iter()
|
||||
.map(|sk| ((PublicKey::from(sk.public_key), sk.msg_num), sk.message_key))
|
||||
.collect();
|
||||
|
||||
let dr_state = RatchetState::from_parts(
|
||||
state.root_key,
|
||||
state.sending_chain,
|
||||
state.receiving_chain,
|
||||
dh_self,
|
||||
dh_remote,
|
||||
state.msg_send,
|
||||
state.msg_recv,
|
||||
state.prev_chain_len,
|
||||
skipped,
|
||||
);
|
||||
|
||||
Self { chat_id, dr_state }
|
||||
}
|
||||
|
||||
/// Get the current ratchet state for storage.
|
||||
pub fn to_storage(&self) -> (RatchetStateRecord, Vec<SkippedKeyRecord>) {
|
||||
let state = RatchetStateRecord {
|
||||
root_key: self.dr_state.root_key,
|
||||
sending_chain: self.dr_state.sending_chain,
|
||||
receiving_chain: self.dr_state.receiving_chain,
|
||||
dh_self_secret: *self.dr_state.dh_self.secret_bytes(),
|
||||
dh_remote: self.dr_state.dh_remote.map(|pk| pk.to_bytes()),
|
||||
msg_send: self.dr_state.msg_send,
|
||||
msg_recv: self.dr_state.msg_recv,
|
||||
prev_chain_len: self.dr_state.prev_chain_len,
|
||||
};
|
||||
|
||||
let skipped_keys: Vec<SkippedKeyRecord> = self
|
||||
.dr_state
|
||||
.skipped_keys
|
||||
.iter()
|
||||
.map(|((pk, msg_num), key)| SkippedKeyRecord {
|
||||
public_key: pk.to_bytes(),
|
||||
msg_num: *msg_num,
|
||||
message_key: *key,
|
||||
})
|
||||
.collect();
|
||||
|
||||
(state, skipped_keys)
|
||||
}
|
||||
|
||||
fn encrypt(&mut self, frame: PrivateV1Frame) -> EncryptedPayload {
|
||||
let encoded_bytes = frame.encode_to_vec();
|
||||
let (cipher_text, header) = self.dr_state.encrypt_message(&encoded_bytes);
|
||||
@ -93,8 +155,7 @@ impl PrivateV1Convo {
|
||||
|
||||
impl HasChatId for PrivateV1Convo {
|
||||
fn id(&self) -> ChatId<'_> {
|
||||
// TODO: implementation
|
||||
"private_v1_convo_id"
|
||||
&self.chat_id
|
||||
}
|
||||
}
|
||||
|
||||
@ -147,11 +208,15 @@ mod tests {
|
||||
|
||||
let seed_key = saro.diffie_hellman(&pub_raya);
|
||||
let send_content_bytes = vec![0, 2, 4, 6, 8];
|
||||
let mut sr_convo =
|
||||
PrivateV1Convo::new_initiator(SecretKey::from(seed_key.to_bytes()), pub_raya);
|
||||
let mut sr_convo = PrivateV1Convo::new_initiator(
|
||||
"test_chat".to_string(),
|
||||
SecretKey::from(seed_key.to_bytes()),
|
||||
pub_raya,
|
||||
);
|
||||
|
||||
let installation_key_pair = InstallationKeyPair::from(raya);
|
||||
let mut rs_convo = PrivateV1Convo::new_responder(
|
||||
"test_chat".to_string(),
|
||||
SecretKey::from(seed_key.to_bytes()),
|
||||
installation_key_pair,
|
||||
);
|
||||
|
||||
@ -15,6 +15,7 @@ use crate::identity::{PublicKey, StaticSecret};
|
||||
use crate::inbox::handshake::InboxHandshake;
|
||||
use crate::proto::{self, CopyBytes};
|
||||
use crate::types::{AddressedEncryptedPayload, ContentData};
|
||||
use crate::utils::generate_chat_id;
|
||||
|
||||
use super::Introduction;
|
||||
|
||||
@ -107,7 +108,10 @@ impl Inbox {
|
||||
let (seed_key, ephemeral_pub) =
|
||||
InboxHandshake::perform_as_initiator(&self.ident.secret(), &pkb, &mut rng);
|
||||
|
||||
let mut convo = PrivateV1Convo::new_initiator(seed_key, remote_bundle.ephemeral_key);
|
||||
// Generate unique chat ID
|
||||
let chat_id = generate_chat_id();
|
||||
let mut convo =
|
||||
PrivateV1Convo::new_initiator(chat_id, seed_key, remote_bundle.ephemeral_key);
|
||||
|
||||
let mut payloads = convo.send_message(initial_message.as_bytes())?;
|
||||
|
||||
@ -245,7 +249,11 @@ impl InboundMessageHandler for Inbox {
|
||||
|
||||
match frame.frame_type.unwrap() {
|
||||
proto::inbox_v1_frame::FrameType::InvitePrivateV1(_invite_private_v1) => {
|
||||
let convo = PrivateV1Convo::new_responder(seed_key, ephemeral_key.clone().into());
|
||||
// Generate unique chat ID for the responder
|
||||
let chat_id = generate_chat_id();
|
||||
let installation_keypair =
|
||||
double_ratchets::InstallationKeyPair::from(ephemeral_key.clone());
|
||||
let convo = PrivateV1Convo::new_responder(chat_id, seed_key, installation_keypair);
|
||||
|
||||
// TODO: Update PrivateV1 Constructor with DR, initial_message
|
||||
Ok((Box::new(convo), vec![]))
|
||||
|
||||
@ -5,7 +5,7 @@ use std::collections::HashMap;
|
||||
use storage::{RusqliteError, SqliteDb, StorageConfig, StorageError, params};
|
||||
use x25519_dalek::StaticSecret;
|
||||
|
||||
use super::types::{ChatRecord, IdentityRecord};
|
||||
use super::types::{ChatRecord, IdentityRecord, RatchetStateRecord, SkippedKeyRecord};
|
||||
use crate::identity::Identity;
|
||||
|
||||
/// Schema for chat storage tables.
|
||||
@ -33,6 +33,33 @@ const CHAT_SCHEMA: &str = "
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_chats_type ON chats(chat_type);
|
||||
|
||||
-- Ratchet state for each conversation
|
||||
CREATE TABLE IF NOT EXISTS ratchet_state (
|
||||
conversation_id TEXT PRIMARY KEY,
|
||||
root_key BLOB NOT NULL,
|
||||
sending_chain BLOB,
|
||||
receiving_chain BLOB,
|
||||
dh_self_secret BLOB NOT NULL,
|
||||
dh_remote BLOB,
|
||||
msg_send INTEGER NOT NULL,
|
||||
msg_recv INTEGER NOT NULL,
|
||||
prev_chain_len INTEGER NOT NULL
|
||||
);
|
||||
|
||||
-- Skipped message keys (for out-of-order messages)
|
||||
CREATE TABLE IF NOT EXISTS skipped_keys (
|
||||
conversation_id TEXT NOT NULL,
|
||||
public_key BLOB NOT NULL,
|
||||
msg_num INTEGER NOT NULL,
|
||||
message_key BLOB NOT NULL,
|
||||
created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
|
||||
PRIMARY KEY (conversation_id, public_key, msg_num),
|
||||
FOREIGN KEY (conversation_id) REFERENCES ratchet_state(conversation_id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_skipped_keys_conversation
|
||||
ON skipped_keys(conversation_id);
|
||||
";
|
||||
|
||||
/// Chat-specific storage operations.
|
||||
@ -257,13 +284,181 @@ impl ChatStorage {
|
||||
Ok(exists)
|
||||
}
|
||||
|
||||
/// Deletes a chat record.
|
||||
/// Deletes a chat record and its ratchet state.
|
||||
pub fn delete_chat(&mut self, chat_id: &str) -> Result<(), StorageError> {
|
||||
self.db
|
||||
.connection()
|
||||
.execute("DELETE FROM chats WHERE chat_id = ?1", params![chat_id])?;
|
||||
let tx = self.db.transaction()?;
|
||||
// Delete skipped keys first (foreign key constraint)
|
||||
tx.execute(
|
||||
"DELETE FROM skipped_keys WHERE conversation_id = ?1",
|
||||
params![chat_id],
|
||||
)?;
|
||||
tx.execute(
|
||||
"DELETE FROM ratchet_state WHERE conversation_id = ?1",
|
||||
params![chat_id],
|
||||
)?;
|
||||
tx.execute("DELETE FROM chats WHERE chat_id = ?1", params![chat_id])?;
|
||||
tx.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ==================== Ratchet State Operations ====================
|
||||
|
||||
/// Saves the ratchet state for a conversation.
|
||||
pub fn save_ratchet_state(
|
||||
&mut self,
|
||||
conversation_id: &str,
|
||||
state: &RatchetStateRecord,
|
||||
skipped_keys: &[SkippedKeyRecord],
|
||||
) -> Result<(), StorageError> {
|
||||
let tx = self.db.transaction()?;
|
||||
|
||||
// Upsert main state
|
||||
tx.execute(
|
||||
"
|
||||
INSERT INTO ratchet_state (
|
||||
conversation_id, root_key, sending_chain, receiving_chain,
|
||||
dh_self_secret, dh_remote, msg_send, msg_recv, prev_chain_len
|
||||
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
|
||||
ON CONFLICT(conversation_id) DO UPDATE SET
|
||||
root_key = excluded.root_key,
|
||||
sending_chain = excluded.sending_chain,
|
||||
receiving_chain = excluded.receiving_chain,
|
||||
dh_self_secret = excluded.dh_self_secret,
|
||||
dh_remote = excluded.dh_remote,
|
||||
msg_send = excluded.msg_send,
|
||||
msg_recv = excluded.msg_recv,
|
||||
prev_chain_len = excluded.prev_chain_len
|
||||
",
|
||||
params![
|
||||
conversation_id,
|
||||
state.root_key.as_slice(),
|
||||
state.sending_chain.as_ref().map(|c| c.as_slice()),
|
||||
state.receiving_chain.as_ref().map(|c| c.as_slice()),
|
||||
state.dh_self_secret.as_slice(),
|
||||
state.dh_remote.as_ref().map(|c| c.as_slice()),
|
||||
state.msg_send,
|
||||
state.msg_recv,
|
||||
state.prev_chain_len,
|
||||
],
|
||||
)?;
|
||||
|
||||
// Sync skipped keys: delete old ones and insert new
|
||||
tx.execute(
|
||||
"DELETE FROM skipped_keys WHERE conversation_id = ?1",
|
||||
params![conversation_id],
|
||||
)?;
|
||||
|
||||
for sk in skipped_keys {
|
||||
tx.execute(
|
||||
"INSERT INTO skipped_keys (conversation_id, public_key, msg_num, message_key)
|
||||
VALUES (?1, ?2, ?3, ?4)",
|
||||
params![
|
||||
conversation_id,
|
||||
sk.public_key.as_slice(),
|
||||
sk.msg_num,
|
||||
sk.message_key.as_slice(),
|
||||
],
|
||||
)?;
|
||||
}
|
||||
|
||||
tx.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Loads the ratchet state for a conversation.
|
||||
pub fn load_ratchet_state(
|
||||
&self,
|
||||
conversation_id: &str,
|
||||
) -> Result<Option<(RatchetStateRecord, Vec<SkippedKeyRecord>)>, StorageError> {
|
||||
// Load main state
|
||||
let state = self.load_ratchet_state_data(conversation_id)?;
|
||||
let state = match state {
|
||||
Some(s) => s,
|
||||
None => return Ok(None),
|
||||
};
|
||||
|
||||
// Load skipped keys
|
||||
let skipped_keys = self.load_skipped_keys(conversation_id)?;
|
||||
|
||||
Ok(Some((state, skipped_keys)))
|
||||
}
|
||||
|
||||
fn load_ratchet_state_data(
|
||||
&self,
|
||||
conversation_id: &str,
|
||||
) -> Result<Option<RatchetStateRecord>, StorageError> {
|
||||
let conn = self.db.connection();
|
||||
let mut stmt = conn.prepare(
|
||||
"
|
||||
SELECT root_key, sending_chain, receiving_chain, dh_self_secret,
|
||||
dh_remote, msg_send, msg_recv, prev_chain_len
|
||||
FROM ratchet_state
|
||||
WHERE conversation_id = ?1
|
||||
",
|
||||
)?;
|
||||
|
||||
let result = stmt.query_row(params![conversation_id], |row| {
|
||||
Ok(RatchetStateRecord {
|
||||
root_key: blob_to_array(row.get::<_, Vec<u8>>(0)?),
|
||||
sending_chain: row.get::<_, Option<Vec<u8>>>(1)?.map(blob_to_array),
|
||||
receiving_chain: row.get::<_, Option<Vec<u8>>>(2)?.map(blob_to_array),
|
||||
dh_self_secret: blob_to_array(row.get::<_, Vec<u8>>(3)?),
|
||||
dh_remote: row.get::<_, Option<Vec<u8>>>(4)?.map(blob_to_array),
|
||||
msg_send: row.get(5)?,
|
||||
msg_recv: row.get(6)?,
|
||||
prev_chain_len: row.get(7)?,
|
||||
})
|
||||
});
|
||||
|
||||
match result {
|
||||
Ok(record) => Ok(Some(record)),
|
||||
Err(RusqliteError::QueryReturnedNoRows) => Ok(None),
|
||||
Err(e) => Err(StorageError::Database(e.to_string())),
|
||||
}
|
||||
}
|
||||
|
||||
fn load_skipped_keys(
|
||||
&self,
|
||||
conversation_id: &str,
|
||||
) -> Result<Vec<SkippedKeyRecord>, StorageError> {
|
||||
let conn = self.db.connection();
|
||||
let mut stmt = conn.prepare(
|
||||
"
|
||||
SELECT public_key, msg_num, message_key
|
||||
FROM skipped_keys
|
||||
WHERE conversation_id = ?1
|
||||
",
|
||||
)?;
|
||||
|
||||
let rows = stmt.query_map(params![conversation_id], |row| {
|
||||
Ok(SkippedKeyRecord {
|
||||
public_key: blob_to_array(row.get::<_, Vec<u8>>(0)?),
|
||||
msg_num: row.get(1)?,
|
||||
message_key: blob_to_array(row.get::<_, Vec<u8>>(2)?),
|
||||
})
|
||||
})?;
|
||||
|
||||
rows.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|e| StorageError::Database(e.to_string()))
|
||||
}
|
||||
|
||||
/// Checks if a ratchet state exists for a conversation.
|
||||
pub fn ratchet_state_exists(&self, conversation_id: &str) -> Result<bool, StorageError> {
|
||||
let conn = self.db.connection();
|
||||
let count: i64 = conn.query_row(
|
||||
"SELECT COUNT(*) FROM ratchet_state WHERE conversation_id = ?1",
|
||||
params![conversation_id],
|
||||
|row| row.get(0),
|
||||
)?;
|
||||
Ok(count > 0)
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper to convert a Vec<u8> to a fixed-size array.
|
||||
fn blob_to_array(blob: Vec<u8>) -> [u8; 32] {
|
||||
let mut arr = [0u8; 32];
|
||||
arr.copy_from_slice(&blob);
|
||||
arr
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@ -7,7 +7,7 @@
|
||||
//! handles all storage operations automatically.
|
||||
|
||||
mod db;
|
||||
mod types;
|
||||
pub(crate) mod types;
|
||||
|
||||
pub(crate) use db::ChatStorage;
|
||||
pub(crate) use storage::StorageError;
|
||||
|
||||
@ -57,3 +57,24 @@ impl ChatRecord {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Raw ratchet state data for SQLite storage.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RatchetStateRecord {
|
||||
pub root_key: [u8; 32],
|
||||
pub sending_chain: Option<[u8; 32]>,
|
||||
pub receiving_chain: Option<[u8; 32]>,
|
||||
pub dh_self_secret: [u8; 32],
|
||||
pub dh_remote: Option<[u8; 32]>,
|
||||
pub msg_send: u32,
|
||||
pub msg_recv: u32,
|
||||
pub prev_chain_len: u32,
|
||||
}
|
||||
|
||||
/// Skipped key record for out-of-order message handling.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SkippedKeyRecord {
|
||||
pub public_key: [u8; 32],
|
||||
pub msg_num: u32,
|
||||
pub message_key: [u8; 32],
|
||||
}
|
||||
|
||||
@ -1,8 +1,17 @@
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use rand_core::OsRng;
|
||||
use x25519_dalek::StaticSecret;
|
||||
|
||||
pub fn timestamp_millis() -> i64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis() as i64
|
||||
}
|
||||
|
||||
/// Generate a unique chat ID using random bytes.
|
||||
pub fn generate_chat_id() -> String {
|
||||
let secret = StaticSecret::random_from_rng(OsRng);
|
||||
hex::encode(&secret.as_bytes()[..16])
|
||||
}
|
||||
|
||||
@ -291,6 +291,35 @@ impl<D: HkdfInfo> RatchetState<D> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Reconstructs a RatchetState from its component parts.
|
||||
///
|
||||
/// This is used for restoring state from storage.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn from_parts(
|
||||
root_key: RootKey,
|
||||
sending_chain: Option<ChainKey>,
|
||||
receiving_chain: Option<ChainKey>,
|
||||
dh_self: InstallationKeyPair,
|
||||
dh_remote: Option<PublicKey>,
|
||||
msg_send: u32,
|
||||
msg_recv: u32,
|
||||
prev_chain_len: u32,
|
||||
skipped_keys: HashMap<(PublicKey, u32), MessageKey>,
|
||||
) -> Self {
|
||||
Self {
|
||||
root_key,
|
||||
sending_chain,
|
||||
receiving_chain,
|
||||
dh_self,
|
||||
dh_remote,
|
||||
msg_send,
|
||||
msg_recv,
|
||||
prev_chain_len,
|
||||
skipped_keys,
|
||||
_domain: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Performs a receiving-side DH ratchet when a new remote DH public key is observed.
|
||||
///
|
||||
/// # Arguments
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user