chore: remove customized db conn

This commit is contained in:
kaichaosun 2026-03-30 17:17:32 +08:00
parent 41a1599f9f
commit 69d62ce51e
No known key found for this signature in database
GPG Key ID: 223E0F992F4F03BF
20 changed files with 366 additions and 326 deletions

3
Cargo.lock generated
View File

@ -117,7 +117,6 @@ dependencies = [
"rusqlite",
"storage",
"tempfile",
"thiserror",
"zeroize",
]
@ -136,8 +135,8 @@ dependencies = [
name = "client"
version = "0.1.0"
dependencies = [
"chat-sqlite",
"libchat",
"storage",
]
[[package]]

View File

@ -8,7 +8,7 @@ crate-type = ["rlib","staticlib","dylib"]
[dependencies]
base64 = "0.22"
chat-sqlite = { path = "../sqlite" }
sqlite = { package = "chat-sqlite", path = "../sqlite" }
blake2.workspace = true
chat-proto = { git = "https://github.com/logos-messaging/chat_proto" }
crypto = { path = "../crypto" }

View File

@ -13,8 +13,7 @@ use safer_ffi::{
prelude::{c_slice, repr_c},
};
use sqlite::ChatStorage;
use storage::StorageConfig;
use sqlite::{ChatStorage, StorageConfig};
use crate::{
context::{Context, Introduction},
@ -135,7 +134,17 @@ pub fn create_new_private_convo(
};
// Create conversation
let (convo_id, payloads) = ctx.0.create_private_convo(&intro, &content);
let (convo_id, payloads) = match ctx.0.create_private_convo(&intro, &content) {
Ok((id, payloads)) => (id, payloads),
Err(_) => {
*out = NewConvoResult {
error_code: ErrorCode::UnknownError as i32,
convo_id: "".into(),
payloads: Vec::new().into(),
};
return;
}
};
// Convert payloads to FFI-compatible vector
let ffi_payloads: Vec<Payload> = payloads

View File

@ -29,10 +29,7 @@ impl<T: ChatStore> Context<T> {
///
/// If an identity exists in storage, it will be restored.
/// Otherwise, a new identity will be created with the given name and saved.
pub fn open(
name: impl Into<String>,
store: T,
) -> Result<Self, ChatError> {
pub fn open(name: impl Into<String>, store: T) -> Result<Self, ChatError> {
let name = name.into();
// Load or create identity
@ -61,7 +58,9 @@ impl<T: ChatStore> Context<T> {
pub fn new_with_name(name: impl Into<String>, mut chat_store: T) -> Self {
let name = name.into();
let identity = Identity::new(&name);
chat_store.save_identity(&identity).expect("in-memory storage should not fail");
chat_store
.save_identity(&identity)
.expect("in-memory storage should not fail");
let identity = Rc::new(identity);
let inbox = Inbox::new(Rc::clone(&identity));
@ -81,7 +80,7 @@ impl<T: ChatStore> Context<T> {
&mut self,
remote_bundle: &Introduction,
content: &[u8],
) -> (ConversationIdOwned, Vec<AddressedEnvelope>) {
) -> Result<(ConversationIdOwned, Vec<AddressedEnvelope>), ChatError> {
let (convo, payloads) = self
.inbox
.invite_to_private_convo(remote_bundle, content)
@ -93,8 +92,8 @@ impl<T: ChatStore> Context<T> {
.map(|p| p.into_envelope(remote_id.clone()))
.collect();
let convo_id = self.persist_convo(&convo);
(convo_id, payload_bytes)
let convo_id = self.persist_convo(&convo)?;
Ok((convo_id, payload_bytes))
}
pub fn list_conversations(&self) -> Result<Vec<ConversationIdOwned>, ChatError> {
@ -150,10 +149,8 @@ impl<T: ChatStore> Context<T> {
let (convo, content) = self.inbox.handle_frame(&ephemeral_key, enc_payload)?;
// Remove consumed ephemeral key from storage
self.persist_convo(&convo)?;
self.store.remove_ephemeral_key(&key_hex)?;
self.persist_convo(convo.as_ref());
Ok(content)
}
@ -207,22 +204,22 @@ impl<T: ChatStore> Context<T> {
}
/// Persists a conversation's metadata and ratchet state to DB.
fn persist_convo(&mut self, convo: &dyn Convo) -> ConversationIdOwned {
fn persist_convo(&mut self, convo: &PrivateV1Convo) -> Result<ConversationIdOwned, ChatError> {
let convo_info = ConversationMeta {
local_convo_id: convo.id().to_string(),
remote_convo_id: convo.remote_id(),
kind: convo.convo_type().into(),
};
let _ = self.store.save_conversation(&convo_info);
let _ = convo.save_ratchet_state(&mut self.store);
Arc::from(convo.id())
self.store.save_conversation(&convo_info)?;
convo.save_ratchet_state(&mut self.store)?;
Ok(Arc::from(convo.id()))
}
}
#[cfg(test)]
mod tests {
use sqlite::ChatStorage;
use storage::{ConversationStore, StorageConfig};
use sqlite::{ChatStorage, StorageConfig};
use storage::ConversationStore;
use super::*;
@ -253,7 +250,7 @@ mod tests {
// Saro initiates conversation with Raya
let mut content = vec![10];
let (saro_convo_id, payloads) = saro.create_private_convo(&intro, &content);
let (saro_convo_id, payloads) = saro.create_private_convo(&intro, &content).unwrap();
// Raya receives initial message
let payload = payloads.first().unwrap();
@ -296,7 +293,7 @@ mod tests {
let bundle = alice.create_intro_bundle().unwrap();
let intro = Introduction::try_from(bundle.as_slice()).unwrap();
let (_, payloads) = bob.create_private_convo(&intro, b"hi");
let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap();
let payload = payloads.first().unwrap();
let content = alice.handle_payload(&payload.data).unwrap().unwrap();
@ -314,7 +311,7 @@ mod tests {
let bundle = alice.create_intro_bundle().unwrap();
let intro = Introduction::try_from(bundle.as_slice()).unwrap();
let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello");
let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap();
let payload = payloads.first().unwrap();
let content = alice.handle_payload(&payload.data).unwrap().unwrap();
@ -343,9 +340,7 @@ mod tests {
assert_eq!(content.data, b"more messages");
// Alice can also send back
let payloads = alice
.send_content(&alice_convo_id, b"alice reply")
.unwrap();
let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap();
let payload = payloads.first().unwrap();
let content = bob
.handle_payload(&payload.data)

View File

@ -3,7 +3,6 @@ use std::sync::Arc;
pub use crate::errors::ChatError;
use crate::types::{AddressedEncryptedPayload, ContentData};
use storage::RatchetStore;
pub type ConversationId<'a> = &'a str;
pub type ConversationIdOwned = Arc<str>;
@ -30,9 +29,6 @@ pub trait Convo: Id + Debug {
/// Returns the conversation type identifier for storage.
fn convo_type(&self) -> ConversationKind;
/// Persists ratchet state to storage. Default is no-op.
fn save_ratchet_state(&self, storage: &mut dyn RatchetStore) -> Result<(), ChatError>;
}
mod privatev1;

View File

@ -168,6 +168,13 @@ impl PrivateV1Convo {
is_new_convo: false,
})
}
pub fn save_ratchet_state(&self, storage: &mut dyn RatchetStore) -> Result<(), ChatError> {
let record = to_ratchet_record(&self.dr_state);
let skipped_keys = to_skipped_key_records(&self.dr_state.skipped_keys());
storage.save_ratchet_state(&self.local_convo_id, &record, &skipped_keys)?;
Ok(())
}
}
impl Id for PrivateV1Convo {
@ -225,13 +232,6 @@ impl Convo for PrivateV1Convo {
fn convo_type(&self) -> ConversationKind {
ConversationKind::PrivateV1
}
fn save_ratchet_state(&self, storage: &mut dyn RatchetStore) -> Result<(), ChatError> {
let record = to_ratchet_record(&self.dr_state);
let skipped_keys = to_skipped_key_records(&self.dr_state.skipped_keys());
storage.save_ratchet_state(&self.local_convo_id, &record, &skipped_keys)?;
Ok(())
}
}
impl Debug for PrivateV1Convo {

View File

@ -115,7 +115,7 @@ impl Inbox {
&self,
ephemeral_key: &PrivateKey,
enc_payload: EncryptedPayload,
) -> Result<(Box<dyn Convo>, Option<ContentData>), ChatError> {
) -> Result<(PrivateV1Convo, Option<ContentData>), ChatError> {
let handshake = Self::extract_payload(enc_payload)?;
let header = handshake
@ -142,7 +142,7 @@ impl Inbox {
None => return Err(ChatError::Protocol("expected contentData".into())),
};
Ok((Box::new(convo), Some(content)))
Ok((convo, Some(content)))
}
}
}
@ -239,8 +239,8 @@ impl Id for Inbox {
#[cfg(test)]
mod tests {
use super::*;
use sqlite::ChatStorage;
use storage::{EphemeralKeyStore, StorageConfig};
use sqlite::{ChatStorage, StorageConfig};
use storage::EphemeralKeyStore;
#[test]
fn test_invite_privatev1_roundtrip() {

View File

@ -27,5 +27,5 @@ serde = "1.0"
headers = ["safer-ffi/headers"]
[dev-dependencies]
chat-sqlite = { path = "../sqlite" }
sqlite = { package = "chat-sqlite", path = "../sqlite" }
tempfile = "3"

View File

@ -3,8 +3,7 @@
//! Run with: cargo run --example out_of_order_demo -p double-ratchets
use double_ratchets::{InstallationKeyPair, RatchetSession};
use sqlite::ChatStorage;
use storage::StorageConfig;
use sqlite::{ChatStorage, StorageConfig};
use tempfile::NamedTempFile;
fn main() {
@ -31,23 +30,21 @@ fn main() {
let mut bob_storage =
ChatStorage::new(StorageConfig::File(bob_db_path.to_string())).unwrap();
let mut alice_session: RatchetSession<ChatStorage> =
RatchetSession::create_sender_session(
&mut alice_storage,
conv_id,
shared_secret,
bob_public,
)
.unwrap();
let mut alice_session: RatchetSession<ChatStorage> = RatchetSession::create_sender_session(
&mut alice_storage,
conv_id,
shared_secret,
bob_public,
)
.unwrap();
let mut bob_session: RatchetSession<ChatStorage> =
RatchetSession::create_receiver_session(
&mut bob_storage,
conv_id,
shared_secret,
bob_keypair,
)
.unwrap();
let mut bob_session: RatchetSession<ChatStorage> = RatchetSession::create_receiver_session(
&mut bob_storage,
conv_id,
shared_secret,
bob_keypair,
)
.unwrap();
println!(" Sessions created for Alice and Bob");

View File

@ -3,8 +3,7 @@
//! Run with: cargo run --example storage_demo -p double-ratchets
use double_ratchets::{InstallationKeyPair, RatchetSession};
use sqlite::ChatStorage;
use storage::StorageConfig;
use sqlite::{ChatStorage, StorageConfig};
use tempfile::NamedTempFile;
fn main() {
@ -21,10 +20,7 @@ fn main() {
ChatStorage::new(StorageConfig::File(alice_db_path.to_string())).unwrap();
let mut bob_storage =
ChatStorage::new(StorageConfig::File(bob_db_path.to_string())).unwrap();
println!(
" Database created at: {}, {}",
alice_db_path, bob_db_path
);
println!(" Database created at: {}, {}", alice_db_path, bob_db_path);
run_conversation(&mut alice_storage, &mut bob_storage);
}

View File

@ -141,8 +141,7 @@ impl<'a, S: RatchetStore, D: HkdfInfo + Clone> RatchetSession<'a, S, D> {
/// Manually saves the current state.
pub fn save(&mut self) -> Result<(), SessionError> {
save_state(self.storage, &self.conversation_id, &self.state)
.map_err(|error| error.into())
save_state(self.storage, &self.conversation_id, &self.state).map_err(|error| error.into())
}
pub fn msg_send(&self) -> u32 {

View File

@ -10,7 +10,6 @@ hex = "0.4.3"
storage = { path = "../storage" }
zeroize = { version = "1.8.2", features = ["derive"] }
rusqlite = { version = "0.35", features = ["bundled-sqlcipher-vendored-openssl"] }
thiserror = "2"
[dev-dependencies]
tempfile = "3"

View File

@ -1,10 +1,9 @@
//! SQLite storage backend.
use rusqlite::{Connection, Row, Transaction};
use std::path::Path;
use rusqlite::Connection;
use storage::StorageError;
use crate::errors::SqliteError;
use crate::errors::map_rusqlite_error;
/// Configuration for SQLite storage.
#[derive(Debug, Clone)]
@ -17,99 +16,51 @@ pub enum StorageConfig {
Encrypted { path: String, key: String },
}
pub struct DbConn(rusqlite::Connection);
impl DbConn {
fn map_err(e: rusqlite::Error) -> StorageError {
StorageError::Database(e.to_string())
}
pub fn prepare(&self, sql: &str) -> Result<rusqlite::Statement<'_>, StorageError> {
self.0.prepare(sql).map_err(Self::map_err)
}
pub fn transaction(&mut self) -> Result<Transaction<'_>, StorageError> {
self.0.transaction().map_err(Self::map_err)
}
pub fn execute(&self, sql: &str, params: impl rusqlite::Params) -> Result<usize, StorageError> {
self.0.execute(sql, params).map_err(Self::map_err)
}
pub fn execute_batch(&self, sql: &str) -> Result<(), StorageError> {
self.0.execute_batch(sql).map_err(Self::map_err)
}
pub fn query_row<T, F>(
&self,
sql: &str,
params: impl rusqlite::Params,
f: F,
) -> Result<T, StorageError>
where
F: FnOnce(&Row) -> Result<T, rusqlite::Error>,
{
self.0.query_row(sql, params, f).map_err(Self::map_err)
}
}
/// SQLite database wrapper.
///
/// This provides the core database connection and can be shared
/// across different domain-specific storage implementations.
pub struct SqliteDb {
conn: DbConn,
conn: Connection,
}
impl SqliteDb {
/// Creates a new SQLite database with the given configuration.
pub fn new(config: StorageConfig) -> Result<Self, SqliteError> {
pub fn new(config: StorageConfig) -> Result<Self, StorageError> {
let conn = match config {
StorageConfig::InMemory => Connection::open_in_memory()?,
StorageConfig::File(ref path) => Connection::open(path)?,
StorageConfig::InMemory => Connection::open_in_memory().map_err(map_rusqlite_error)?,
StorageConfig::File(ref path) => Connection::open(path).map_err(map_rusqlite_error)?,
StorageConfig::Encrypted { ref path, ref key } => {
let conn = Connection::open(path)?;
conn.pragma_update(None, "key", key)?;
let conn = Connection::open(path).map_err(map_rusqlite_error)?;
conn.pragma_update(None, "key", key)
.map_err(map_rusqlite_error)?;
conn
}
};
// Enable foreign keys
conn.execute_batch("PRAGMA foreign_keys = ON;")?;
conn.execute_batch("PRAGMA foreign_keys = ON;")
.map_err(map_rusqlite_error)?;
Ok(Self { conn: DbConn(conn) })
}
/// Opens an existing database file.
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, SqliteError> {
let conn = Connection::open(path)?;
conn.execute_batch("PRAGMA foreign_keys = ON;")?;
Ok(Self { conn: DbConn(conn) })
}
/// Creates an in-memory database (useful for testing).
pub fn in_memory() -> Result<Self, SqliteError> {
Self::new(StorageConfig::InMemory)
}
pub fn sqlcipher(path: String, key: String) -> Result<Self, SqliteError> {
Self::new(StorageConfig::Encrypted { path, key })
Ok(Self { conn })
}
/// Returns a reference to the underlying connection.
///
/// Use this for domain-specific storage operations.
pub fn connection(&self) -> &DbConn {
pub fn connection(&self) -> &Connection {
&self.conn
}
/// Returns a mutable reference to the underlying connection.
///
/// Use this for operations that require mutable access, such as transactions.
pub fn connection_mut(&mut self) -> &mut DbConn {
pub fn connection_mut(&mut self) -> &mut Connection {
&mut self.conn
}
/// Begins a transaction.
pub fn transaction(&mut self) -> Result<rusqlite::Transaction<'_>, SqliteError> {
Ok(self.conn.transaction()?)
pub fn transaction(&mut self) -> Result<rusqlite::Transaction<'_>, StorageError> {
self.conn.transaction().map_err(map_rusqlite_error)
}
}

View File

@ -1,32 +1,24 @@
use rusqlite::Error as RusqliteError;
use storage::StorageError;
use thiserror::Error;
// #[derive(Debug, thiserror::Error, Display)]
// pub struct SqliteError(pub rusqlite::Error);
//
// #[derive(Debug, thiserror::Error)]
// pub enum SqliteError {
// #[error(transparent)]
// Rusqlite(#[from] rusqlite::Error),
// #[error(transparent)]
// Storage(#[from] StorageError),
// }
#[derive(Debug, Error)]
pub enum SqliteError {
#[error("sqlite error: {0}")]
Rusqlite(#[from] rusqlite::Error),
#[error(transparent)]
Storage(#[from] StorageError),
pub(crate) fn map_rusqlite_error(err: RusqliteError) -> StorageError {
StorageError::Database(err.to_string())
}
// impl From<SqliteError> for StorageError {
// fn from(err: SqliteError) -> Self {
// match err {
// SqliteError::Storage(e) => e,
// SqliteError::Rusqlite(e) => StorageError::Database(e.to_string()),
// }
// }
// }
pub(crate) fn map_optional_row<T>(
result: Result<T, RusqliteError>,
) -> Result<Option<T>, StorageError> {
match result {
Ok(value) => Ok(Some(value)),
Err(RusqliteError::QueryReturnedNoRows) => Ok(None),
Err(err) => Err(map_rusqlite_error(err)),
}
}
pub(crate) fn not_found(record: impl Into<String>) -> StorageError {
StorageError::NotFound(record.into())
}
pub(crate) fn invalid_blob_length(field: &str, expected: usize, actual: usize) -> StorageError {
StorageError::InvalidData(format!("{field} expected {expected} bytes, got {actual}"))
}

View File

@ -8,7 +8,7 @@ mod types;
use std::collections::HashSet;
use crypto::{Identity, PrivateKey};
use rusqlite::{Error as RusqliteError, Transaction, params};
use rusqlite::{Transaction, params};
use storage::{
ConversationKind, ConversationMeta, ConversationStore, EphemeralKeyStore, IdentityStore,
RatchetStateRecord, RatchetStore, SkippedKeyRecord, StorageError,
@ -16,10 +16,13 @@ use storage::{
use zeroize::Zeroize;
use crate::{
common::{SqliteDb, StorageConfig},
common::SqliteDb,
errors::{invalid_blob_length, map_optional_row, map_rusqlite_error, not_found},
types::IdentityRecord,
};
pub use common::StorageConfig;
/// Chat-specific storage operations.
///
/// This struct wraps a SqliteDb and provides domain-specific
@ -57,7 +60,8 @@ impl IdentityStore for ChatStorage {
let mut stmt = self
.db
.connection()
.prepare("SELECT name, secret_key FROM identity WHERE id = 1")?;
.prepare("SELECT name, secret_key FROM identity WHERE id = 1")
.map_err(map_rusqlite_error)?;
let result = stmt.query_row([], |row| {
let name: String = row.get(0)?;
@ -65,15 +69,17 @@ impl IdentityStore for ChatStorage {
Ok((name, secret_key))
});
match result {
Ok((name, mut secret_key_vec)) => {
match map_optional_row(result)? {
Some((name, mut secret_key_vec)) => {
let bytes: Result<[u8; 32], _> = secret_key_vec.as_slice().try_into();
let bytes = match bytes {
Ok(b) => b,
Err(_) => {
secret_key_vec.zeroize();
return Err(StorageError::InvalidData(
"Invalid secret key length".into(),
return Err(invalid_blob_length(
"identity.secret_key",
32,
secret_key_vec.len(),
));
}
};
@ -84,8 +90,7 @@ impl IdentityStore for ChatStorage {
};
Ok(Some(Identity::from(record)))
}
Err(RusqliteError::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
None => Ok(None),
}
}
@ -95,10 +100,14 @@ impl IdentityStore for ChatStorage {
/// the time sensitive data remains in stack memory.
fn save_identity(&mut self, identity: &Identity) -> Result<(), StorageError> {
let mut secret_bytes = identity.secret().DANGER_to_bytes();
let result = self.db.connection().execute(
"INSERT OR REPLACE INTO identity (id, name, secret_key) VALUES (1, ?1, ?2)",
params![identity.get_name(), secret_bytes.as_slice()],
);
let result = self
.db
.connection()
.execute(
"INSERT OR REPLACE INTO identity (id, name, secret_key) VALUES (1, ?1, ?2)",
params![identity.get_name(), secret_bytes.as_slice()],
)
.map_err(map_rusqlite_error);
secret_bytes.zeroize();
result?;
Ok(())
@ -113,10 +122,14 @@ impl EphemeralKeyStore for ChatStorage {
private_key: &PrivateKey,
) -> Result<(), StorageError> {
let mut secret_bytes = private_key.DANGER_to_bytes();
let result = self.db.connection().execute(
"INSERT OR REPLACE INTO ephemeral_keys (public_key_hex, secret_key) VALUES (?1, ?2)",
params![public_key_hex, secret_bytes.as_slice()],
);
let result = self
.db
.connection()
.execute(
"INSERT OR REPLACE INTO ephemeral_keys (public_key_hex, secret_key) VALUES (?1, ?2)",
params![public_key_hex, secret_bytes.as_slice()],
)
.map_err(map_rusqlite_error);
secret_bytes.zeroize();
result?;
Ok(())
@ -127,39 +140,44 @@ impl EphemeralKeyStore for ChatStorage {
let mut stmt = self
.db
.connection()
.prepare("SELECT secret_key FROM ephemeral_keys WHERE public_key_hex = ?1")?;
.prepare("SELECT secret_key FROM ephemeral_keys WHERE public_key_hex = ?1")
.map_err(map_rusqlite_error)?;
let result = stmt.query_row(params![public_key_hex], |row| {
let secret_key: Vec<u8> = row.get(0)?;
Ok(secret_key)
});
match result {
Ok(mut secret_key_vec) => {
match map_optional_row(result)? {
Some(mut secret_key_vec) => {
let bytes: Result<[u8; 32], _> = secret_key_vec.as_slice().try_into();
let bytes = match bytes {
Ok(b) => b,
Err(_) => {
secret_key_vec.zeroize();
return Err(StorageError::InvalidData(
"Invalid ephemeral secret key length".into(),
return Err(invalid_blob_length(
"ephemeral_keys.secret_key",
32,
secret_key_vec.len(),
));
}
};
secret_key_vec.zeroize();
Ok(Some(PrivateKey::from(bytes)))
}
Err(RusqliteError::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
None => Ok(None),
}
}
/// Removes an ephemeral key from storage.
fn remove_ephemeral_key(&mut self, public_key_hex: &str) -> Result<(), StorageError> {
self.db.connection().execute(
"DELETE FROM ephemeral_keys WHERE public_key_hex = ?1",
params![public_key_hex],
)?;
self.db
.connection()
.execute(
"DELETE FROM ephemeral_keys WHERE public_key_hex = ?1",
params![public_key_hex],
)
.map_err(map_rusqlite_error)?;
Ok(())
}
}
@ -170,7 +188,8 @@ impl ConversationStore for ChatStorage {
self.db.connection().execute(
"INSERT OR REPLACE INTO conversations (local_convo_id, remote_convo_id, convo_type) VALUES (?1, ?2, ?3)",
params![meta.local_convo_id, meta.remote_convo_id, meta.kind.as_str()],
)?;
)
.map_err(map_rusqlite_error)?;
Ok(())
}
@ -179,9 +198,13 @@ impl ConversationStore for ChatStorage {
&self,
local_convo_id: &str,
) -> Result<Option<ConversationMeta>, StorageError> {
let mut stmt = self.db.connection().prepare(
"SELECT local_convo_id, remote_convo_id, convo_type FROM conversations WHERE local_convo_id = ?1",
)?;
let mut stmt = self
.db
.connection()
.prepare(
"SELECT local_convo_id, remote_convo_id, convo_type FROM conversations WHERE local_convo_id = ?1",
)
.map_err(map_rusqlite_error)?;
let result = stmt.query_row(params![local_convo_id], |row| {
let local_convo_id: String = row.get(0)?;
@ -194,19 +217,18 @@ impl ConversationStore for ChatStorage {
})
});
match result {
Ok(meta) => Ok(Some(meta)),
Err(RusqliteError::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
map_optional_row(result)
}
/// Removes a conversation by its local ID.
fn remove_conversation(&mut self, local_convo_id: &str) -> Result<(), StorageError> {
self.db.connection().execute(
"DELETE FROM conversations WHERE local_convo_id = ?1",
params![local_convo_id],
)?;
self.db
.connection()
.execute(
"DELETE FROM conversations WHERE local_convo_id = ?1",
params![local_convo_id],
)
.map_err(map_rusqlite_error)?;
Ok(())
}
@ -215,7 +237,8 @@ impl ConversationStore for ChatStorage {
let mut stmt = self
.db
.connection()
.prepare("SELECT local_convo_id, remote_convo_id, convo_type FROM conversations")?;
.prepare("SELECT local_convo_id, remote_convo_id, convo_type FROM conversations")
.map_err(map_rusqlite_error)?;
let records = stmt
.query_map([], |row| {
@ -227,19 +250,25 @@ impl ConversationStore for ChatStorage {
remote_convo_id,
kind: ConversationKind::from(convo_type.as_str()),
})
})?
.collect::<Result<Vec<_>, _>>()?;
})
.map_err(map_rusqlite_error)?
.collect::<Result<Vec<_>, _>>()
.map_err(map_rusqlite_error)?;
Ok(records)
}
/// Checks if a conversation exists by its local ID.
fn has_conversation(&self, local_convo_id: &str) -> Result<bool, StorageError> {
let exists: bool = self.db.connection().query_row(
"SELECT EXISTS(SELECT 1 FROM conversations WHERE local_convo_id = ?1)",
params![local_convo_id],
|row| row.get(0),
)?;
let exists: bool = self
.db
.connection()
.query_row(
"SELECT EXISTS(SELECT 1 FROM conversations WHERE local_convo_id = ?1)",
params![local_convo_id],
|row| row.get(0),
)
.map_err(map_rusqlite_error)?;
Ok(exists)
}
}
@ -281,12 +310,13 @@ impl RatchetStore for ChatStorage {
state.msg_recv,
state.prev_chain_len,
],
)?;
)
.map_err(map_rusqlite_error)?;
// Sync skipped keys
sync_skipped_keys(&tx, conversation_id, skipped_keys)?;
tx.commit()?;
tx.commit().map_err(map_rusqlite_error)?;
Ok(())
}
@ -295,32 +325,59 @@ impl RatchetStore for ChatStorage {
conversation_id: &str,
) -> Result<RatchetStateRecord, StorageError> {
let conn = self.db.connection();
let mut stmt = conn.prepare(
"
SELECT root_key, sending_chain, receiving_chain, dh_self_secret,
dh_remote, msg_send, msg_recv, prev_chain_len
FROM ratchet_state
WHERE conversation_id = ?1
",
)?;
let mut stmt = conn
.prepare(
"
SELECT root_key, sending_chain, receiving_chain, dh_self_secret,
dh_remote, msg_send, msg_recv, prev_chain_len
FROM ratchet_state
WHERE conversation_id = ?1
",
)
.map_err(map_rusqlite_error)?;
stmt.query_row(params![conversation_id], |row| {
Ok(RatchetStateRecord {
root_key: blob_to_array(row.get::<_, Vec<u8>>(0)?),
sending_chain: row.get::<_, Option<Vec<u8>>>(1)?.map(blob_to_array),
receiving_chain: row.get::<_, Option<Vec<u8>>>(2)?.map(blob_to_array),
dh_self_secret: blob_to_array(row.get::<_, Vec<u8>>(3)?),
dh_remote: row.get::<_, Option<Vec<u8>>>(4)?.map(blob_to_array),
msg_send: row.get(5)?,
msg_recv: row.get(6)?,
prev_chain_len: row.get(7)?,
let (
root_key,
sending_chain,
receiving_chain,
dh_self_secret,
dh_remote,
msg_send,
msg_recv,
prev_chain_len,
) = stmt
.query_row(params![conversation_id], |row| {
Ok((
row.get::<_, Vec<u8>>(0)?,
row.get::<_, Option<Vec<u8>>>(1)?,
row.get::<_, Option<Vec<u8>>>(2)?,
row.get::<_, Vec<u8>>(3)?,
row.get::<_, Option<Vec<u8>>>(4)?,
row.get(5)?,
row.get(6)?,
row.get(7)?,
))
})
})
.map_err(|e| match e {
RusqliteError::QueryReturnedNoRows => {
StorageError::NotFound(conversation_id.to_string())
}
e => StorageError::Database(e.to_string()),
.map_err(|err| match err {
rusqlite::Error::QueryReturnedNoRows => not_found(conversation_id.to_string()),
other => map_rusqlite_error(other),
})?;
Ok(RatchetStateRecord {
root_key: blob_to_array(root_key, "ratchet_state.root_key")?,
sending_chain: sending_chain
.map(|blob| blob_to_array(blob, "ratchet_state.sending_chain"))
.transpose()?,
receiving_chain: receiving_chain
.map(|blob| blob_to_array(blob, "ratchet_state.receiving_chain"))
.transpose()?,
dh_self_secret: blob_to_array(dh_self_secret, "ratchet_state.dh_self_secret")?,
dh_remote: dh_remote
.map(|blob| blob_to_array(blob, "ratchet_state.dh_remote"))
.transpose()?,
msg_send,
msg_recv,
prev_chain_len,
})
}
@ -329,33 +386,48 @@ impl RatchetStore for ChatStorage {
conversation_id: &str,
) -> Result<Vec<SkippedKeyRecord>, StorageError> {
let conn = self.db.connection();
let mut stmt = conn.prepare(
"
SELECT public_key, msg_num, message_key
FROM skipped_keys
WHERE conversation_id = ?1
",
)?;
let mut stmt = conn
.prepare(
"
SELECT public_key, msg_num, message_key
FROM skipped_keys
WHERE conversation_id = ?1
",
)
.map_err(map_rusqlite_error)?;
let rows = stmt.query_map(params![conversation_id], |row| {
Ok(SkippedKeyRecord {
public_key: blob_to_array(row.get::<_, Vec<u8>>(0)?),
msg_num: row.get(1)?,
message_key: blob_to_array(row.get::<_, Vec<u8>>(2)?),
let rows = stmt
.query_map(params![conversation_id], |row| {
Ok((
row.get::<_, Vec<u8>>(0)?,
row.get::<_, u32>(1)?,
row.get::<_, Vec<u8>>(2)?,
))
})
})?;
.map_err(map_rusqlite_error)?
.collect::<Result<Vec<_>, _>>()
.map_err(map_rusqlite_error)?;
rows.collect::<Result<Vec<_>, _>>()
.map_err(|e| StorageError::Database(e.to_string()))
rows.into_iter()
.map(|(public_key, msg_num, message_key)| {
Ok(SkippedKeyRecord {
public_key: blob_to_array(public_key, "skipped_keys.public_key")?,
msg_num,
message_key: blob_to_array(message_key, "skipped_keys.message_key")?,
})
})
.collect()
}
fn has_ratchet_state(&self, conversation_id: &str) -> Result<bool, StorageError> {
let conn = self.db.connection();
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM ratchet_state WHERE conversation_id = ?1",
params![conversation_id],
|row| row.get(0),
)?;
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM ratchet_state WHERE conversation_id = ?1",
params![conversation_id],
|row| row.get(0),
)
.map_err(map_rusqlite_error)?;
Ok(count > 0)
}
@ -364,21 +436,25 @@ impl RatchetStore for ChatStorage {
tx.execute(
"DELETE FROM skipped_keys WHERE conversation_id = ?1",
params![conversation_id],
)?;
)
.map_err(map_rusqlite_error)?;
tx.execute(
"DELETE FROM ratchet_state WHERE conversation_id = ?1",
params![conversation_id],
)?;
tx.commit()?;
)
.map_err(map_rusqlite_error)?;
tx.commit().map_err(map_rusqlite_error)?;
Ok(())
}
fn cleanup_old_skipped_keys(&mut self, max_age_secs: i64) -> Result<usize, StorageError> {
let conn = self.db.connection();
let deleted = conn.execute(
"DELETE FROM skipped_keys WHERE created_at < strftime('%s', 'now') - ?1",
params![max_age_secs],
)?;
let deleted = conn
.execute(
"DELETE FROM skipped_keys WHERE created_at < strftime('%s', 'now') - ?1",
params![max_age_secs],
)
.map_err(map_rusqlite_error)?;
Ok(deleted)
}
}
@ -390,17 +466,26 @@ fn sync_skipped_keys(
current_keys: &[SkippedKeyRecord],
) -> Result<(), StorageError> {
// Get existing keys from DB (just the identifiers)
let mut stmt =
tx.prepare("SELECT public_key, msg_num FROM skipped_keys WHERE conversation_id = ?1")?;
let existing: HashSet<([u8; 32], u32)> = stmt
let mut stmt = tx
.prepare("SELECT public_key, msg_num FROM skipped_keys WHERE conversation_id = ?1")
.map_err(map_rusqlite_error)?;
let existing_rows = stmt
.query_map(params![conversation_id], |row| {
Ok((row.get::<_, Vec<u8>>(0)?, row.get::<_, u32>(1)?))
})
.map_err(map_rusqlite_error)?
.collect::<Result<Vec<_>, _>>()
.map_err(map_rusqlite_error)?;
let existing: HashSet<([u8; 32], u32)> = existing_rows
.into_iter()
.map(|(public_key, msg_num)| {
Ok((
blob_to_array(row.get::<_, Vec<u8>>(0)?),
row.get::<_, u32>(1)?,
blob_to_array(public_key, "skipped_keys.public_key")?,
msg_num,
))
})?
.filter_map(|r| r.ok())
.collect();
})
.collect::<Result<_, StorageError>>()?;
// Build set of current keys
let current_set: HashSet<([u8; 32], u32)> = current_keys
@ -413,7 +498,8 @@ fn sync_skipped_keys(
tx.execute(
"DELETE FROM skipped_keys WHERE conversation_id = ?1 AND public_key = ?2 AND msg_num = ?3",
params![conversation_id, pk.as_slice(), msg_num],
)?;
)
.map_err(map_rusqlite_error)?;
}
// Insert new keys
@ -429,22 +515,28 @@ fn sync_skipped_keys(
sk.msg_num,
sk.message_key.as_slice(),
],
)?;
)
.map_err(map_rusqlite_error)?;
}
}
Ok(())
}
fn blob_to_array<const N: usize>(blob: Vec<u8>) -> [u8; N] {
fn blob_to_array<const N: usize>(
blob: Vec<u8>,
field: &'static str,
) -> Result<[u8; N], StorageError> {
let actual = blob.len();
blob.try_into()
.unwrap_or_else(|v: Vec<u8>| panic!("Expected {} bytes, got {}", N, v.len()))
.map_err(|_| invalid_blob_length(field, N, actual))
}
#[cfg(test)]
mod tests {
use storage::{
ConversationKind, ConversationMeta, ConversationStore, EphemeralKeyStore, IdentityStore,
RatchetStore,
};
use super::*;
@ -522,4 +614,39 @@ mod tests {
assert_eq!(convos[0].remote_convo_id, "remote_2");
assert_eq!(convos[0].kind.as_str(), "private_v1");
}
#[test]
fn test_invalid_ratchet_blob_returns_storage_error() {
let storage = ChatStorage::new(StorageConfig::InMemory).unwrap();
storage
.db
.connection()
.execute(
"INSERT INTO ratchet_state (
conversation_id, root_key, sending_chain, receiving_chain,
dh_self_secret, dh_remote, msg_send, msg_recv, prev_chain_len
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
params![
"bad-convo",
vec![0u8; 31],
Option::<Vec<u8>>::None,
Option::<Vec<u8>>::None,
vec![0u8; 32],
Option::<Vec<u8>>::None,
0u32,
0u32,
0u32,
],
)
.map_err(map_rusqlite_error)
.unwrap();
let err = storage.load_ratchet_state("bad-convo").unwrap_err();
assert!(matches!(err, StorageError::InvalidData(_)));
assert_eq!(
err.to_string(),
"invalid data: ratchet_state.root_key expected 32 bytes, got 31"
);
}
}

View File

@ -3,7 +3,10 @@
//! SQL migrations are embedded at compile time and applied in order.
//! Each migration is applied atomically within a transaction.
use crate::{common::DbConn, errors::SqliteError};
use rusqlite::Connection;
use storage::StorageError;
use crate::errors::map_rusqlite_error;
/// Embeds and returns all migration SQL files in order.
pub fn get_migrations() -> Vec<(&'static str, &'static str)> {
@ -22,29 +25,33 @@ pub fn get_migrations() -> Vec<(&'static str, &'static str)> {
/// Applies all migrations to the database.
///
/// Uses a simple version tracking table to avoid re-running migrations.
pub fn apply_migrations(conn: &mut DbConn) -> Result<(), SqliteError> {
pub fn apply_migrations(conn: &mut Connection) -> Result<(), StorageError> {
// Create migrations tracking table if it doesn't exist
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS _migrations (
name TEXT PRIMARY KEY,
applied_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
);",
)?;
)
.map_err(map_rusqlite_error)?;
for (name, sql) in get_migrations() {
// Check if migration already applied
let already_applied: bool = conn.query_row(
"SELECT EXISTS(SELECT 1 FROM _migrations WHERE name = ?1)",
[name],
|row| row.get(0),
)?;
let already_applied: bool = conn
.query_row(
"SELECT EXISTS(SELECT 1 FROM _migrations WHERE name = ?1)",
[name],
|row| row.get(0),
)
.map_err(map_rusqlite_error)?;
if !already_applied {
// Apply migration and record it atomically in a transaction
let tx = conn.transaction()?;
tx.execute_batch(sql)?;
tx.execute("INSERT INTO _migrations (name) VALUES (?1)", [name])?;
tx.commit()?;
let tx = conn.transaction().map_err(map_rusqlite_error)?;
tx.execute_batch(sql).map_err(map_rusqlite_error)?;
tx.execute("INSERT INTO _migrations (name) VALUES (?1)", [name])
.map_err(map_rusqlite_error)?;
tx.commit().map_err(map_rusqlite_error)?;
}
}

View File

@ -11,22 +11,6 @@ pub enum StorageError {
#[error("not found: {0}")]
NotFound(String),
/// Serialization error.
#[error("serialization error: {0}")]
Serialization(String),
/// Deserialization error.
#[error("deserialization error: {0}")]
Deserialization(String),
/// Schema migration error.
#[error("migration error: {0}")]
Migration(String),
/// Transaction error.
#[error("transaction error: {0}")]
Transaction(String),
/// Invalid data error.
#[error("invalid data: {0}")]
InvalidData(String),

View File

@ -8,4 +8,4 @@ crate-type = ["rlib"]
[dependencies]
libchat = { workspace = true }
storage = { path = "../../core/storage" }
chat-sqlite = { path = "../../core/sqlite" }

View File

@ -1,7 +1,7 @@
use chat_sqlite::StorageConfig;
use libchat::ChatError;
use libchat::ChatStorage;
use libchat::Context;
use storage::StorageConfig;
pub struct ChatClient {
ctx: Context<ChatStorage>,

View File

@ -40,9 +40,8 @@ type
VecPayload* = object
`ptr`*: ptr Payload
len*: csize_t
cap*: csize_t
cap*: csize_t ## Vector of Payloads returned by safer_ffi functions
## Vector of Payloads returned by safer_ffi functions
VecString* = object
`ptr`*: ptr ReprCString
len*: csize_t
@ -104,33 +103,25 @@ proc destroy_string*(s: ReprCString) {.importc.}
## Creates an intro bundle for sharing with other users
## Returns: CreateIntroResult struct - check error_code field (0 = success, negative = error)
## The result must be freed with destroy_intro_result()
proc create_intro_bundle*(
ctx: ContextHandle,
): CreateIntroResult {.importc.}
proc create_intro_bundle*(ctx: ContextHandle): CreateIntroResult {.importc.}
## Creates a new private conversation
## Returns: NewConvoResult struct - check error_code field (0 = success, negative = error)
## The result must be freed with destroy_convo_result()
proc create_new_private_convo*(
ctx: ContextHandle,
bundle: SliceUint8,
content: SliceUint8,
ctx: ContextHandle, bundle: SliceUint8, content: SliceUint8
): NewConvoResult {.importc.}
## Get the available conversation identifers.
## Returns: ListConvoResult struct - check error_code field (0 = success, negative = error)
## The result must be freed with destroy_list_result()
proc list_conversations*(
ctx: ContextHandle,
): ListConvoResult {.importc.}
proc list_conversations*(ctx: ContextHandle): ListConvoResult {.importc.}
## Sends content to an existing conversation
## Returns: SendContentResult struct - check error_code field (0 = success, negative = error)
## The result must be freed with destroy_send_content_result()
proc send_content*(
ctx: ContextHandle,
convo_id: ReprCString,
content: SliceUint8,
ctx: ContextHandle, convo_id: ReprCString, content: SliceUint8
): SendContentResult {.importc.}
## Handles an incoming payload
@ -139,8 +130,7 @@ proc send_content*(
## is no data, and the convo_id should be ignored.
## The result must be freed with destroy_handle_payload_result()
proc handle_payload*(
ctx: ContextHandle,
payload: SliceUint8,
ctx: ContextHandle, payload: SliceUint8
): HandlePayloadResult {.importc.}
## Free the result from create_intro_bundle
@ -229,4 +219,3 @@ proc toBytes*(s: string): seq[byte] =
return @[]
result = newSeq[byte](s.len)
copyMem(addr result[0], unsafeAddr s[0], s.len)