Merge remote-tracking branch 'origin/main' into seemenkina/mls_clean_up

This commit is contained in:
seemenkina 2026-06-22 18:20:05 +03:00
commit 4ca789ac10
No known key found for this signature in database
14 changed files with 328 additions and 145 deletions

View File

@ -1,3 +1,4 @@
mod direct_v1;
pub mod group_v1;
mod group_v2;
mod privatev1;
@ -6,6 +7,7 @@ pub use crate::errors::ChatError;
use crate::outcomes::ConvoOutcome;
use crate::proto::EncryptedPayload;
use crate::service_context::{ExternalServices, ServiceContext};
pub use direct_v1::DirectV1Convo;
pub use group_v1::GroupV1Convo;
pub use group_v2::GroupV2Convo;
pub use privatev1::PrivateV1Convo;
@ -15,7 +17,7 @@ pub type ConversationId = String;
pub type ConversationIdRef<'a> = &'a str;
/// Behaviour shared by every conversation kind.
pub(crate) trait Convo<S: ExternalServices> {
pub(crate) trait Convo<S: ExternalServices>: Identified + Send {
fn send_content(&mut self, cx: &mut ServiceContext<S>, content: &[u8])
-> Result<(), ChatError>;
@ -40,6 +42,8 @@ pub(crate) trait GroupConvo<S: ExternalServices>: Convo<S> + std::fmt::Debug + S
cx: &mut ServiceContext<S>,
members: &[IdentIdRef],
) -> Result<(), ChatError>;
}
pub(crate) trait Identified {
fn id(&self) -> ConversationIdRef<'_>;
}

View File

@ -0,0 +1,61 @@
use chat_proto::logoschat::encryption::EncryptedPayload;
use shared_traits::IdentIdRef;
use crate::{
ChatError, ExternalServices,
conversation::{ConversationIdRef, Convo, GroupConvo, GroupV1Convo, Identified},
service_context::ServiceContext,
};
type DelegateGroup = GroupV1Convo;
/// A Conversation between two participants.
#[derive(Debug)]
pub struct DirectV1Convo {
inner_group: DelegateGroup,
}
impl DirectV1Convo {
// Constructor must accept multiple IdentId's
// While the conversation is limited to 2 participants, each participants may
// have multiple Installations.
pub fn new<S: ExternalServices>(
cx: &mut ServiceContext<S>,
members: &[IdentIdRef],
) -> Result<Self, ChatError> {
let mut inner_group = DelegateGroup::new(cx)?;
inner_group.add_member(cx, members)?;
Ok(Self { inner_group })
}
}
impl Identified for DirectV1Convo {
fn id(&self) -> ConversationIdRef<'_> {
self.inner_group.id()
}
}
impl<S> Convo<S> for DirectV1Convo
where
S: ExternalServices,
{
fn send_content(
&mut self,
cx: &mut ServiceContext<S>,
content: &[u8],
) -> Result<(), super::ChatError> {
self.inner_group.send_content(cx, content)
}
fn handle_frame(
&mut self,
cx: &mut ServiceContext<S>,
enc: EncryptedPayload,
) -> Result<crate::ConvoOutcome, ChatError> {
self.inner_group.handle_frame(cx, enc)
}
fn wakeup(&mut self, service_ctx: &mut ServiceContext<S>) -> Result<(), ChatError> {
self.inner_group.wakeup(service_ctx)
}
}

View File

