Add GroupV1

This commit is contained in:
Jazz Turner-Baggs 2026-04-23 18:04:45 -07:00
parent 39bf267564
commit af3ff3c6a2
No known key found for this signature in database
23 changed files with 1840 additions and 82 deletions

4
Cargo.lock generated
View File

@ -633,6 +633,10 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "delivery"
version = "0.1.0"
[[package]]
name = "der"
version = "0.7.10"

View File

@ -10,9 +10,12 @@ members = [
"core/storage",
"crates/client",
"crates/client-ffi",
"bin/chat-cli",
"bin/chat-cli", "extensions/delivery",
]
# default-members = [ "core/*", "crates/*"]
default-members = [ "core/*"]
[workspace.dependencies]
blake2 = "0.10"
libchat = { path = "core/conversations" }

View File

@ -1,39 +1,50 @@
use std::sync::Arc;
use std::{cell::RefCell, rc::Rc};
use crypto::{Identity, PublicKey};
use storage::{ChatStore, ConversationKind};
use crate::conversation::{Convo, GroupConvo, GroupV1Convo, IdentityProvider};
use crate::ctx::{self, ClientCtx};
use crate::account::LogosAccount;
use crate::{DeliveryService, RegistrationService};
use crate::{
conversation::{Conversation, ConversationId, Convo, Id, PrivateV1Convo},
conversation::{Conversation, ConversationId, Id, PrivateV1Convo},
errors::ChatError,
inbox::Inbox,
inbox_v2::{GroupInitializer, InboxV2},
proto::{EncryptedPayload, EnvelopeV1, Message},
types::{AddressedEnvelope, ContentData},
};
use crypto::{Identity, PublicKey};
use storage::{ChatStore, ConversationKind};
pub use crate::conversation::ConversationIdOwned;
pub use crate::inbox::Introduction;
// This is the main entry point to the conversations api.
// Ctx manages lifetimes of objects to process and generate payloads.
pub struct Context<S: ChatStore> {
pub struct Context<DS: DeliveryService, RS: RegistrationService, CS: ChatStore> {
_identity: Rc<Identity>,
inbox: Inbox<S>,
store: Rc<RefCell<S>>,
#[allow(unused)] // TODO: (P2) Remove once Account integrated in future PR.
account: LogosAccount,
client_ctx: ClientCtx<DS, RS, CS>,
inbox: Inbox<CS>,
pq_inbox: InboxV2,
store: Rc<RefCell<CS>>,
}
impl<S: ChatStore> Context<S> {
impl<DS: DeliveryService, RS: RegistrationService, CS: ChatStore + 'static> Context<DS, RS, CS> {
/// Opens or creates a Context with the given storage configuration.
///
/// If an identity exists in storage, it will be restored.
/// Otherwise, a new identity will be created with the given name and saved.
pub fn new_from_store(name: impl Into<String>, store: S) -> Result<Self, ChatError> {
pub fn new_from_store(
name: impl Into<String>,
delivery: DS,
contact_reg: RS,
store: CS,
) -> Result<Self, ChatError> {
let name = name.into();
let store = Rc::new(RefCell::new(store));
let mut ctx = ClientCtx::new(delivery, contact_reg, store.clone());
// Load or create identity
let identity = if let Some(identity) = store.borrow().load_identity()? {
@ -47,9 +58,18 @@ impl<S: ChatStore> Context<S> {
let identity = Rc::new(identity);
let inbox = Inbox::new(Rc::clone(&store), Rc::clone(&identity));
let pq_inbox = InboxV2::new();
// Subscribe
ctx.ds()
.subscribe(pq_inbox.delivery_address())
.map_err(ChatError::generic)?;
Ok(Self {
_identity: identity,
client_ctx: ctx,
inbox,
pq_inbox,
store,
account: LogosAccount::new_test(name.as_str()),
})
@ -58,10 +78,17 @@ impl<S: ChatStore> Context<S> {
/// Creates a new in-memory Context (for testing).
///
/// Uses in-memory SQLite database. Each call creates a new isolated database.
pub fn new_with_name(name: impl Into<String>, chat_store: S) -> Self {
pub fn new_with_name(
name: impl Into<String>,
delivery: DS,
contact_reg: RS,
chat_store: CS,
) -> Result<Self, ChatError> {
let name = name.into();
let identity = Identity::new(&name);
let chat_store = Rc::new(RefCell::new(chat_store));
let mut ctx = ClientCtx::new(delivery, contact_reg, chat_store.clone());
chat_store
.borrow_mut()
.save_identity(&identity)
@ -69,13 +96,27 @@ impl<S: ChatStore> Context<S> {
let identity = Rc::new(identity);
let inbox = Inbox::new(Rc::clone(&chat_store), Rc::clone(&identity));
let mut pq_inbox = InboxV2::new();
pq_inbox.register(&mut ctx)?;
Self {
ctx.ds()
.subscribe(pq_inbox.delivery_address())
.map_err(ChatError::generic)?;
Ok(Self {
_identity: identity,
client_ctx: ctx,
pq_inbox,
inbox,
store: chat_store,
account: LogosAccount::new_test(name.as_str()),
}
})
}
/// Returns the unique identifier associated with the account
pub fn account_id(&self) -> String {
self.pq_inbox.account.friendly_name()
}
pub fn installation_name(&self) -> &str {
@ -96,7 +137,7 @@ impl<S: ChatStore> Context<S> {
.invite_to_private_convo(remote_bundle, content, Rc::clone(&self.store))
.unwrap_or_else(|_| todo!("Log/Surface Error"));
let remote_id = Inbox::<S>::inbox_identifier_for_key(*remote_bundle.installation_key());
let remote_id = Inbox::<CS>::inbox_identifier_for_key(*remote_bundle.installation_key());
let payload_bytes = payloads
.into_iter()
.map(|p| p.into_envelope(remote_id.clone()))
@ -106,6 +147,23 @@ impl<S: ChatStore> Context<S> {
Ok((convo_id, payload_bytes))
}
pub fn create_group_convo(
&mut self,
participants: &[&str],
) -> Result<Box<dyn GroupConvo<DS, RS, CS>>, ChatError> {
let mut convo = self.pq_inbox.create_group_v1(&mut self.client_ctx)?;
self.client_ctx
.store()
.save_conversation(&storage::ConversationMeta {
local_convo_id: convo.id().to_string(),
remote_convo_id: "0".into(),
kind: ConversationKind::GroupV1,
})?;
convo.add_member(&mut self.client_ctx, participants)?;
Ok(Box::new(convo))
}
pub fn list_conversations(&self) -> Result<Vec<ConversationIdOwned>, ChatError> {
let records = self.store.borrow().load_conversations()?;
Ok(records
@ -119,41 +177,47 @@ impl<S: ChatStore> Context<S> {
convo_id: ConversationId,
content: &[u8],
) -> Result<Vec<AddressedEnvelope>, ChatError> {
let convo = self.load_convo(convo_id)?;
match convo {
Conversation::Private(mut convo) => {
let payloads = convo.send_message(content)?;
let remote_id = convo.remote_id();
Ok(payloads
.into_iter()
.map(|p| p.into_envelope(remote_id.clone()))
.collect())
}
}
let mut convo = self.load_convo(convo_id)?;
let payloads = convo.send_message(content)?;
let remote_id = convo.remote_id();
Ok(payloads
.into_iter()
.map(|p| p.into_envelope(remote_id.clone()))
.collect())
}
// Decode bytes and send to protocol for processing.
pub fn handle_payload(&mut self, payload: &[u8]) -> Result<Option<ContentData>, ChatError> {
let env = EnvelopeV1::decode(payload)?;
let e2 = env.clone();
// TODO: Impl Conversation hinting
let convo_id = env.conversation_hint;
let enc = EncryptedPayload::decode(env.payload)?;
let a = self.pq_inbox.id();
match convo_id {
c if c == self.inbox.id() => self.dispatch_to_inbox(enc),
c if self.store.borrow().has_conversation(&c)? => self.dispatch_to_convo(&c, enc),
_ => Ok(None),
c if c == self.inbox.id() => self.dispatch_to_inbox(&env.payload),
c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload),
c if self.store.borrow().has_conversation(&c)? => {
self.dispatch_to_convo(&c, &env.payload)
}
_ => Ok(Some(ContentData {
conversation_id: "".into(),
data: vec![],
is_new_convo: false,
})),
}
}
// Dispatch encrypted payload to Inbox, and register the created Conversation
fn dispatch_to_inbox(
&mut self,
enc_payload: EncryptedPayload,
enc_payload_bytes: &[u8],
) -> Result<Option<ContentData>, ChatError> {
let public_key_hex = Inbox::<S>::extract_ephemeral_key_hex(&enc_payload)?;
// EncryptedPayloads are not used by GroupConvos at this time, else this can be performed in `handle_payload`
// TODO: (P1) reconcile envelope parsing between Covno and GroupConvo
let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?;
let public_key_hex = Inbox::<CS>::extract_ephemeral_key_hex(&enc_payload)?;
let (convo, content) =
self.inbox
.handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store))?;
@ -168,20 +232,22 @@ impl<S: ChatStore> Context<S> {
Ok(content)
}
// Dispatch encrypted payload to Inbox, and register the created Conversation
fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result<Option<ContentData>, ChatError> {
self.pq_inbox.handle_frame(&mut self.client_ctx, payload)?;
Ok(None)
}
// Dispatch encrypted payload to its corresponding conversation
fn dispatch_to_convo(
&mut self,
convo_id: ConversationId,
enc_payload: EncryptedPayload,
enc_payload_bytes: &[u8],
) -> Result<Option<ContentData>, ChatError> {
let convo = self.load_convo(convo_id)?;
match convo {
Conversation::Private(mut convo) => {
let result = convo.handle_frame(enc_payload)?;
Ok(result)
}
}
let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?;
let mut convo = self.load_convo(convo_id)?;
convo.handle_frame(enc_payload)
}
pub fn create_intro_bundle(&mut self) -> Result<Vec<u8>, ChatError> {
@ -190,7 +256,7 @@ impl<S: ChatStore> Context<S> {
}
/// Loads a conversation from DB by constructing it from metadata.
fn load_convo(&self, convo_id: ConversationId) -> Result<Conversation<S>, ChatError> {
fn load_convo(&mut self, convo_id: ConversationId) -> Result<Box<dyn Convo>, ChatError> {
let record = self
.store
.borrow()
@ -204,8 +270,37 @@ impl<S: ChatStore> Context<S> {
record.local_convo_id,
record.remote_convo_id,
)?;
Ok(Conversation::Private(private_convo))
Ok(Box::new(private_convo))
}
ConversationKind::GroupV1 => Ok(Box::new(
self.pq_inbox
.load_mls_convo(&mut self.client_ctx, record.local_convo_id)?,
)),
ConversationKind::Unknown(_) => Err(ChatError::BadBundleValue(format!(
"unsupported conversation type: {}",
record.kind.as_str()
))),
}
}
fn load_group_convo(
&mut self,
convo_id: ConversationId,
) -> Result<Box<dyn GroupConvo<DS, RS, CS>>, ChatError> {
let record = self
.store
.borrow()
.load_conversation(convo_id)?
.ok_or_else(|| ChatError::NoConvo(convo_id.into()))?;
match record.kind {
ConversationKind::PrivateV1 => {
Err(ChatError::NoConvo("This is not a group convo".into()))
}
ConversationKind::GroupV1 => Ok(Box::new(
self.pq_inbox
.load_mls_convo(&mut self.client_ctx, record.local_convo_id)?,
)),
ConversationKind::Unknown(_) => Err(ChatError::BadBundleValue(format!(
"unsupported conversation type: {}",
record.kind.as_str()
@ -214,17 +309,40 @@ impl<S: ChatStore> Context<S> {
}
}
impl<DS: DeliveryService, RS: RegistrationService, CS: ChatStore> GroupInitializer<DS, RS, CS>
for Context<DS, RS, CS>
{
fn on_new_group_convo(
&self,
convo: impl crate::conversation::GroupConvo<DS, RS, CS>,
) -> Result<(), ChatError> {
todo!()
}
}
#[cfg(test)]
mod tests {
use std::{
any::Any,
ops::{Deref, DerefMut},
};
use sqlite::{ChatStorage, StorageConfig};
use storage::{ConversationStore, IdentityStore};
use tempfile::tempdir;
use crate::{
test_utils::{EphemeralRegistry, LocalBroadcaster, MemStore},
utils::hex_trunc,
};
use super::*;
type TestContext = Context<LocalBroadcaster, EphemeralRegistry, ChatStorage>;
fn send_and_verify(
sender: &mut Context<ChatStorage>,
receiver: &mut Context<ChatStorage>,
sender: &mut TestContext,
receiver: &mut TestContext,
convo_id: ConversationId,
content: &[u8],
) {
@ -238,10 +356,171 @@ mod tests {
assert!(!received.is_new_convo); // Check that `is_new_convo` is FALSE
}
// Simple client Functionality for testing
struct Client {
inner: Context<LocalBroadcaster, EphemeralRegistry, MemStore>,
on_content: Option<Box<dyn Fn(ContentData)>>,
}
impl Client {
fn init(
ctx: Context<LocalBroadcaster, EphemeralRegistry, MemStore>,
cb: Option<impl Fn(ContentData) + 'static>,
) -> Self {
Client {
inner: ctx,
on_content: cb.map(|f| Box::new(f) as Box<dyn Fn(ContentData)>),
}
}
fn process_messages(&mut self) {
while let Some(data) = self.client_ctx.ds().poll() {
let res = self.handle_payload(&data).unwrap();
if let Some(cb) = &self.on_content {
match res {
Some(content_data) => cb(content_data),
None => continue,
}
}
}
}
fn convo(
&mut self,
convo_id: &str,
) -> Box<dyn GroupConvo<LocalBroadcaster, EphemeralRegistry, MemStore>> {
// TODO: (P1) Convos are being copied somewhere, which means hanging on to a reference causes state desync
self.load_group_convo(convo_id).unwrap()
}
}
impl Deref for Client {
type Target = Context<LocalBroadcaster, EphemeralRegistry, MemStore>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for Client {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
// Higher order function to handle printing
fn pretty_print(prefix: impl Into<String>) -> Box<dyn Fn(ContentData)> {
let prefix = prefix.into();
return Box::new(move |c: ContentData| {
let cid = hex_trunc(c.conversation_id.as_bytes());
let content = String::from_utf8(c.data).unwrap();
println!("{} ({}) {}", prefix, cid, content)
});
}
fn process(clients: &mut Vec<Client>) {
for client in clients {
client.process_messages();
}
}
#[test]
fn create_group() {
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let saro_ctx =
Context::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap();
let raya_ctx =
Context::new_with_name("raya", ds.clone(), rs.clone(), MemStore::new()).unwrap();
let mut clients = vec![
Client::init(saro_ctx, Some(pretty_print(" Saro "))),
Client::init(raya_ctx, Some(pretty_print(" Raya "))),
];
const SARO: usize = 0;
const RAYA: usize = 1;
let raya_id = clients[RAYA].account_id();
let s_convo = clients[SARO]
.create_group_convo(&[raya_id.as_ref()])
.unwrap();
let CONVO_ID = s_convo.id();
// Raya can read this message because
// 1) It was sent after add_members was committed, and
// 2) LocalBroadcaster provides historical messages.
clients[SARO]
.convo(CONVO_ID)
.send_content(
&mut clients[SARO].client_ctx,
b"ok who broke the group chat again",
)
.unwrap();
// clients[SARO].process_messages();
process(&mut clients);
clients[RAYA]
.convo(CONVO_ID)
.send_content(
&mut clients[RAYA].client_ctx,
b"it was literally working five minutes ago",
)
.unwrap();
// clients[SARO].process_messages();
process(&mut clients);
let pax_ctx = Context::new_with_name("pax", ds, rs, MemStore::new()).unwrap();
clients.push(Client::init(pax_ctx, Some(pretty_print(" Pax"))));
const PAX: usize = 2;
let pax_id = clients[PAX].account_id();
clients[SARO]
.convo(CONVO_ID)
.add_member(&mut clients[SARO].client_ctx, &[pax_id.as_ref()])
.unwrap();
// clients[SARO].process_messages();
process(&mut clients);
clients[PAX]
.convo(CONVO_ID)
.send_content(
&mut clients[PAX].client_ctx,
b"ngl the key rotation is cooked",
)
.unwrap();
// clients[SARO].process_messages();
process(&mut clients);
clients[SARO]
.convo(CONVO_ID)
.send_content(
&mut clients[SARO].client_ctx,
b"bro we literally just added you to the group ",
)
.unwrap();
process(&mut clients);
// process(&mut clients);
}
#[test]
fn ctx_integration() {
let mut saro = Context::new_with_name("saro", ChatStorage::in_memory());
let mut raya = Context::new_with_name("raya", ChatStorage::in_memory());
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let mut saro =
Context::new_with_name("saro", ds.clone(), rs.clone(), ChatStorage::in_memory())
.unwrap();
let mut raya = Context::new_with_name("raya", ds, rs, ChatStorage::in_memory()).unwrap();
// Raya creates intro bundle and sends to Saro
let bundle = raya.create_intro_bundle().unwrap();
@ -274,8 +553,10 @@ mod tests {
#[test]
fn identity_persistence() {
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let store1 = ChatStorage::new(StorageConfig::InMemory).unwrap();
let ctx1 = Context::new_with_name("alice", store1);
let ctx1 = Context::new_with_name("alice", ds, rs, store1).unwrap();
let pubkey1 = ctx1._identity.public_key();
let name1 = ctx1.installation_name().to_string();
@ -291,8 +572,10 @@ mod tests {
let db_path = dir.path().join("chat.sqlite");
let db_path = db_path.to_string_lossy().into_owned();
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let store = ChatStorage::new(StorageConfig::File(db_path.clone())).unwrap();
let ctx = Context::new_from_store("alice", store).unwrap();
let ctx = Context::new_from_store("alice", ds, rs, store).unwrap();
let pubkey = ctx._identity.public_key();
drop(ctx);
@ -305,8 +588,12 @@ mod tests {
#[test]
fn conversation_metadata_persistence() {
let mut alice = Context::new_with_name("alice", ChatStorage::in_memory());
let mut bob = Context::new_with_name("bob", ChatStorage::in_memory());
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let mut alice =
Context::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory())
.unwrap();
let mut bob = Context::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap();
let bundle = alice.create_intro_bundle().unwrap();
let intro = Introduction::try_from(bundle.as_slice()).unwrap();
@ -323,8 +610,12 @@ mod tests {
#[test]
fn conversation_full_flow() {
let mut alice = Context::new_with_name("alice", ChatStorage::in_memory());
let mut bob = Context::new_with_name("bob", ChatStorage::in_memory());
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let mut alice =
Context::new_with_name("alice", ds.clone(), rs.clone(), ChatStorage::in_memory())
.unwrap();
let mut bob = Context::new_with_name("bob", ds, rs, ChatStorage::in_memory()).unwrap();
let bundle = alice.create_intro_bundle().unwrap();
let intro = Introduction::try_from(bundle.as_slice()).unwrap();
@ -364,4 +655,38 @@ mod tests {
.expect("bob should receive");
assert_eq!(content.data, b"alice reply");
}
#[test]
fn bcast_test() {
let mut a = LocalBroadcaster::new();
let mut b = a.new_consumer();
a.subscribe("a".into()).unwrap();
b.subscribe("b".into()).unwrap();
{
let e = AddressedEnvelope {
delivery_address: "a".into(),
data: (1..4).collect(),
};
a.publish(e.clone()).unwrap();
let result = a.poll();
assert!(result.unwrap() == e.data);
assert!(a.poll().is_none());
}
{
let e = AddressedEnvelope {
delivery_address: "b".into(),
data: (4..10).collect(),
};
a.publish(e.clone()).unwrap();
dbg!(&b);
let result = b.poll();
assert!(result.unwrap() == e.data);
assert!(b.poll().is_none());
}
}
}

View File

@ -1,12 +1,18 @@
pub mod group_v1;
mod privatev1;
use crate::types::{AddressedEncryptedPayload, ContentData};
use crate::{
DeliveryService, RegistrationService,
ctx::ClientCtx,
types::{AddressedEncryptedPayload, ContentData},
};
use chat_proto::logoschat::encryption::EncryptedPayload;
use std::fmt::Debug;
use std::sync::Arc;
use storage::{ConversationKind, ConversationStore, RatchetStore};
use storage::{ChatStore, ConversationKind, ConversationStore, RatchetStore};
pub use crate::errors::ChatError;
pub use group_v1::{GroupV1Convo, IdentityProvider, LogosMlsProvider};
pub use privatev1::PrivateV1Convo;
pub type ConversationId<'a> = &'a str;
@ -36,6 +42,29 @@ pub trait Convo: Id + Debug {
fn convo_type(&self) -> ConversationKind;
}
pub trait GroupConvo<DS: DeliveryService, RS: RegistrationService, CS: ChatStore>: Convo {
fn add_member(
&mut self,
ctx: &mut ClientCtx<DS, RS, CS>,
members: &[&str],
) -> Result<(), ChatError>;
// Default implementation which dispatches envelopes to the DeliveryService
fn send_content(
&mut self,
ctx: &mut ClientCtx<DS, RS, CS>,
content: &[u8],
) -> Result<(), ChatError> {
let payloads = self.send_message(content)?;
for payload in payloads {
ctx.ds()
.publish(payload.into_envelope(self.id().into()))
.map_err(|e| ChatError::Delivery(e.to_string()))?;
}
Ok(())
}
}
pub enum Conversation<S: ConversationStore + RatchetStore> {
Private(PrivateV1Convo<S>),
}

View File

@ -0,0 +1,433 @@
use std::cell::{Ref, RefCell};
use std::rc::Rc;
use blake2::{Blake2b, Digest, digest::consts::U6};
use crypto::Ed25519VerifyingKey;
use openmls::prelude::*;
use openmls::{prelude::tls_codec::Deserialize, treesync::RatchetTree};
use openmls_libcrux_crypto::Provider as LibcruxProvider;
use openmls::prelude::MlsMessageBodyIn;
use openmls_traits::signatures::Signer as OpenMlsSigner;
use openmls_traits::storage::StorageProvider;
use prost::Message;
use crate::{
AddressedEnvelope, DeliveryService, RegistrationService,
conversation::{ChatError, ConversationId, Convo, GroupConvo, Id},
ctx::ClientCtx,
types::{AddressedEncryptedPayload, ContentData},
};
use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload};
use storage::{ChatStore, ConversationKind};
pub trait IdentityProvider: OpenMlsSigner {
fn friendly_name(&self) -> String;
fn public_key(&self) -> Ed25519VerifyingKey;
// fn installation_key() -> u8;
}
pub trait MlsInitializer {
fn invite_to_group_v1<DS: DeliveryService, RS: RegistrationService, CS: ChatStore>(
&self,
ctx: &mut ClientCtx<DS, RS, CS>,
account_id: &str,
welcome: &MlsMessageOut,
// ratchet_tree: RatchetTree, // Embedded
) -> Result<(), ChatError>;
}
pub trait MlsCtx {
type IDENT: IdentityProvider;
type INIT: MlsInitializer;
fn ident(&self) -> &Self::IDENT;
fn provider(&self) -> Ref<'_, LibcruxProvider>;
fn init(&self) -> &Self::INIT;
// Build an MLS Credential from the supplied IdentityProvider
fn get_credential(&self) -> CredentialWithKey;
}
pub trait LogosMlsProvider: OpenMlsProvider {}
pub trait GroupMlsStorageV1 {
fn save_state(&self, state: &[u8]);
fn load_state(&self) -> Vec<u8>;
}
pub struct GroupV1Convo<Ctx: MlsCtx> {
ctx: Rc<RefCell<Ctx>>,
pub(crate) mls_group: MlsGroup, // TODO: (!) Fix Visibility
convo_id: String,
}
impl<Ctx: MlsCtx> std::fmt::Debug for GroupV1Convo<Ctx> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GroupV1Convo")
.field("name", &self.ctx.borrow().ident().friendly_name())
.field("convo_id", &self.convo_id)
.field("mls_epoch", &self.mls_group.epoch())
.finish_non_exhaustive()
}
}
impl<Ctx: MlsCtx> GroupV1Convo<Ctx> {
pub fn new<DS: DeliveryService>(ctx: Ctx, ds: &mut DS) -> Self {
let config = Self::mls_create_config();
let ctx = Rc::new(RefCell::new(ctx));
let mls_group = {
let ctx_ref = ctx.borrow();
MlsGroup::new(
&*ctx_ref.provider(),
ctx_ref.ident(),
&config,
ctx_ref.get_credential(),
)
.unwrap()
};
let convo_id = hex::encode(mls_group.group_id().as_slice());
Self::subscribe(ds, &convo_id);
println!(
"@ Create Convo: {}. {}. d:{} dc:{}",
ctx.borrow().ident().friendly_name(),
convo_id,
Self::delivery_address_from_id(&convo_id),
Self::ctrl_delivery_address_from_id(&convo_id)
);
Self {
ctx,
mls_group,
convo_id,
}
}
pub fn new_from_welcome<DS: DeliveryService>(
ctx: Rc<RefCell<Ctx>>,
ds: &mut DS,
welcome: Welcome,
) -> Self {
let mls_group = {
let ctx_borrow = ctx.borrow();
let provider = ctx_borrow.provider();
StagedWelcome::build_from_welcome(&*provider, &Self::mls_join_config(), welcome)
.unwrap()
.build()
.unwrap()
.into_group(&*provider)
.unwrap()
};
let convo_id = hex::encode(mls_group.group_id().as_slice());
Self::subscribe(ds, &convo_id);
println!(
"@ Welcome Convo: I:{}. {}. d:{} dc:{}",
ctx.borrow().ident().friendly_name(),
convo_id,
Self::delivery_address_from_id(&convo_id),
Self::ctrl_delivery_address_from_id(&convo_id)
);
GroupV1Convo {
ctx,
mls_group,
convo_id,
}
}
pub fn load<DS: DeliveryService>(
ctx: Rc<RefCell<Ctx>>,
ds: &mut DS,
convo_id: String,
group_id: GroupId,
) -> Result<Self, ChatError> {
let Some(mls_group) = MlsGroup::load(ctx.borrow().provider().storage(), &group_id)
.map_err(ChatError::generic)?
else {
return Err(ChatError::NoConvo("mls group not found".into()));
};
// println!(
// "\n>>> {}. {:?}",
// ctx.borrow().ident().friendly_name(),
// mls_group
// );
Self::subscribe(ds, &convo_id);
Ok(GroupV1Convo {
ctx,
mls_group,
convo_id,
})
}
fn subscribe<DS: DeliveryService>(ds: &mut DS, convo_id: &str) -> Result<(), ChatError> {
ds.subscribe(Self::delivery_address_from_id(&convo_id))
.map_err(ChatError::generic)?;
ds.subscribe(Self::ctrl_delivery_address_from_id(&convo_id))
.map_err(ChatError::generic)?;
Ok(())
}
pub fn ratchet_tree(&self) -> RatchetTree {
self.mls_group.export_ratchet_tree()
}
fn mls_create_config() -> MlsGroupCreateConfig {
MlsGroupCreateConfig::builder()
.ciphersuite(Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519)
.use_ratchet_tree_extension(true) // This is handy for now, until there is central store for this data
.build()
}
fn mls_join_config() -> MlsGroupJoinConfig {
MlsGroupJoinConfig::builder().build()
}
fn delivery_address_from_id(convo_id: &str) -> String {
let hash = Blake2b::<U6>::new()
.chain_update("delivery_addr|")
.chain_update(convo_id)
.finalize();
hex::encode(hash)
}
fn delivery_address(&self) -> String {
Self::delivery_address_from_id(&self.convo_id)
}
fn ctrl_delivery_address_from_id(convo_id: &str) -> String {
let hash = Blake2b::<U6>::new()
.chain_update("ctrl_delivery_addr|")
.chain_update(convo_id)
.finalize();
hex::encode(hash)
}
fn ctrl_delivery_address(&self) -> String {
Self::ctrl_delivery_address_from_id(&self.convo_id)
}
fn key_package_for_account<DS: DeliveryService, RS: RegistrationService, CS: ChatStore>(
&self,
ctx: &mut ClientCtx<DS, RS, CS>,
ident: &str,
) -> Result<KeyPackage, ChatError> {
let retrieved_bytes = ctx
.contact_registry()
.retreive(ident)
.map_err(|e| ChatError::Generic(e.to_string()))?;
// dbg!(ctx.contact_registry());
let Some(keypkg_bytes) = retrieved_bytes else {
return Err(ChatError::Protocol("Contact Not Found".into()));
};
let key_package_in = KeyPackageIn::tls_deserialize(&mut keypkg_bytes.as_slice())?;
let keypkg = key_package_in.validate(
self.ctx.borrow().provider().crypto(),
ProtocolVersion::Mls10,
)?; //TODO: P3 - Hardcoded Protocol Version
Ok(keypkg)
}
fn save_state<CS: ChatStore>(&self, store: &CS) {}
}
impl<Ctx: MlsCtx> Id for GroupV1Convo<Ctx> {
fn id(&self) -> ConversationId<'_> {
&self.convo_id
}
}
impl<Ctx: MlsCtx> Convo for GroupV1Convo<Ctx> {
fn send_message(
&mut self,
content: &[u8],
) -> Result<Vec<AddressedEncryptedPayload>, ChatError> {
let ctx_ref = self.ctx.borrow();
let provider = ctx_ref.provider();
let mls_message_out = self
.mls_group
.create_message(&*provider, ctx_ref.ident(), content)
.unwrap();
let a = AddressedEncryptedPayload {
delivery_address: self.delivery_address(),
data: EncryptedPayload {
encryption: Some(
chat_proto::logoschat::encryption::encrypted_payload::Encryption::Plaintext(
Plaintext {
payload: mls_message_out.to_bytes().unwrap().into(),
},
),
),
},
};
Ok(vec![a])
}
fn handle_frame(
&mut self,
encoded_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError> {
use chat_proto::logoschat::encryption::encrypted_payload::Encryption;
let bytes = match encoded_payload.encryption {
Some(Encryption::Plaintext(pt)) => pt.payload,
_ => {
return Err(ChatError::ProtocolExpectation(
"None",
"Some(Encryption::Plaintext)".into(),
));
}
};
let mls_message =
MlsMessageIn::tls_deserialize_exact_bytes(&bytes).map_err(ChatError::generic)?;
let protocol_message: ProtocolMessage = mls_message
.try_into_protocol_message()
.map_err(ChatError::generic)?;
let ctx_borrow = self.ctx.borrow();
let provider = ctx_borrow.provider();
if protocol_message.epoch() < self.mls_group.epoch() {
// TODO: (!) Determine how to handle messages for old epochs. Minimally log this.
return Ok(None);
}
let processed = self
.mls_group
.process_message(&*provider, protocol_message)
.map_err(ChatError::generic)?;
match processed.into_content() {
ProcessedMessageContent::ApplicationMessage(msg) => Ok(Some(ContentData {
conversation_id: hex::encode(self.mls_group.group_id().as_slice()),
data: msg.into_bytes(),
is_new_convo: false,
})),
ProcessedMessageContent::StagedCommitMessage(commit) => {
self.mls_group
.merge_staged_commit(&*provider, *commit)
.map_err(ChatError::generic)?;
Ok(None)
}
x => {
println!("Unhabled Message ttype {:?}", x);
Ok(None)
}
}
}
fn remote_id(&self) -> String {
// "group_remote_id".into()
todo!()
}
fn convo_type(&self) -> storage::ConversationKind {
ConversationKind::GroupV1
}
}
impl<Ctx: MlsCtx, DS: DeliveryService, RS: RegistrationService, CS: ChatStore>
GroupConvo<DS, RS, CS> for GroupV1Convo<Ctx>
{
fn add_member(
&mut self,
ctx: &mut ClientCtx<DS, RS, CS>,
members: &[&str],
) -> Result<(), ChatError> {
// add_members returns:
// commit — the Commit message Alice broadcasts to all members
// welcome — the Welcome message sent privately to each new joiner
// _group_info — used for external joins; ignore for now
let ctx_ref = self.ctx.borrow();
let provider = ctx_ref.provider();
if members.len() > 50 {
// This is a temporary limit that originates from the the De-MLS epoch time.
return Err(ChatError::Protocol(
"Cannot add more than 50 Members at a time".into(),
));
}
// Get the Keypacakages and transpose any errors.
// The account_id is kept so invites can be addressed properly
let keypkgs = members
.iter()
// .map(|ident| self.key_package_for_account(ctx, ident))
.map(|ident| self.key_package_for_account(ctx, ident))
.collect::<Result<Vec<_>, ChatError>>()?;
let (commit, welcome, _group_info) = self
.mls_group
.add_members(&*provider, ctx_ref.ident(), keypkgs.iter().as_slice())
.unwrap();
self.mls_group.merge_pending_commit(&*provider).unwrap();
// TODO: (P3) Evaluate privacy/performance implications of an aggregated Welcome for multiple users
for account_id in members {
ctx_ref
.init()
.invite_to_group_v1(ctx, account_id, &welcome)?;
}
let encrypted_payload = EncryptedPayload {
encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext {
payload: commit.to_bytes()?.into(),
})),
};
let addr_enc_payload = AddressedEncryptedPayload {
delivery_address: self.ctrl_delivery_address(),
data: encrypted_payload,
};
// Prepare commit message
// TODO: (P1) Make GroupConvos agnostic to framing so its less error prone and more
let env = addr_enc_payload.into_envelope(self.convo_id.clone());
ctx.ds()
.publish(env)
.map_err(|e| ChatError::Generic(format!("Publish: {e}")))
}
}
use prost::Oneof;
#[derive(Clone, PartialEq, Message)]
pub struct GroupV1Frame {
#[prost(string, tag = "1")]
pub sender: String,
#[prost(uint64, tag = "2")]
pub timestamp: u64,
// oneof field — optional, holds one variant
#[prost(oneof = "FrameType", tags = "3, 4, 5")]
pub payload: Option<FrameType>,
}
#[derive(Clone, PartialEq, Oneof)]
pub enum FrameType {
#[prost(bytes, tag = "3")]
Welcome(Vec<u8>),
}
#[cfg(test)]
mod tests {
use crypto::PrivateKey;
use super::*;
#[test]
fn test_mls() {}
}

View File

@ -0,0 +1,36 @@
use std::{
cell::{Ref, RefCell, RefMut},
rc::Rc,
};
use storage::ChatStore;
use crate::{DeliveryService, RegistrationService};
pub struct ClientCtx<DS: DeliveryService, RS: RegistrationService, CS: ChatStore> {
ds: DS,
contact_registry: RS,
convo_store: Rc<RefCell<CS>>, // TODO: (P2) Remove Rc/Refcell
}
impl<'a, DS: DeliveryService, RS: RegistrationService, CS: ChatStore> ClientCtx<DS, RS, CS> {
pub fn new(ds: DS, contact_registry: RS, convo_store: Rc<RefCell<CS>>) -> Self {
Self {
ds,
contact_registry,
convo_store,
}
}
pub fn ds(&'a mut self) -> &'a mut DS {
&mut self.ds
}
pub fn contact_registry(&'a mut self) -> &'a mut RS {
&mut self.contact_registry
}
pub fn store(&'a self) -> RefMut<CS> {
self.convo_store.borrow_mut()
}
}

View File

@ -1,3 +1,4 @@
use openmls::{framing::errors::MlsMessageError, prelude::tls_codec};
pub use thiserror::Error;
use storage::StorageError;
@ -26,6 +27,23 @@ pub enum ChatError {
UnsupportedConvoType(String),
#[error("storage error: {0}")]
Storage(#[from] StorageError),
#[error("mls error: {0}")]
MlsMessageError(#[from] MlsMessageError),
#[error("TlsCodec: {0}")]
TlsCodec(#[from] tls_codec::Error),
#[error("generic: {0}")]
Generic(String),
#[error("KeyPackage: {0}")]
KeyPackage(#[from] openmls::prelude::KeyPackageVerifyError),
#[error("Delivery: {0}")]
Delivery(String),
}
impl ChatError {
// This is a stopgap until there is a proper error system in place
pub fn generic(e: impl ToString) -> Self {
Self::Generic(e.to_string())
}
}
#[derive(Error, Debug)]

View File

@ -0,0 +1,35 @@
use std::{cell::RefCell, fmt::Debug, fmt::Display, rc::Rc};
use crate::types::AddressedEnvelope;
pub struct Service<T> {
inner: Rc<RefCell<T>>,
}
impl<T> Service<T> {
pub fn new(t: T) -> Self {
Self {
inner: Rc::new(RefCell::new(t)),
}
}
fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(&T) -> R,
{
let inner = self.inner.borrow();
f(&inner)
}
}
pub trait DeliveryService {
type Error: Display;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error>;
fn subscribe(&mut self, delivery_address: String) -> Result<(), Self::Error>;
}
pub trait RegistrationService: Debug {
type Error: Display;
fn register(&mut self, identity: String, key_bundle: Vec<u8>) -> Result<(), Self::Error>;
fn retreive(&self, identity: &str) -> Result<Option<Vec<u8>>, Self::Error>;
}

View File

@ -0,0 +1,383 @@
use std::any::Any;
use std::cell::{Ref, RefCell};
use std::collections::HashMap;
use std::rc::Rc;
use chat_proto::logoschat::envelope::EnvelopeV1;
use crypto::Ed25519SigningKey;
use crypto::Ed25519VerifyingKey;
use crypto::PublicKey;
use openmls::prelude::tls_codec::Serialize;
use openmls::{prelude::*, treesync::RatchetTree};
use openmls_libcrux_crypto::Provider as LibcruxProvider;
use openmls_traits::signatures::Signer;
use openmls_traits::storage::StorageProvider;
use prost::{Message, Oneof};
use std::sync::atomic::{AtomicUsize, Ordering};
use storage::ChatStore;
use storage::ConversationMeta;
use storage::ConversationStore;
use crate::AddressedEnvelope;
use crate::ChatError;
use crate::DeliveryService;
use crate::RegistrationService;
use crate::conversation::GroupConvo;
use crate::conversation::group_v1::{MlsCtx, MlsInitializer};
use crate::conversation::{GroupV1Convo, IdentityProvider};
use crate::ctx::ClientCtx;
use crate::types::AddressedEncryptedPayload;
use crate::utils::hash_size::Testing;
use crate::utils::{blake2b_hex, hash_size, hex_trunc};
static ACCOUNT_COUNTER: AtomicUsize = AtomicUsize::new(0);
const ACCOUNT_NAMES: &[&str] = &["Saro", "Raya", "Pax"];
#[derive(Clone)]
pub struct LogosAccount {
id: String,
signing_key: Ed25519SigningKey,
// x25519_key: crypto::PrivateKey,
}
impl LogosAccount {
pub fn new() -> Self {
let idx = ACCOUNT_COUNTER.fetch_add(1, Ordering::Relaxed);
let id = if idx < ACCOUNT_NAMES.len() {
ACCOUNT_NAMES[idx % ACCOUNT_NAMES.len()].to_string()
} else {
use rand_core::{OsRng, RngCore};
const CHARSET: &[u8] =
b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
let i: String = (0..8)
.map(|_| {
let idx = (OsRng.next_u32() as usize) % CHARSET.len();
CHARSET[idx] as char
})
.collect();
i
};
Self {
id,
signing_key: Ed25519SigningKey::generate(),
// x25519_key: crypto::PrivateKey::random(),
}
}
}
impl Signer for LogosAccount {
fn sign(&self, payload: &[u8]) -> Result<Vec<u8>, openmls_traits::signatures::SignerError> {
Ok(self.signing_key.sign(payload).as_ref().to_vec())
}
fn signature_scheme(&self) -> SignatureScheme {
SignatureScheme::ED25519
}
}
impl IdentityProvider for LogosAccount {
fn friendly_name(&self) -> String {
self.id.clone()
}
fn public_key(&self) -> Ed25519VerifyingKey {
self.signing_key.verifying_key()
}
}
#[derive(Clone)]
pub struct MlsContext<Init: MlsInitializer> {
pub ident_provider: LogosAccount,
pub initializer: Init,
provider: Rc<RefCell<LibcruxProvider>>,
}
impl<Init: MlsInitializer + Clone> MlsCtx for MlsContext<Init> {
type IDENT = LogosAccount;
type INIT = Init;
fn ident(&self) -> &LogosAccount {
&self.ident_provider
}
fn provider(&self) -> Ref<'_, LibcruxProvider> {
self.provider.borrow()
}
fn init(&self) -> &Init {
&self.initializer
}
// Build an MLS Credential from the supplied IdentityProvider
fn get_credential(&self) -> CredentialWithKey {
CredentialWithKey {
credential: BasicCredential::new(self.ident_provider.friendly_name().into()).into(),
signature_key: self.ident_provider.public_key().as_ref().into(),
}
}
}
pub trait GroupInitializer<DS: DeliveryService, RS: RegistrationService, CS: ChatStore> {
fn on_new_group_convo(&self, convo: impl GroupConvo<DS, RS, CS>) -> Result<(), ChatError>;
}
#[derive(Clone)]
pub struct InboxV2 {
pub account: LogosAccount, // TODO: (!) don't expose account
mls_provider: Rc<RefCell<LibcruxProvider>>,
convo_map: HashMap<String, Vec<u8>>,
}
impl<'a> InboxV2 {
pub fn new() -> Self {
let account = LogosAccount::new();
let mls_provider = Rc::new(RefCell::new(LibcruxProvider::new().unwrap()));
Self {
account,
mls_provider,
convo_map: HashMap::new(),
}
}
pub fn register<DS: DeliveryService, RS: RegistrationService, CS: ChatStore>(
&mut self,
ctx: &mut ClientCtx<DS, RS, CS>,
) -> Result<(), ChatError> {
let keypackage = self.create_keypackage()?;
let bytes = keypackage.tls_serialize_detached()?;
ctx.contact_registry()
.register(self.account.friendly_name(), bytes)
.map_err(ChatError::generic)?; //TODO: (P1) create an address scheme instead of using names
Ok(())
}
pub fn delivery_address(&self) -> String {
Self::delivery_address_for_account_id(&self.account.id)
}
pub fn id(&self) -> String {
Self::conversation_id_for_account_id(&self.account.id)
}
pub fn create_group_v1<DS: DeliveryService, RS: RegistrationService, CS: ChatStore>(
&self,
ctx: &mut ClientCtx<DS, RS, CS>,
) -> Result<GroupV1Convo<MlsContext<InboxV2>>, ChatError> {
let convo = GroupV1Convo::new(self.assemble_ctx(), ctx.ds());
Ok(convo)
}
pub fn handle_frame<DS: DeliveryService, RS: RegistrationService, CS: ChatStore>(
&self,
ctx: &mut ClientCtx<DS, RS, CS>,
payload_bytes: &[u8],
) -> Result<(), ChatError> {
let inbox_frame = InboxV2Frame::decode(payload_bytes)?;
let Some(payload) = inbox_frame.payload else {
return Err(ChatError::BadParsing("InboxV2Payload missing"));
};
match payload {
InviteType::GroupV1(group_v1_heavy_invite) => {
self.handle_heavy_invite(ctx, group_v1_heavy_invite)
}
}
}
fn assemble_ctx(&self) -> MlsContext<InboxV2> {
MlsContext {
ident_provider: self.account.clone(),
initializer: self.clone(),
provider: self.mls_provider.clone(),
}
}
fn persist_convo<DS: DeliveryService, RS: RegistrationService, CS: ChatStore>(
&self,
ctx: &'a ClientCtx<DS, RS, CS>,
convo: impl GroupConvo<DS, RS, CS>,
) -> Result<(), ChatError> {
// TODO: (P2) Remove remote_convo_id this is an implementation detail specific to PrivateV1
// TODO: (P3) Implement From<Convo> for ConversationMeta
let meta = ConversationMeta {
local_convo_id: convo.id().to_string(),
remote_convo_id: "0".into(),
kind: storage::ConversationKind::GroupV1,
};
ctx.store().save_conversation(&meta)?;
// TODO: (P1) Persist state
Ok(())
}
fn handle_heavy_invite<DS: DeliveryService, RS: RegistrationService, CS: ChatStore>(
&self,
ctx: &mut ClientCtx<DS, RS, CS>,
invite: GroupV1HeavyInvite,
) -> Result<(), ChatError> {
let (msg_in, rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?;
let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else {
return Err(ChatError::ProtocolExpectation(
"something else",
"Welcome".into(),
));
};
let mls_ctx = Rc::new(RefCell::new(self.assemble_ctx()));
let convo = GroupV1Convo::new_from_welcome(mls_ctx, ctx.ds(), welcome);
self.persist_convo(ctx, convo)
}
fn create_keypackage(&self) -> Result<KeyPackage, ChatError> {
let mls_ctx = self.assemble_ctx();
let capabilities = Capabilities::builder()
.ciphersuites(vec![
Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519,
])
.extensions(vec![ExtensionType::ApplicationId])
.build();
let a = KeyPackage::builder()
.leaf_node_capabilities(capabilities)
.build(
Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519,
&*mls_ctx.provider(),
&self.account,
mls_ctx.get_credential(),
)
.expect("Failed to build KeyPackage");
Ok(a.key_package().clone())
}
fn delivery_address_for_account_id(account_id: &str) -> String {
blake2b_hex::<hash_size::AccountId>(&["InboxV2|", "delivery_address|", account_id])
}
fn conversation_id_for_account_id(account_id: &str) -> String {
blake2b_hex::<hash_size::Testing>(&["InboxV2|", "conversation_id|", account_id])
}
fn dbg_mls_store(ctx: &MlsContext<InboxV2>, prefix: impl AsRef<str>) {
let pa = ctx.provider.borrow();
let data = &*pa.storage().values.read().unwrap();
println!(":::MlsProviderStore::: -- {}", prefix.as_ref());
for key in data.keys() {
let val = match data.get(key) {
Some(x) => format!("{} ({})", hex_trunc(x), blake2b_hex::<Testing>(&[x])),
None => "None".into(),
};
println!(". {:?}: {:?}", hex_trunc(key), val)
}
}
pub fn load_mls_convo<DS: DeliveryService, RS: RegistrationService, CS: ChatStore>(
&self,
ctx: &mut ClientCtx<DS, RS, CS>,
convo_id: String,
) -> Result<GroupV1Convo<MlsContext<InboxV2>>, ChatError> {
let mls_ctx = self.assemble_ctx();
let group_id_bytes = hex::decode(&convo_id).map_err(ChatError::generic)?;
let group_id = GroupId::from_slice(&group_id_bytes);
let convo =
GroupV1Convo::load(Rc::new(RefCell::new(mls_ctx)), ctx.ds(), convo_id, group_id)?;
Ok(convo)
}
}
impl MlsInitializer for InboxV2 {
fn invite_to_group_v1<DS: DeliveryService, RS: RegistrationService, CS: ChatStore>(
&self,
ctx: &mut ClientCtx<DS, RS, CS>,
account_id: &str,
welcome: &MlsMessageOut,
) -> Result<(), ChatError> {
let invite = GroupV1HeavyInvite {
welcome_bytes: welcome.to_bytes()?,
};
let frame = InboxV2Frame {
payload: Some(InviteType::GroupV1(invite)),
};
let envelope = EnvelopeV1 {
conversation_hint: Self::conversation_id_for_account_id(account_id),
salt: 0,
payload: frame.encode_to_vec().into(),
};
let outbound_msg = AddressedEnvelope {
delivery_address: Self::delivery_address_for_account_id(account_id),
data: envelope.encode_to_vec(),
};
ctx.ds().publish(outbound_msg).map_err(ChatError::generic)?;
Ok(())
}
}
#[derive(Clone, PartialEq, Message)]
pub struct InboxV2Frame {
#[prost(oneof = "InviteType", tags = "1")]
pub payload: Option<InviteType>,
}
#[derive(Clone, PartialEq, Oneof)]
pub enum InviteType {
#[prost(message, tag = "1")]
GroupV1(GroupV1HeavyInvite),
}
#[derive(Clone, PartialEq, Message)]
pub struct GroupV1HeavyInvite {
#[prost(bytes, tag = "1")]
pub welcome_bytes: Vec<u8>,
}
#[cfg(test)]
mod tests {
use super::*;
use openmls_traits::signatures::Signer;
struct Account {
name: String,
signing_key: crypto::Ed25519SigningKey,
}
impl Signer for Account {
fn sign(&self, payload: &[u8]) -> Result<Vec<u8>, openmls_traits::signatures::SignerError> {
Ok(self.signing_key.sign(payload).as_ref().to_vec())
}
fn signature_scheme(&self) -> SignatureScheme {
SignatureScheme::ED25519
}
}
impl IdentityProvider for Account {
fn friendly_name(&self) -> String {
self.name.clone()
}
fn public_key(&self) -> Ed25519VerifyingKey {
todo!()
}
}
#[test]
fn dev() {
// let inbox = InboxV2::new(...);
// let group = inbox.create_group_v1().unwrap();
// let bytes = group.send("hello".as_bytes());
}
}

View File

@ -2,15 +2,22 @@ mod account;
mod context;
mod conversation;
mod crypto;
mod ctx;
mod errors;
mod external_traits;
mod inbox;
mod inbox_v2;
mod proto;
mod types;
mod utils;
pub use account::LogosAccount;
#[cfg(test)]
mod test_utils;
pub use context::{Context, ConversationIdOwned, Introduction};
pub use errors::ChatError;
pub use external_traits::{DeliveryService, RegistrationService};
pub use sqlite::ChatStorage;
pub use sqlite::StorageConfig;
pub use types::{AddressedEnvelope, ContentData};

View File

@ -0,0 +1,310 @@
use std::{
cell::RefCell,
collections::{HashMap, HashSet, VecDeque},
fmt::Debug,
io::Cursor,
rc::Rc,
sync::{Arc, Mutex},
};
use storage::{ChatStore, ConversationMeta, ConversationStore, IdentityStore};
use storage::{EphemeralKeyStore, RatchetStore};
use crate::{
AddressedEnvelope, DeliveryService, RegistrationService,
utils::{blake2b_hex, hash_size::Testing, hex_trunc},
};
type Callback = Rc<dyn Fn(String, &Vec<u8>)>;
type Filter = Box<dyn Fn(&Vec<u8>) -> bool>;
#[derive(Debug)]
struct BroadcasterShared<T> {
/// Per-address message queue; all published messages are appended here.
messages: VecDeque<T>,
base_index: usize,
}
impl<T> BroadcasterShared<T> {
pub fn read(&self, cursor: usize) -> Option<&T> {
self.messages.get(cursor + self.base_index)
}
pub fn tail(&self) -> usize {
self.messages.len() + self.base_index
}
}
#[derive(Clone, Debug)]
pub struct LocalBroadcaster {
shared: Rc<RefCell<BroadcasterShared<AddressedEnvelope>>>,
cursor: usize,
subscriptions: HashSet<String>,
outbound_msgs: Vec<String>,
}
impl LocalBroadcaster {
pub fn new() -> Self {
let shared = Rc::new(RefCell::new(BroadcasterShared {
messages: VecDeque::new(),
base_index: 0,
}));
let cursor = shared.borrow().tail();
Self {
shared,
cursor,
subscriptions: HashSet::new(),
outbound_msgs: Vec::new(),
}
}
/// Returns a new consumer that shares the same message store but has its
/// own independent cursor — it starts from the beginning of each address
/// queue regardless of what any other consumer has already processed.
pub fn new_consumer(&self) -> Self {
let mut inner = self.shared.clone();
let cursor = inner.borrow().tail();
Self {
shared: inner,
cursor,
subscriptions: HashSet::new(),
outbound_msgs: Vec::new(),
}
}
/// Pulls all messages this consumer has not yet seen on `address`,
/// applying any registered filter. Advances the cursor so the same
/// messages are not returned again.
pub fn poll(&mut self) -> Option<Vec<u8>> {
loop {
let next = self.cursor;
match self.shared.borrow().read(next) {
None => return None,
Some(ae) => {
self.cursor = next + 1;
if self.subscriptions.contains(ae.delivery_address.as_str())
&& self.is_inbound(ae)
{
return Some(ae.data.clone());
}
}
}
}
}
pub fn clear(&mut self) {
self.cursor = self.shared.borrow().tail();
}
fn msg_id(msg: &AddressedEnvelope) -> String {
blake2b_hex::<Testing>(&[msg.data.as_slice()])
}
fn is_inbound(&self, msg: &AddressedEnvelope) -> bool {
let mid = Self::msg_id(msg);
!self.outbound_msgs.contains(&mid)
}
}
impl DeliveryService for LocalBroadcaster {
type Error = String;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error> {
self.outbound_msgs.push(Self::msg_id(&envelope));
self.shared.borrow_mut().messages.push_back(envelope);
Ok(())
}
fn subscribe(&mut self, delivery_address: String) -> Result<(), Self::Error> {
// Strict temporal ordering of subscriptions is not enforced.
// Subscruptions are evaluated on polling, not when the message is published
self.subscriptions.insert(delivery_address);
Ok(())
}
}
/// A Contact Registry used for Tests.
/// This implementation stores bundle bytes and then returns them when
/// retreived
///
#[derive(Clone)]
pub struct EphemeralRegistry {
registry: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}
impl EphemeralRegistry {
pub fn new() -> Self {
Self {
registry: Arc::new(Mutex::new(HashMap::new())),
}
}
}
impl Debug for EphemeralRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let registry = self.registry.lock().unwrap();
let truncated: Vec<(&String, String)> = registry
.iter()
.map(|(k, v)| {
let hex = if v.len() <= 8 {
hex::encode(v)
} else {
format!(
"{}..{}",
hex::encode(&v[..4]),
hex::encode(&v[v.len() - 4..])
)
};
(k, hex)
})
.collect();
f.debug_struct("EphemeralRegistry")
.field("registry", &truncated)
.finish()
}
}
impl RegistrationService for EphemeralRegistry {
type Error = String;
fn register(&mut self, identity: String, key_bundle: Vec<u8>) -> Result<(), Self::Error> {
self.registry.lock().unwrap().insert(identity, key_bundle);
Ok(())
}
fn retreive(&self, identity: &str) -> Result<Option<Vec<u8>>, Self::Error> {
Ok(self.registry.lock().unwrap().get(identity).cloned())
}
}
pub struct MemStore {
convos: HashMap<String, ConversationMeta>,
state: HashMap<String, Vec<u8>>,
}
impl MemStore {
pub fn new() -> Self {
Self {
convos: HashMap::new(),
state: HashMap::new(),
}
}
}
impl ConversationStore for MemStore {
fn save_conversation(
&mut self,
meta: &storage::ConversationMeta,
) -> Result<(), storage::StorageError> {
self.convos
.insert(meta.local_convo_id.clone(), meta.clone());
Ok(())
}
fn load_conversation(
&self,
local_convo_id: &str,
) -> Result<Option<storage::ConversationMeta>, storage::StorageError> {
let a = self.convos.get(local_convo_id).cloned();
Ok(a)
}
fn remove_conversation(&mut self, local_convo_id: &str) -> Result<(), storage::StorageError> {
todo!()
}
fn load_conversations(&self) -> Result<Vec<storage::ConversationMeta>, storage::StorageError> {
Ok(self.convos.values().cloned().collect())
}
fn has_conversation(&self, local_convo_id: &str) -> Result<bool, storage::StorageError> {
Ok(self.convos.contains_key(local_convo_id))
}
}
impl IdentityStore for MemStore {
fn load_identity(&self) -> Result<Option<crypto::Identity>, storage::StorageError> {
// todo!()
Ok(None)
}
fn save_identity(&mut self, identity: &crypto::Identity) -> Result<(), storage::StorageError> {
// todo!()
Ok(())
}
}
impl EphemeralKeyStore for MemStore {
fn save_ephemeral_key(
&mut self,
public_key_hex: &str,
private_key: &crypto::PrivateKey,
) -> Result<(), storage::StorageError> {
todo!()
}
fn load_ephemeral_key(
&self,
public_key_hex: &str,
) -> Result<Option<crypto::PrivateKey>, storage::StorageError> {
todo!()
}
fn remove_ephemeral_key(&mut self, public_key_hex: &str) -> Result<(), storage::StorageError> {
todo!()
}
}
impl RatchetStore for MemStore {
fn save_ratchet_state(
&mut self,
conversation_id: &str,
state: &storage::RatchetStateRecord,
skipped_keys: &[storage::SkippedKeyRecord],
) -> Result<(), storage::StorageError> {
todo!()
}
fn load_ratchet_state(
&self,
conversation_id: &str,
) -> Result<storage::RatchetStateRecord, storage::StorageError> {
todo!()
}
fn load_skipped_keys(
&self,
conversation_id: &str,
) -> Result<Vec<storage::SkippedKeyRecord>, storage::StorageError> {
todo!()
}
fn has_ratchet_state(&self, conversation_id: &str) -> Result<bool, storage::StorageError> {
todo!()
}
fn delete_ratchet_state(&mut self, conversation_id: &str) -> Result<(), storage::StorageError> {
todo!()
}
fn cleanup_old_skipped_keys(
&mut self,
max_age_secs: i64,
) -> Result<usize, storage::StorageError> {
todo!()
}
}
// impl GroupMlsStorageV1 for MemStore {
// fn save_state(&self, convo_id: &str, state: &[u8]) {
// self.state.insert(convo_id, state)
// }
// fn load_state(&self, convo_id: &str) -> Vec<u8> {
// self.state.get(convo_id).unwrap().clone()
// }
// }

View File

@ -6,13 +6,51 @@ use crate::proto::{self, Message};
// This struct represents Outbound data.
// It wraps an encoded payload with a delivery address, so it can be handled by the delivery service.
#[derive(Clone)]
pub struct AddressedEnvelope {
pub delivery_address: String,
pub data: Vec<u8>,
}
impl AddressedEnvelope {
pub fn new(delivery_address: String, convo_id: String, data: &[u8]) -> Self {
let envelope = proto::EnvelopeV1 {
// TODO: conversation_id should be obscured
conversation_hint: convo_id,
salt: 0,
payload: proto::Bytes::copy_from_slice(data),
};
AddressedEnvelope {
delivery_address,
data: envelope.encode_to_vec(),
}
}
}
impl Debug for AddressedEnvelope {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let data = &self.data;
let hex = if data.len() <= 8 {
hex::encode(data)
} else {
format!(
"{}..{}",
hex::encode(&data[..4]),
hex::encode(&data[data.len() - 4..])
)
};
f.debug_struct("AddressedEnvelope")
.field("addr", &self.delivery_address)
.field("data", &hex)
.finish()
}
}
// This struct represents the result of processed inbound data.
// It wraps content payload with a conversation_id
#[derive(Debug)]
pub struct ContentData {
pub conversation_id: String,
pub data: Vec<u8>,
@ -30,17 +68,11 @@ pub(crate) struct AddressedEncryptedPayload {
impl AddressedEncryptedPayload {
// Wrap in an envelope and prepare for transmission
pub fn into_envelope(self, convo_id: String) -> AddressedEnvelope {
let envelope = proto::EnvelopeV1 {
// TODO: conversation_id should be obscured
conversation_hint: convo_id,
salt: 0,
payload: proto::Bytes::copy_from_slice(self.data.encode_to_vec().as_slice()),
};
AddressedEnvelope {
delivery_address: self.delivery_address,
data: envelope.encode_to_vec(),
}
AddressedEnvelope::new(
self.delivery_address,
convo_id,
self.data.encode_to_vec().as_slice(),
)
}
}

View File

@ -6,3 +6,66 @@ pub fn timestamp_millis() -> i64 {
.unwrap()
.as_millis() as i64
}
/// Track hash sizes in use across the crate.
pub mod hash_size {
use blake2::digest::{
consts::U64,
generic_array::ArrayLength,
typenum::{IsLessOrEqual, NonZero},
};
pub trait HashLen
where
<Self::Size as IsLessOrEqual<U64>>::Output: NonZero,
{
type Size: ArrayLength<u8> + IsLessOrEqual<U64>;
}
/// This macro generates HashLen for the given typenum::length
macro_rules! hash_sizes {
($($(#[$attr:meta])* $name:ident => $size:ty),* $(,)?) => {
$(
$(#[$attr])*
pub struct $name;
impl HashLen for $name { type Size = $size; }
)*
};
}
use blake2::digest::consts::{U4, U8, U18};
hash_sizes! {
/// Generic hash size for tests and debug
Testing => U4,
/// Account ID hash length
AccountId => U8,
ConversationId => U18,
}
}
use blake2::{Blake2b, Digest};
/// This establishes an easy to use wrapper for hashes in this crate.
/// The output is formatted string of hex characters
pub fn blake2b_hex<LEN: hash_size::HashLen>(components: &[impl AsRef<[u8]>]) -> String {
//A
let mut hash = Blake2b::<LEN::Size>::new();
for c in components {
hash.update(c);
}
let output = hash.finalize();
hex::encode(output)
}
pub fn hex_trunc(data: &[u8]) -> String {
if data.len() <= 8 {
hex::encode(data)
} else {
format!(
"{}..{}",
hex::encode(&data[..4]),
hex::encode(&data[data.len() - 4..])
)
}
}

View File

@ -4,6 +4,8 @@ mod signatures;
mod x3dh;
mod xeddsa_sign;
use thiserror::Error;
pub use identity::Identity;
pub use keys::{PrivateKey, PublicKey, SymmetricKey32};
pub use signatures::{Ed25519SigningKey, Ed25519VerifyingKey};

View File

@ -10,8 +10,8 @@ use std::collections::HashSet;
use crypto::{Identity, PrivateKey};
use rusqlite::{Transaction, params};
use storage::{
ConversationKind, ConversationMeta, ConversationStore, EphemeralKeyStore, IdentityStore,
RatchetStateRecord, RatchetStore, SkippedKeyRecord, StorageError,
ChatStore, ConversationKind, ConversationMeta, ConversationStore, EphemeralKeyStore,
IdentityStore, RatchetStateRecord, RatchetStore, SkippedKeyRecord, StorageError,
};
use zeroize::Zeroize;
@ -532,6 +532,16 @@ fn blob_to_array<const N: usize>(
.map_err(|_| invalid_blob_length(field, N, actual))
}
// impl GroupMlsStorageV1 for ChatStorage {
// fn save_state(&self, convo_id: &str, state: &[u8]) {
// todo!()
// }
// fn load_state(&self, convo_id: &str) -> Vec<u8> {
// todo!()
// }
// }
#[cfg(test)]
mod tests {
use storage::{

View File

@ -27,6 +27,7 @@ pub trait EphemeralKeyStore {
pub enum ConversationKind {
PrivateV1,
Unknown(String),
GroupV1,
}
impl ConversationKind {
@ -34,6 +35,7 @@ impl ConversationKind {
match self {
Self::PrivateV1 => "private_v1",
Self::Unknown(value) => value.as_str(),
Self::GroupV1 => "group_v1",
}
}
}
@ -42,6 +44,7 @@ impl From<&str> for ConversationKind {
fn from(value: &str) -> Self {
match value {
"private_v1" => Self::PrivateV1,
"group_v1" => Self::GroupV1,
other => Self::Unknown(other.to_string()),
}
}
@ -120,6 +123,8 @@ pub trait RatchetStore {
fn cleanup_old_skipped_keys(&mut self, max_age_secs: i64) -> Result<usize, StorageError>;
}
// TODO: (P2) this should be defined in the ConversationType
pub trait ChatStore: IdentityStore + EphemeralKeyStore + ConversationStore + RatchetStore {}
impl<T> ChatStore for T where T: IdentityStore + EphemeralKeyStore + ConversationStore + RatchetStore

View File

@ -1,9 +1,9 @@
use libchat::{
AddressedEnvelope, ChatError, ChatStorage, ContentData, Context, ConversationIdOwned,
Introduction, StorageConfig,
DeliveryService, Introduction, StorageConfig,
};
use crate::{delivery::DeliveryService, errors::ClientError};
use crate::errors::ClientError;
pub struct ChatClient<D: DeliveryService> {
ctx: Context<ChatStorage>,

View File

@ -1,6 +0,0 @@
use libchat::AddressedEnvelope;
pub trait DeliveryService {
type Error: std::fmt::Debug;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error>;
}

View File

@ -1,4 +1,4 @@
use crate::{AddressedEnvelope, delivery::DeliveryService};
use crate::{AddressedEnvelope, DeliveryService};
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::{Arc, RwLock};

View File

@ -1,12 +1,12 @@
mod client;
mod delivery;
mod delivery_in_process;
mod errors;
pub use client::ChatClient;
pub use delivery::DeliveryService;
pub use delivery_in_process::{Cursor, InProcessDelivery, MessageBus};
pub use errors::ClientError;
// Re-export types callers need to interact with ChatClient
pub use libchat::{AddressedEnvelope, ContentData, ConversationIdOwned, StorageConfig};
pub use libchat::{
AddressedEnvelope, ContentData, ConversationIdOwned, DeliveryService, StorageConfig,
};

View File

@ -0,0 +1,6 @@
[package]
name = "delivery"
version = "0.1.0"
edition = "2024"
[dependencies]

View File

@ -0,0 +1,5 @@
mod local_bcast;
use local_bcast::LocalBroadcast;
pub use LocalBroadcast;

View File

@ -0,0 +1,58 @@
use libchat::DeliveryService;
type Callback = Box<dyn FnOnce(String, &Vec<u8>)>;
#[derive(Clone)]
struct LocalBroadcaster {
subscribers: Arc<Mutex<HashMap<String, Vec<Callback>>>>,
}
impl LocalBroadcaster {
pub fn new() -> Self {
Self {
subscribers: Arc::new(Mutex::new(HashMap::new())),
}
}
}
impl DeliveryService for LocalBroadcaster {
type Error = String;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error> {
let callbacks = self
.subscribers
.lock()
.unwrap()
.remove(&envelope.delivery_address)
.unwrap_or_default();
for cb in callbacks {
cb(envelope.delivery_address.clone(), &envelope.data);
}
Ok(())
}
fn subscribe<F>(&mut self, delivery_address: String, cb: F) -> Result<(), Self::Error>
where
F: FnOnce(String, &Vec<u8>) + 'static,
{
self.subscribers
.lock()
.unwrap()
.entry(delivery_address)
.or_default()
.push(Box::new(cb));
Ok(())
}
}
#[cfg(test)]
mod tests {
#[test]
fn local_bcast() {
let ds = LocalBroadcast::new();
}
}