feat: remove conversation store

This commit is contained in:
kaichaosun 2026-03-13 10:38:00 +08:00
parent bc9397d2e0
commit 44949b0043
No known key found for this signature in database
GPG Key ID: 223E0F992F4F03BF
4 changed files with 65 additions and 132 deletions

View File

@ -1,10 +1,11 @@
use std::rc::Rc;
use std::sync::Arc;
use double_ratchets::{RatchetState, RatchetStorage};
use storage::StorageConfig;
use crate::{
conversation::{ConversationId, ConversationStore, Convo, Id, PrivateV1Convo},
conversation::{ConversationId, Convo, Id, PrivateV1Convo},
errors::ChatError,
identity::Identity,
inbox::Inbox,
@ -20,7 +21,6 @@ pub use crate::inbox::Introduction;
// Ctx manages lifetimes of objects to process and generate payloads.
pub struct Context {
_identity: Rc<Identity>,
store: ConversationStore,
inbox: Inbox,
storage: ChatStorage,
ratchet_storage: RatchetStorage,
@ -48,28 +48,8 @@ impl Context {
let identity = Rc::new(identity);
let inbox = Inbox::new(Rc::clone(&identity));
// Restore persisted conversations
let mut store = ConversationStore::new();
let conversation_records = storage.load_conversations()?;
for record in conversation_records {
let convo: Box<dyn Convo> = match record.convo_type.as_str() {
"private_v1" => {
let dr_state: RatchetState =
ratchet_storage.load(&record.local_convo_id)?;
Box::new(PrivateV1Convo::from_stored(
record.local_convo_id,
record.remote_convo_id,
dr_state,
))
}
_ => continue, // Skip unknown conversation types
};
store.insert_convo(convo);
}
Ok(Self {
_identity: identity,
store,
inbox,
storage,
ratchet_storage,
@ -103,12 +83,16 @@ impl Context {
.map(|p| p.into_envelope(remote_id.clone()))
.collect();
let convo_id = self.add_convo(Box::new(convo));
let convo_id = self.persist_convo(&convo);
(convo_id, payload_bytes)
}
pub fn list_conversations(&self) -> Result<Vec<ConversationIdOwned>, ChatError> {
Ok(self.store.conversation_ids())
let records = self.storage.load_conversations()?;
Ok(records
.into_iter()
.map(|r| Arc::from(r.local_convo_id.as_str()))
.collect())
}
pub fn send_content(
@ -116,10 +100,7 @@ impl Context {
convo_id: ConversationId,
content: &[u8],
) -> Result<Vec<AddressedEnvelope>, ChatError> {
let convo = self
.store
.get_mut(convo_id)
.ok_or_else(|| ChatError::NoConvo(convo_id.into()))?;
let mut convo = self.load_convo(convo_id)?;
let payloads = convo.send_message(content)?;
let remote_id = convo.remote_id();
@ -140,7 +121,7 @@ impl Context {
let enc = EncryptedPayload::decode(env.payload)?;
match convo_id {
c if c == self.inbox.id() => self.dispatch_to_inbox(enc),
c if self.store.has(&c) => self.dispatch_to_convo(&c, enc),
c if self.storage.has_conversation(&c)? => self.dispatch_to_convo(&c, enc),
_ => Ok(None),
}
}
@ -162,7 +143,7 @@ impl Context {
// Remove consumed ephemeral key from storage
self.storage.remove_ephemeral_key(&key_hex)?;
self.add_convo(convo);
self.persist_convo(convo.as_ref());
Ok(content)
}
@ -172,13 +153,9 @@ impl Context {
convo_id: ConversationId,
enc_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError> {
let Some(convo) = self.store.get_mut(convo_id) else {
return Err(ChatError::Protocol("convo id not found".into()));
};
let mut convo = self.load_convo(convo_id)?;
let result = convo.handle_frame(enc_payload)?;
// Persist updated ratchet state
convo.save_ratchet_state(&mut self.ratchet_storage)?;
Ok(result)
@ -191,34 +168,37 @@ impl Context {
Ok(intro.into())
}
fn add_convo(&mut self, convo: Box<dyn Convo>) -> ConversationIdOwned {
// Persist conversation metadata and ratchet state
/// Loads a conversation from DB by constructing it from metadata + ratchet state.
fn load_convo(&self, convo_id: ConversationId) -> Result<PrivateV1Convo, ChatError> {
let record = self
.storage
.load_conversation(convo_id)?
.ok_or_else(|| ChatError::NoConvo(convo_id.into()))?;
let dr_state: RatchetState = self.ratchet_storage.load(&record.local_convo_id)?;
Ok(PrivateV1Convo::from_stored(
record.local_convo_id,
record.remote_convo_id,
dr_state,
))
}
/// Persists a conversation's metadata and ratchet state to DB.
fn persist_convo(&mut self, convo: &dyn Convo) -> ConversationIdOwned {
let _ = self.storage.save_conversation(
convo.id(),
&convo.remote_id(),
convo.convo_type(),
);
let _ = convo.save_ratchet_state(&mut self.ratchet_storage);
self.store.insert_convo(convo)
Arc::from(convo.id())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::conversation::GroupTestConvo;
#[test]
fn convo_store_get() {
let mut store: ConversationStore = ConversationStore::new();
let new_convo = GroupTestConvo::new();
let convo_id = store.insert_convo(Box::new(new_convo));
let convo = store.get_mut(&convo_id).ok_or(0);
convo.unwrap();
}
fn send_and_verify(
sender: &mut Context,

View File

@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
@ -38,42 +37,7 @@ pub trait Convo: Id + Debug {
}
}
pub struct ConversationStore {
conversations: HashMap<Arc<str>, Box<dyn Convo>>,
}
impl ConversationStore {
pub fn new() -> Self {
Self {
conversations: HashMap::new(),
}
}
pub fn insert_convo(&mut self, conversation: Box<dyn Convo>) -> ConversationIdOwned {
let key: ConversationIdOwned = Arc::from(conversation.id());
self.conversations.insert(key.clone(), conversation);
key
}
pub fn has(&self, id: ConversationId) -> bool {
self.conversations.contains_key(id)
}
pub fn get_mut(&mut self, id: &str) -> Option<&mut (dyn Convo + '_)> {
Some(self.conversations.get_mut(id)?.as_mut())
}
#[allow(dead_code)]
pub fn conversation_ids(&self) -> Vec<ConversationIdOwned> {
self.conversations.keys().cloned().collect()
}
}
#[cfg(test)]
mod group_test;
mod privatev1;
use chat_proto::logoschat::encryption::EncryptedPayload;
#[cfg(test)]
pub(crate) use group_test::GroupTestConvo;
pub use privatev1::PrivateV1Convo;

View File

@ -1,45 +0,0 @@
use crate::{
conversation::{ChatError, ConversationId, Convo, Id},
proto::EncryptedPayload,
types::{AddressedEncryptedPayload, ContentData},
};
#[derive(Debug)]
pub struct GroupTestConvo {}
impl GroupTestConvo {
pub fn new() -> Self {
Self {}
}
}
impl Id for GroupTestConvo {
fn id(&self) -> ConversationId<'_> {
// implementation
"grouptest"
}
}
impl Convo for GroupTestConvo {
fn send_message(
&mut self,
_content: &[u8],
) -> Result<Vec<AddressedEncryptedPayload>, ChatError> {
Ok(vec![])
}
fn handle_frame(
&mut self,
_encoded_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError> {
Ok(None)
}
fn remote_id(&self) -> String {
self.id().to_string()
}
fn convo_type(&self) -> &str {
"group_test"
}
}

View File

@ -126,6 +126,40 @@ impl ChatStorage {
Ok(())
}
/// Checks if a conversation exists by its local ID.
pub fn has_conversation(&self, local_convo_id: &str) -> Result<bool, StorageError> {
let exists: bool = self.db.connection().query_row(
"SELECT EXISTS(SELECT 1 FROM conversations WHERE local_convo_id = ?1)",
params![local_convo_id],
|row| row.get(0),
)?;
Ok(exists)
}
/// Loads a single conversation record by its local ID.
pub fn load_conversation(
&self,
local_convo_id: &str,
) -> Result<Option<ConversationRecord>, StorageError> {
let mut stmt = self.db.connection().prepare(
"SELECT local_convo_id, remote_convo_id, convo_type FROM conversations WHERE local_convo_id = ?1",
)?;
let result = stmt.query_row(params![local_convo_id], |row| {
Ok(ConversationRecord {
local_convo_id: row.get(0)?,
remote_convo_id: row.get(1)?,
convo_type: row.get(2)?,
})
});
match result {
Ok(record) => Ok(Some(record)),
Err(RusqliteError::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Loads all conversation records.
pub fn load_conversations(&self) -> Result<Vec<ConversationRecord>, StorageError> {
let mut stmt = self.db.connection().prepare(