@ -11,12 +11,13 @@ use prost::Message as _;
use shared_traits::IdentIdRef;
use crate::account_directory::{AccountDirectory, resolve_device_ids};
use crate::conversation::ConversationIdRef;
use crate::inbox_v2::MlsProvider;
use crate::service_context::{ExternalServices, ServiceContext};
use crate::{
DeliveryService, IdentityProvider,
conversation::{ChatError, Convo, GroupConvo},
conversation::{ChatError, Convo, GroupConvo, Identified},
outcomes::{Content, ConvoOutcome},
service_traits::KeyPackageProvider,
types::AddressedEncryptedPayload,
@ -174,10 +175,6 @@ impl GroupV1Convo {
Ok(keypackages)
}
pub fn id(&self) -> &str {
&self.convo_id
}
fn send_message<S: ExternalServices>(
&mut self,
content: &[u8],
@ -205,6 +202,12 @@ impl GroupV1Convo {
}
}
impl Identified for GroupV1Convo {
fn id(&self) -> ConversationIdRef<'_> {
&self.convo_id
}
}
impl<S: ExternalServices> Convo<S> for GroupV1Convo {
fn send_content(
&mut self,
@ -344,8 +347,4 @@ impl<S: ExternalServices> GroupConvo<S> for GroupV1Convo {
.publish(env)
.map_err(|e| ChatError::Generic(format!("Publish: {e}")))
}
fn id(&self) -> super::ConversationIdRef<'_> {
&self.convo_id
}
}

View File

