From 44949b00431845b4cf8a9e2c0a1561ab8cb1e7c1 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Fri, 13 Mar 2026 10:38:00 +0800 Subject: [PATCH] feat: remove conversation store --- conversations/src/context.rs | 82 ++++++++------------ conversations/src/conversation.rs | 36 --------- conversations/src/conversation/group_test.rs | 45 ----------- conversations/src/storage/db.rs | 34 ++++++++ 4 files changed, 65 insertions(+), 132 deletions(-) delete mode 100644 conversations/src/conversation/group_test.rs diff --git a/conversations/src/context.rs b/conversations/src/context.rs index f6e9cdd..e695c28 100644 --- a/conversations/src/context.rs +++ b/conversations/src/context.rs @@ -1,10 +1,11 @@ use std::rc::Rc; +use std::sync::Arc; use double_ratchets::{RatchetState, RatchetStorage}; use storage::StorageConfig; use crate::{ - conversation::{ConversationId, ConversationStore, Convo, Id, PrivateV1Convo}, + conversation::{ConversationId, Convo, Id, PrivateV1Convo}, errors::ChatError, identity::Identity, inbox::Inbox, @@ -20,7 +21,6 @@ pub use crate::inbox::Introduction; // Ctx manages lifetimes of objects to process and generate payloads. pub struct Context { _identity: Rc, - store: ConversationStore, inbox: Inbox, storage: ChatStorage, ratchet_storage: RatchetStorage, @@ -48,28 +48,8 @@ impl Context { let identity = Rc::new(identity); let inbox = Inbox::new(Rc::clone(&identity)); - // Restore persisted conversations - let mut store = ConversationStore::new(); - let conversation_records = storage.load_conversations()?; - for record in conversation_records { - let convo: Box = match record.convo_type.as_str() { - "private_v1" => { - let dr_state: RatchetState = - ratchet_storage.load(&record.local_convo_id)?; - Box::new(PrivateV1Convo::from_stored( - record.local_convo_id, - record.remote_convo_id, - dr_state, - )) - } - _ => continue, // Skip unknown conversation types - }; - store.insert_convo(convo); - } - Ok(Self { _identity: identity, - store, inbox, storage, ratchet_storage, @@ -103,12 +83,16 @@ impl Context { .map(|p| p.into_envelope(remote_id.clone())) .collect(); - let convo_id = self.add_convo(Box::new(convo)); + let convo_id = self.persist_convo(&convo); (convo_id, payload_bytes) } pub fn list_conversations(&self) -> Result, ChatError> { - Ok(self.store.conversation_ids()) + let records = self.storage.load_conversations()?; + Ok(records + .into_iter() + .map(|r| Arc::from(r.local_convo_id.as_str())) + .collect()) } pub fn send_content( @@ -116,10 +100,7 @@ impl Context { convo_id: ConversationId, content: &[u8], ) -> Result, ChatError> { - let convo = self - .store - .get_mut(convo_id) - .ok_or_else(|| ChatError::NoConvo(convo_id.into()))?; + let mut convo = self.load_convo(convo_id)?; let payloads = convo.send_message(content)?; let remote_id = convo.remote_id(); @@ -140,7 +121,7 @@ impl Context { let enc = EncryptedPayload::decode(env.payload)?; match convo_id { c if c == self.inbox.id() => self.dispatch_to_inbox(enc), - c if self.store.has(&c) => self.dispatch_to_convo(&c, enc), + c if self.storage.has_conversation(&c)? => self.dispatch_to_convo(&c, enc), _ => Ok(None), } } @@ -162,7 +143,7 @@ impl Context { // Remove consumed ephemeral key from storage self.storage.remove_ephemeral_key(&key_hex)?; - self.add_convo(convo); + self.persist_convo(convo.as_ref()); Ok(content) } @@ -172,13 +153,9 @@ impl Context { convo_id: ConversationId, enc_payload: EncryptedPayload, ) -> Result, ChatError> { - let Some(convo) = self.store.get_mut(convo_id) else { - return Err(ChatError::Protocol("convo id not found".into())); - }; + let mut convo = self.load_convo(convo_id)?; let result = convo.handle_frame(enc_payload)?; - - // Persist updated ratchet state convo.save_ratchet_state(&mut self.ratchet_storage)?; Ok(result) @@ -191,34 +168,37 @@ impl Context { Ok(intro.into()) } - fn add_convo(&mut self, convo: Box) -> ConversationIdOwned { - // Persist conversation metadata and ratchet state + /// Loads a conversation from DB by constructing it from metadata + ratchet state. + fn load_convo(&self, convo_id: ConversationId) -> Result { + let record = self + .storage + .load_conversation(convo_id)? + .ok_or_else(|| ChatError::NoConvo(convo_id.into()))?; + + let dr_state: RatchetState = self.ratchet_storage.load(&record.local_convo_id)?; + + Ok(PrivateV1Convo::from_stored( + record.local_convo_id, + record.remote_convo_id, + dr_state, + )) + } + + /// Persists a conversation's metadata and ratchet state to DB. + fn persist_convo(&mut self, convo: &dyn Convo) -> ConversationIdOwned { let _ = self.storage.save_conversation( convo.id(), &convo.remote_id(), convo.convo_type(), ); let _ = convo.save_ratchet_state(&mut self.ratchet_storage); - self.store.insert_convo(convo) + Arc::from(convo.id()) } - } #[cfg(test)] mod tests { use super::*; - use crate::conversation::GroupTestConvo; - - #[test] - fn convo_store_get() { - let mut store: ConversationStore = ConversationStore::new(); - - let new_convo = GroupTestConvo::new(); - let convo_id = store.insert_convo(Box::new(new_convo)); - - let convo = store.get_mut(&convo_id).ok_or(0); - convo.unwrap(); - } fn send_and_verify( sender: &mut Context, diff --git a/conversations/src/conversation.rs b/conversations/src/conversation.rs index db4e76d..4e15373 100644 --- a/conversations/src/conversation.rs +++ b/conversations/src/conversation.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::fmt::Debug; use std::sync::Arc; @@ -38,42 +37,7 @@ pub trait Convo: Id + Debug { } } -pub struct ConversationStore { - conversations: HashMap, Box>, -} - -impl ConversationStore { - pub fn new() -> Self { - Self { - conversations: HashMap::new(), - } - } - - pub fn insert_convo(&mut self, conversation: Box) -> ConversationIdOwned { - let key: ConversationIdOwned = Arc::from(conversation.id()); - self.conversations.insert(key.clone(), conversation); - key - } - - pub fn has(&self, id: ConversationId) -> bool { - self.conversations.contains_key(id) - } - - pub fn get_mut(&mut self, id: &str) -> Option<&mut (dyn Convo + '_)> { - Some(self.conversations.get_mut(id)?.as_mut()) - } - - #[allow(dead_code)] - pub fn conversation_ids(&self) -> Vec { - self.conversations.keys().cloned().collect() - } -} - -#[cfg(test)] -mod group_test; mod privatev1; use chat_proto::logoschat::encryption::EncryptedPayload; -#[cfg(test)] -pub(crate) use group_test::GroupTestConvo; pub use privatev1::PrivateV1Convo; diff --git a/conversations/src/conversation/group_test.rs b/conversations/src/conversation/group_test.rs deleted file mode 100644 index 0ce4084..0000000 --- a/conversations/src/conversation/group_test.rs +++ /dev/null @@ -1,45 +0,0 @@ -use crate::{ - conversation::{ChatError, ConversationId, Convo, Id}, - proto::EncryptedPayload, - types::{AddressedEncryptedPayload, ContentData}, -}; - -#[derive(Debug)] -pub struct GroupTestConvo {} - -impl GroupTestConvo { - pub fn new() -> Self { - Self {} - } -} - -impl Id for GroupTestConvo { - fn id(&self) -> ConversationId<'_> { - // implementation - "grouptest" - } -} - -impl Convo for GroupTestConvo { - fn send_message( - &mut self, - _content: &[u8], - ) -> Result, ChatError> { - Ok(vec![]) - } - - fn handle_frame( - &mut self, - _encoded_payload: EncryptedPayload, - ) -> Result, ChatError> { - Ok(None) - } - - fn remote_id(&self) -> String { - self.id().to_string() - } - - fn convo_type(&self) -> &str { - "group_test" - } -} diff --git a/conversations/src/storage/db.rs b/conversations/src/storage/db.rs index b950e7b..6530224 100644 --- a/conversations/src/storage/db.rs +++ b/conversations/src/storage/db.rs @@ -126,6 +126,40 @@ impl ChatStorage { Ok(()) } + /// Checks if a conversation exists by its local ID. + pub fn has_conversation(&self, local_convo_id: &str) -> Result { + let exists: bool = self.db.connection().query_row( + "SELECT EXISTS(SELECT 1 FROM conversations WHERE local_convo_id = ?1)", + params![local_convo_id], + |row| row.get(0), + )?; + Ok(exists) + } + + /// Loads a single conversation record by its local ID. + pub fn load_conversation( + &self, + local_convo_id: &str, + ) -> Result, StorageError> { + let mut stmt = self.db.connection().prepare( + "SELECT local_convo_id, remote_convo_id, convo_type FROM conversations WHERE local_convo_id = ?1", + )?; + + let result = stmt.query_row(params![local_convo_id], |row| { + Ok(ConversationRecord { + local_convo_id: row.get(0)?, + remote_convo_id: row.get(1)?, + convo_type: row.get(2)?, + }) + }); + + match result { + Ok(record) => Ok(Some(record)), + Err(RusqliteError::QueryReturnedNoRows) => Ok(None), + Err(e) => Err(e.into()), + } + } + /// Loads all conversation records. pub fn load_conversations(&self) -> Result, StorageError> { let mut stmt = self.db.connection().prepare(