@ -34,7 +34,7 @@ use crate::IdentityProvider;
use crate::conversation::{ConversationIdRef, ExternalServices, ServiceContext};
use crate::{
ConvoOutcome, DeliveryService, RegistrationService,
conversation::{ChatError, Convo, GroupConvo},
conversation::{ChatError, Convo, GroupConvo, Identified},
};
/// Namespace used for de-mls (GroupV2) keypackages, so they don't collide
@ -288,6 +288,12 @@ impl GroupV2Convo {
}
}
impl Identified for GroupV2Convo {
fn id(&self) -> ConversationIdRef<'_> {
&self.convo_id
}
}
impl<S> Convo<S> for GroupV2Convo
where
S: ExternalServices,
@ -372,9 +378,6 @@ impl<S> GroupConvo<S> for GroupV2Convo
where
S: ExternalServices,
{
fn id(&self) -> ConversationIdRef<'_> {
&self.convo_id
}
#[instrument(name = "groupv2.add_member", skip_all, fields(user_id = %service_ctx.mls_identity.display_name()))]
fn add_member(
&mut self,

View File

@ -14,7 +14,7 @@ use storage::{ConversationKind, ConversationMeta, ConversationStore};
use crate::{
DeliveryService,
conversation::{ChatError, ConversationId, Convo},
conversation::{ChatError, ConversationId, ConversationIdRef, Convo, Identified},
errors::EncryptionError,
inbox::PRIVATE_V1_INBOX_ADDRESS,
outcomes::{Content, ConvoOutcome},
@ -200,10 +200,6 @@ impl PrivateV1Convo {
}
}
pub fn id(&self) -> &str {
&self.local_convo_id
}
pub fn encrypt_content<S: RatchetStore>(
&mut self,
content: &[u8],
@ -235,6 +231,12 @@ impl PrivateV1Convo {
}
}
impl Identified for PrivateV1Convo {
fn id(&self) -> ConversationIdRef<'_> {
&self.local_convo_id
}
}
impl<S: ExternalServices> Convo<S> for PrivateV1Convo {
fn send_content(
&mut self,

View File

@ -1,5 +1,7 @@
use crate::causal_history::{CausalHistoryStore, MissingMessage};
use crate::conversation::{ConversationIdRef, GroupV1Convo, GroupV2Convo, PrivateV1Convo};
use crate::conversation::{
ConversationIdRef, DirectV1Convo, GroupV1Convo, GroupV2Convo, Identified, PrivateV1Convo,
};
use crate::service_context::{ExternalServices, ServiceContext};
use crate::{DeliveryService, IdentityProvider, RegistrationService, WakeupService};
use crate::{
@ -14,6 +16,7 @@ use crypto::{Identity, PublicKey};
use openmls::group::GroupId;
use shared_traits::IdentIdRef;
use std::collections::HashMap;
use std::fmt::Debug;
use storage::{ChatStore, ConversationKind, ConversationStore};
use tracing::{info, instrument};
@ -182,7 +185,14 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
self.services.identity.public_key()
}
pub fn create_private_convo(
pub fn create_direct_convo(
&mut self,
members: &[IdentIdRef],
) -> Result<ConversationId, ChatError> {
self.create_direct_convo_v1(members)
}
pub fn create_private_convo_v1(
&mut self,
remote_bundle: &Introduction,
content: &[u8],
@ -202,6 +212,17 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
Ok(convo_id)
}
pub fn create_direct_convo_v1(
&mut self,
members: &[IdentIdRef],
) -> Result<ConversationId, ChatError> {
let convo = DirectV1Convo::new(&mut self.services, members)?;
let convo_id = convo.id().to_string();
self.register_convo(ConvoTypeOwned::Direct(Box::new(convo)))?;
Ok(convo_id)
}
pub fn create_group_convo(
&mut self,
participants: &[IdentIdRef],
@ -264,6 +285,10 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
ConvoTypeOwned::Group(group_convo) => {
group_convo.add_member(&mut self.services, members)
}
ConvoTypeOwned::Direct(convo) => Err(ChatError::UnsupportedFunction(
convo.id().into(),
"Add Member".into(),
)),
}
} else {
let mut convo = self.load_group_convo(convo_id)?;
@ -395,6 +420,7 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
};
let convo = match convo {
ConvoTypeOwned::Group(c) => c.as_mut(),
ConvoTypeOwned::Direct(c) => c.as_mut(),
};
convo.wakeup(&mut self.services)
@ -464,15 +490,24 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
}
}
#[derive(Debug)]
enum ConvoTypeOwned<S: ExternalServices> {
// Pairwise(Box<dyn BaseConvo<S>>),
Direct(Box<dyn Convo<S>>),
Group(Box<dyn GroupConvo<S>>),
}
impl<'a, S: ExternalServices> ConvoTypeOwned<S> {
pub fn id(&'a self) -> ConversationIdRef<'a> {
impl<S: ExternalServices> Debug for ConvoTypeOwned<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Direct(arg0) => f.debug_tuple("Pairwise").field(&arg0.id()).finish(),
Self::Group(arg0) => f.debug_tuple("Group").field(&arg0.id()).finish(),
}
}
}
impl<S: ExternalServices> Identified for ConvoTypeOwned<S> {
fn id(&self) -> ConversationIdRef<'_> {
match self {
ConvoTypeOwned::Direct(convo) => convo.id(),
ConvoTypeOwned::Group(group_convo) => group_convo.id(),
}
}
@ -486,6 +521,7 @@ impl<S: ExternalServices> Convo<S> for ConvoTypeOwned<S> {
) -> Result<(), ChatError> {
match self {
ConvoTypeOwned::Group(group_convo) => group_convo.send_content(cx, content),
ConvoTypeOwned::Direct(convo) => convo.send_content(cx, content),
}
}
@ -496,12 +532,14 @@ impl<S: ExternalServices> Convo<S> for ConvoTypeOwned<S> {
) -> Result<ConvoOutcome, ChatError> {
match self {
ConvoTypeOwned::Group(group_convo) => group_convo.handle_frame(cx, enc),
ConvoTypeOwned::Direct(convo) => convo.handle_frame(cx, enc),
}
}
fn wakeup(&mut self, service_ctx: &mut ServiceContext<S>) -> Result<(), ChatError> {
match self {
ConvoTypeOwned::Group(group_convo) => group_convo.wakeup(service_ctx),
ConvoTypeOwned::Direct(convo) => convo.wakeup(service_ctx),
}
}
}

View File

@ -4,6 +4,8 @@ pub use thiserror::Error;
use storage::StorageError;
use crate::ConversationId;
#[derive(Error, Debug)]
pub enum ChatError {
#[error("protocol error: {0:?}")]
@ -42,6 +44,9 @@ pub enum ChatError {
MlsError(#[from] MlsError),
#[error("demls error: {0}")]
DeMlsError(#[from] ConversationError),
// Used when a core function is called with a convo_id which is unsupported
#[error("convo:{0} does not support {1}")]
UnsupportedFunction(ConversationId, String),
}
impl ChatError {

View File

@ -21,6 +21,7 @@ use crate::RegistrationService;
use crate::conversation::GroupConvo;
use crate::conversation::GroupV1Convo;
use crate::conversation::GroupV2Convo;
use crate::conversation::Identified as _;
use crate::service_context::{ExternalServices, ServiceContext};
use crate::utils::{blake2b_hex, hash_size};
use crate::{

View File

@ -87,7 +87,7 @@ fn ctx_integration() {
// Saro initiates conversation with Raya
let mut content = vec![10];
let saro_convo_id = saro.create_private_convo(&intro, &content).unwrap();
let saro_convo_id = saro.create_private_convo_v1(&intro, &content).unwrap();
// Raya receives the invite + initial message
let initial = recv_one(&mut raya);
@ -178,7 +178,7 @@ fn conversation_metadata_persistence() {
let bundle = alice.create_intro_bundle().unwrap();
let intro = Introduction::try_from(bundle.as_slice()).unwrap();
bob.create_private_convo(&intro, b"hi").unwrap();
bob.create_private_convo_v1(&intro, b"hi").unwrap();
let result = recv_one(&mut alice);
let PayloadOutcome::Inbox(io) = result else {
@ -219,7 +219,7 @@ fn conversation_full_flow() {
let bundle = alice.create_intro_bundle().unwrap();
let intro = Introduction::try_from(bundle.as_slice()).unwrap();
let bob_convo_id = bob.create_private_convo(&intro, b"hello").unwrap();
let bob_convo_id = bob.create_private_convo_v1(&intro, b"hello").unwrap();
let result = recv_one(&mut alice);
let PayloadOutcome::Inbox(io) = result else {

View File

@ -0,0 +1,43 @@
use integration_tests_core::TestHarness;
use tracing::info;
#[test]
fn happypath_roundtrip() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
const S_M1: &[u8] = b"Marco";
const R_M1: &[u8] = b"Polo";
// Initialize TestHarness with 2 clients
let mut harness = TestHarness::<2>::new(|_, _| {});
//Saro Create Convo
let particpant = harness.raya().addr();
let convo_id = harness
.saro()
.create_direct_convo_v1(&[&particpant])
.expect("saro create group");
// Carry the invite through (commit, WelcomeReady, routing to Raya's inbox,
// accept_welcome); settle until Raya has joined.
harness.process_until_label("Saro Send", |h| h.raya().convo_count() == 1);
// Saro sends a message; settle until Raya receives it.
info!(target: "chat", "Saro -> sending: {S_M1:?}");
harness
.saro()
.send_content(&convo_id, S_M1)
.expect("saro send");
harness.process_until(|h| h.raya().check(&convo_id, S_M1));
// Raya replies; settle until Saro receives it.
info!(target: "chat", "Raya -> sending:{R_M1:?}");
harness.raya().send_content(&convo_id, R_M1).unwrap();
harness.process_until(|h| h.saro().check(&convo_id, R_M1));
assert!(harness.saro().check(&convo_id, R_M1));
}

View File

@ -4,8 +4,8 @@ use std::thread::{self, JoinHandle};
use components::{EphemeralRegistry, ThreadedWakeupService, WakeupEvent};
use crossbeam_channel::{Receiver, Sender, select};
use libchat::{
ChatError, ChatStorage, ConversationId, ConvoOutcome, Core, DeliveryService, InboxOutcome,
Introduction, PayloadOutcome, RegistrationService, StorageConfig,
ChatError, ChatStorage, ConversationId, ConvoOutcome, Core, DeliveryService, IdentId,
IdentIdRef, InboxOutcome, Introduction, PayloadOutcome, RegistrationService, StorageConfig,
};
use logos_account::TestLogosAccount;
use parking_lot::Mutex;
@ -14,6 +14,8 @@ use crate::errors::ClientError;
use crate::event::Event;
type ClientCore<T, R> = Core<(TestLogosAccount, T, R, ThreadedWakeupService, ChatStorage)>;
type AccountAddressRef<'a> = &'a str;
type LocalSignerId = IdentId;
/// The transport as the client sees it: a [`DeliveryService`] for outbound
/// publishing plus the inbound payload stream the worker drains. One object owns
@ -158,8 +160,24 @@ where
self.core.lock().create_intro_bundle().map_err(Into::into)
}
// Creates a conversation between two Accounts.
pub fn create_direct_conversation(
&mut self,
account: AccountAddressRef,
) -> Result<ConversationId, ClientError> {
let signers = self.signers_from_account(account)?;
let signer_refs: Vec<IdentIdRef> = signers.iter().collect();
self.core
.lock()
.create_direct_convo(&signer_refs)
.map_err(Into::into)
}
/// Parse intro bundle bytes and initiate a private conversation. Outbound
/// envelopes are published by the core. Returns this side's conversation ID.
///
/// This function will be deprecated in the future. Use `create_direct_conversation`
pub fn create_conversation(
&mut self,
intro_bundle: &[u8],
@ -168,7 +186,7 @@ where
let intro = Introduction::try_from(intro_bundle)?;
self.core
.lock()
.create_private_convo(&intro, initial_content)
.create_private_convo_v1(&intro, initial_content)
.map_err(Into::into)
}
@ -185,6 +203,15 @@ where
.send_content(convo_id, content)
.map_err(Into::into)
}
// Get signers for a given AccountAddress.
fn signers_from_account(
&self,
account: AccountAddressRef,
) -> Result<Vec<LocalSignerId>, ClientError> {
// Assume Account = LocalSigner until Account is ready
Ok(vec![IdentId::new(account.to_string())])
}
}
impl<T: DeliveryService, R: RegistrationService> Drop for ChatClient<T, R> {

View File

@ -1,114 +1,2 @@
use std::{
collections::HashMap,
fmt::Debug,
sync::{Arc, Mutex},
};
use crypto::Ed25519VerifyingKey;
use libchat::{
AccountDirectory, DeviceSet, IdentityProvider, RegistrationService, SignedDeviceBundle,
verify_bundle,
};
pub mod ephemeral;
pub mod http;
/// A Contact Registry used for Tests.
/// This implementation stores bundle bytes and then returns them when
/// retrieved.
///
/// Like the real `keypackage-registry`, one object serves both roles: a
/// keypackage store ([`RegistrationService`]) keyed by `device_id`, and an
/// account → device directory ([`AccountDirectory`]) keyed by the hex account key.
#[derive(Clone, Default)]
pub struct EphemeralRegistry {
key_packages: Arc<Mutex<HashMap<String, Vec<u8>>>>,
installations: Arc<Mutex<HashMap<String, SignedDeviceBundle>>>,
}
impl EphemeralRegistry {
pub fn new() -> Self {
Self::default()
}
}
impl Debug for EphemeralRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let registry = self.key_packages.lock().unwrap();
let truncated: Vec<(&String, String)> = registry
.iter()
.map(|(k, v)| {
let hex = if v.len() <= 8 {
hex::encode(v)
} else {
format!(
"{}..{}",
hex::encode(&v[..4]),
hex::encode(&v[v.len() - 4..])
)
};
(k, hex)
})
.collect();
f.debug_struct("EphemeralRegistry")
.field("registry", &truncated)
.finish()
}
}
impl RegistrationService for EphemeralRegistry {
type Error = String;
fn register(
&mut self,
identity: &dyn IdentityProvider,
key_bundle: Vec<u8>,
) -> Result<(), <Self as RegistrationService>::Error> {
self.key_packages
.lock()
.unwrap()
.insert(identity.id().to_string(), key_bundle);
Ok(())
}
fn retrieve(
&self,
device_id: &str,
) -> Result<Option<Vec<u8>>, <Self as RegistrationService>::Error> {
Ok(self.key_packages.lock().unwrap().get(device_id).cloned())
}
}
/// Account → device directory, verifying each bundle on `fetch` exactly as the
/// HTTP client does so callers exercise the same trust path without a server.
impl AccountDirectory for EphemeralRegistry {
type Error = String;
fn publish(
&mut self,
bundle: &SignedDeviceBundle,
) -> Result<(), <Self as AccountDirectory>::Error> {
self.installations
.lock()
.unwrap()
.insert(hex::encode(bundle.account_pub.as_ref()), bundle.clone());
Ok(())
}
fn fetch(
&self,
account: &Ed25519VerifyingKey,
) -> Result<Option<DeviceSet>, <Self as AccountDirectory>::Error> {
let Some(bundle) = self
.installations
.lock()
.unwrap()
.get(&hex::encode(account.as_ref()))
.cloned()
else {
return Ok(None);
};
verify_bundle(account, &bundle)
.map(Some)
.map_err(|e| e.to_string())
}
}

View File

@ -0,0 +1,112 @@
use std::{
collections::HashMap,
fmt::Debug,
sync::{Arc, Mutex},
};
use crypto::Ed25519VerifyingKey;
use libchat::{
AccountDirectory, DeviceSet, IdentityProvider, RegistrationService, SignedDeviceBundle,
verify_bundle,
};
/// A Contact Registry used for Tests.
/// This implementation stores bundle bytes and then returns them when
/// retrieved.
///
/// Like the real `keypackage-registry`, one object serves both roles: a
/// keypackage store ([`RegistrationService`]) keyed by `device_id`, and an
/// account → device directory ([`AccountDirectory`]) keyed by the hex account key.
#[derive(Clone, Default)]
pub struct EphemeralRegistry {
key_packages: Arc<Mutex<HashMap<String, Vec<u8>>>>,
installations: Arc<Mutex<HashMap<String, SignedDeviceBundle>>>,
}
impl EphemeralRegistry {
pub fn new() -> Self {
Self::default()
}
}
impl Debug for EphemeralRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let registry = self.key_packages.lock().unwrap();
let truncated: Vec<(&String, String)> = registry
.iter()
.map(|(k, v)| {
let hex = if v.len() <= 8 {
hex::encode(v)
} else {
format!(
"{}..{}",
hex::encode(&v[..4]),
hex::encode(&v[v.len() - 4..])
)
};
(k, hex)
})
.collect();
f.debug_struct("EphemeralRegistry")
.field("registry", &truncated)
.finish()
}
}
impl RegistrationService for EphemeralRegistry {
type Error = String;
fn register(
&mut self,
identity: &dyn IdentityProvider,
key_bundle: Vec<u8>,
) -> Result<(), <Self as RegistrationService>::Error> {
self.key_packages
.lock()
.unwrap()
.insert(identity.id().to_string(), key_bundle);
Ok(())
}
fn retrieve(
&self,
device_id: &str,
) -> Result<Option<Vec<u8>>, <Self as RegistrationService>::Error> {
Ok(self.key_packages.lock().unwrap().get(device_id).cloned())
}
}
/// Account → device directory, verifying each bundle on `fetch` exactly as the
/// HTTP client does so callers exercise the same trust path without a server.
impl AccountDirectory for EphemeralRegistry {
type Error = String;
fn publish(
&mut self,
bundle: &SignedDeviceBundle,
) -> Result<(), <Self as AccountDirectory>::Error> {
self.installations
.lock()
.unwrap()
.insert(hex::encode(bundle.account_pub.as_ref()), bundle.clone());
Ok(())
}
fn fetch(
&self,
account: &Ed25519VerifyingKey,
) -> Result<Option<DeviceSet>, <Self as AccountDirectory>::Error> {
let Some(bundle) = self
.installations
.lock()
.unwrap()
.get(&hex::encode(account.as_ref()))
.cloned()
else {
return Ok(None);
};
verify_bundle(account, &bundle)
.map(Some)
.map_err(|e| e.to_string())
}
}

View File

@ -3,7 +3,7 @@ mod delivery;
mod storage;
mod wakeup;
pub use contact_registry::EphemeralRegistry;
pub use contact_registry::ephemeral::EphemeralRegistry;
pub use contact_registry::http::{HttpRegistry, HttpRegistryError};
pub use delivery::*;
pub use storage::*;