DeMLS Integration (#134)

* Add WakeupService

* Move Id to trait

* Add GroupV2

* Add convo cache

* Add TestHarness

* Instrument call paths

* Downgrade Ciphersuite

* Update imports

* cleanups

* Add Wakeups to Client

* fix: protoc dependency for ci

* fix: nix hash

* Remove save_conversation for v2

* PR comments
This commit is contained in:
Jazz Turner-Baggs 2026-06-15 13:15:18 -07:00 committed by GitHub
parent 9d9a691fe3
commit 960d0bc119
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 4460 additions and 356 deletions

View File

@ -18,6 +18,8 @@ jobs:
steps:
- uses: actions/checkout@v4
- run: rustup update stable && rustup default stable
# hashgraph-like-consensus's build.rs shells out to protoc via prost-build.
- run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
# chat-cli's build.rs unconditionally links liblogosdelivery and requires
# LOGOS_DELIVERY_LIB_DIR. The smoketest job builds and exercises it under
# Nix; here we keep the toolchain-only job fast by skipping it.
@ -31,6 +33,8 @@ jobs:
- uses: actions/checkout@v4
- run: rustup update stable && rustup default stable
- run: rustup component add clippy
# hashgraph-like-consensus's build.rs shells out to protoc via prost-build.
- run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
- run: cargo clippy --all-targets --all-features --workspace --exclude chat-cli -- -D warnings
fmt:

2719
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -15,17 +15,22 @@ shared-traits = { workspace = true }
storage = { workspace = true }
# External dependencies (sorted)
alloy = "2.0"
base64 = "0.22"
chat-proto = { git = "https://github.com/logos-messaging/chat_proto", rev = "37ec98a151f6d50aab2905802ac0a896477e62ea" }
de-mls = { git = "https://github.com/vacp2p/de-mls", branch = "develop" }
double-ratchets = { path = "../double-ratchets" }
hashgraph-like-consensus = "0.5.1"
hex = "0.4.3"
openmls = { version = "0.8.1", features = ["libcrux-provider"] }
openmls_libcrux_crypto = "0.3.1"
openmls_memory_storage = "0.5.0"
openmls_traits = "0.5.0"
prost = "0.14.1"
rand = "0.9"
rand_core = { version = "0.6" }
thiserror = "2.0.17"
tracing = "0.1.44"
x25519-dalek = { version = "2.0.1", features = ["static_secrets", "reusable_secrets", "getrandom"] }
[dev-dependencies]

View File

@ -1,4 +1,5 @@
pub mod group_v1;
mod group_v2;
mod privatev1;
pub use crate::errors::ChatError;
@ -6,10 +7,12 @@ use crate::outcomes::ConvoOutcome;
use crate::proto::EncryptedPayload;
use crate::service_context::{ExternalServices, ServiceContext};
pub use group_v1::GroupV1Convo;
pub use group_v2::GroupV2Convo;
pub use privatev1::PrivateV1Convo;
use shared_traits::IdentIdRef;
pub type ConversationId = String;
pub type ConversationIdRef<'a> = &'a str;
/// Behaviour shared by every conversation kind.
pub(crate) trait Convo<S: ExternalServices> {
@ -26,13 +29,17 @@ pub(crate) trait Convo<S: ExternalServices> {
cx: &mut ServiceContext<S>,
enc: EncryptedPayload,
) -> Result<ConvoOutcome, ChatError>;
fn wakeup(&mut self, service_ctx: &mut ServiceContext<S>) -> Result<(), ChatError>;
}
/// Group-only operations.
pub(crate) trait GroupConvo<S: ExternalServices>: Convo<S> {
pub(crate) trait GroupConvo<S: ExternalServices>: Convo<S> + std::fmt::Debug + Send {
fn add_member(
&mut self,
cx: &mut ServiceContext<S>,
members: &[IdentIdRef],
) -> Result<(), ChatError>;
fn id(&self) -> ConversationIdRef<'_>;
}

View File

@ -39,7 +39,7 @@ impl std::fmt::Debug for GroupV1Convo {
impl GroupV1Convo {
// Create a new conversation with the creator as the only participant.
pub fn new<S: ExternalServices>(cx: &mut ServiceContext<S>) -> Result<Self, ChatError> {
let config = Self::mls_create_config();
let config = Self::mls_create_config(cx);
let mls_group = MlsGroup::new(
&cx.mls_provider,
&cx.mls_identity,
@ -105,9 +105,9 @@ impl GroupV1Convo {
Ok(())
}
fn mls_create_config() -> MlsGroupCreateConfig {
fn mls_create_config<S: ExternalServices>(cx: &mut ServiceContext<S>) -> MlsGroupCreateConfig {
MlsGroupCreateConfig::builder()
.ciphersuite(Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519)
.ciphersuite(cx.mls_provider.crypto().supported_ciphersuites()[0])
.use_ratchet_tree_extension(true) // This is handy for now, until there is central store for this data
.build()
}
@ -276,6 +276,10 @@ impl<S: ExternalServices> Convo<S> for GroupV1Convo {
content,
})
}
fn wakeup(&mut self, _: &mut ServiceContext<S>) -> Result<(), ChatError> {
Ok(())
}
}
impl<S: ExternalServices> GroupConvo<S> for GroupV1Convo {
@ -340,4 +344,8 @@ impl<S: ExternalServices> GroupConvo<S> for GroupV1Convo {
.publish(env)
.map_err(|e| ChatError::Generic(format!("Publish: {e}")))
}
fn id(&self) -> super::ConversationIdRef<'_> {
&self.convo_id
}
}

View File

@ -0,0 +1,485 @@
// This Implementation is a Quick and Dirty Integration of DeMLS into libchat.
// DeMLS and Libchat have different execution models, trait definitions and ownership/lifetimes of objects.
// The easies path is to do a Spike to see what it would take, gather the friction points and then iterate.
use crate::types::AddressedEncryptedPayload;
use crate::{Content, WakeupService};
use alloy::signers::local::PrivateKeySigner;
use blake2::{Blake2b, Digest, digest::consts::U6};
use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload};
use de_mls::core::{
ConsensusPlugin, ConsensusServiceFor, ConversationEvent, ConversationPluginsFactory,
ScoringConfig, StewardListConfig,
};
use de_mls::defaults::{
DefaultConsensusPlugin, DefaultConversationPluginsFactory, MemoryDeMlsStorage,
};
use de_mls::member_id::MemberId;
use de_mls::mls_crypto::MlsCredentials;
use de_mls::protos::de_mls::messages::v1::{
AppMessage as AppMessageProto, MemberWelcome, app_message,
};
use de_mls::session::{Conversation, ConversationConfig, ConversationDeps};
use hashgraph_like_consensus::signing::EthereumConsensusSigner;
use prost::Message;
use shared_traits::{IdentId, IdentIdRef};
use std::sync::Arc;
use std::time::Duration;
use tracing::{info, instrument, warn};
use crate::IdentityProvider;
use crate::conversation::{ConversationIdRef, ExternalServices, ServiceContext};
use crate::{
ConvoOutcome, DeliveryService, RegistrationService,
conversation::{ChatError, Convo, GroupConvo},
};
/// Namespace used for de-mls (GroupV2) keypackages, so they don't collide
/// with the openmls (GroupV1) keypackage registered under the bare account id.
const DEMLS_KEYPACKAGE_NAMESPACE: &str = "demls";
/// This is a Test Wrapper of Demls MemberId Trait
/// Libchat has its own trait that will need to be intergrated at somepoint.
pub struct LocalDemlsMember {
name: String,
}
impl LocalDemlsMember {
pub fn new(name: impl Into<String>) -> Self {
Self { name: name.into() }
}
}
impl MemberId for LocalDemlsMember {
fn member_id_bytes(&self) -> &[u8] {
self.name.as_bytes()
}
fn member_id_display(&self) -> &str {
&self.name
}
}
/// Borrows an existing `IdentityProvider` but reports a namespaced `id()`,
/// so the same identity can register multiple keypackage "flavors"
/// (e.g. openmls vs. de-mls) without colliding in the registry.
struct NamespacedIdentity<'a> {
inner: &'a dyn IdentityProvider,
id: IdentId,
}
impl<'a> NamespacedIdentity<'a> {
fn new(inner: &'a dyn IdentityProvider, namespace: &str) -> Self {
let id = IdentId::new(Self::prefix(inner.id(), namespace));
Self { inner, id }
}
fn prefix(id: &IdentId, namesapce: &str) -> String {
format!("{namesapce}|{id}")
}
}
impl IdentityProvider for NamespacedIdentity<'_> {
fn id(&self) -> IdentIdRef<'_> {
&self.id
}
fn display_name(&self) -> String {
self.inner.display_name()
}
fn sign(&self, payload: &[u8]) -> crypto::Ed25519Signature {
self.inner.sign(payload)
}
fn public_key(&self) -> &crypto::Ed25519VerifyingKey {
self.inner.public_key()
}
}
struct DemlsSetup {
member: LocalDemlsMember,
factory: DefaultConversationPluginsFactory,
consensus_storage: <DefaultConsensusPlugin as ConsensusPlugin>::ConsensusStorage,
consensus_signer: EthereumConsensusSigner,
app_id: Vec<u8>, // random bytes; echo-dedup key
config: ConversationConfig, // the ms-scale test timers, as before
}
impl DemlsSetup {
fn new(identity_name: String) -> Result<Self, ChatError> {
let member = LocalDemlsMember::new(identity_name);
let credentials = Arc::new(MlsCredentials::from_member_id(&member)?);
let factory = DefaultConversationPluginsFactory::new(
Arc::new(MemoryDeMlsStorage::new()),
credentials,
);
// TODO(config): TEST-ONLY millisecond timers. de-mls deadlines are real
// wall-clock, so the default 60s timers never fire under fast virtual
// time. Production needs a real config injected from the caller, not
// these hardcoded values.
let config = ConversationConfig {
commit_inactivity_duration: Duration::from_millis(50),
freeze_duration: Duration::from_millis(20),
voting_delay: Duration::from_millis(30),
election_voting_delay: Duration::from_millis(30),
consensus_timeout: Duration::from_millis(150),
proposal_expiration: Duration::from_millis(2000),
..ConversationConfig::default()
};
Ok(DemlsSetup {
member,
factory,
consensus_storage: DefaultConsensusPlugin::new_storage(),
consensus_signer: EthereumConsensusSigner::new(PrivateKeySigner::random()),
app_id: rand_string(5).as_bytes().to_vec(),
config,
})
}
/// Call exactly once per Conversation construction.
fn deps(
&self,
) -> ConversationDeps<'_, DefaultConsensusPlugin, DefaultConversationPluginsFactory> {
ConversationDeps {
plugins: &self.factory,
consensus: ConsensusServiceFor::<DefaultConsensusPlugin>::new_with_components(
self.consensus_storage.clone(),
DefaultConsensusPlugin::new_event_bus(),
self.consensus_signer.clone(),
10,
),
identity: &self.member,
app_id: Arc::from(self.app_id.as_slice()),
config: self.config.clone(),
scoring_config: ScoringConfig::default(),
steward_list_config: StewardListConfig::default(),
}
}
}
pub struct GroupV2Convo {
convo_id: String,
setup: DemlsSetup,
conversation: Option<Conversation<DefaultConsensusPlugin, DefaultConversationPluginsFactory>>,
/// Member-ids we proposed via add_member. WelcomeReady now fires on
/// every member; we forward a welcome only to joiners WE invited.
pending_invites: Vec<Vec<u8>>,
}
impl std::fmt::Debug for GroupV2Convo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GroupV2Convo")
.field("convo_id", &self.convo_id)
.finish_non_exhaustive()
}
}
fn rand_string(n: usize) -> String {
let bytes: Vec<u8> = (0..n).map(|_| rand::random::<u8>()).collect();
hex::encode(bytes)
}
impl GroupV2Convo {
pub fn new<S: ExternalServices>(
service_ctx: &mut ServiceContext<S>,
) -> Result<Self, ChatError> {
let setup = DemlsSetup::new(service_ctx.mls_identity.display_name())?;
let convo_id = rand_string(5);
let conversation = Conversation::create(&convo_id, setup.deps())?;
let convo = GroupV2Convo {
convo_id,
setup,
conversation: Some(conversation),
pending_invites: vec![],
};
convo.init(service_ctx)?;
Ok(convo)
}
/// Joiner side: register a fresh key package under the account name,
/// but do NOT start a conversation. `convo_id` stays empty until
/// [`Self::accept_welcome`] fills it.
pub fn new_pending<S: ExternalServices>(
service_ctx: &mut ServiceContext<S>,
) -> Result<Self, ChatError> {
let name = service_ctx.mls_identity.display_name();
let setup = DemlsSetup::new(name.clone())?;
let kp = setup.factory.generate_key_package()?;
// TEMPORARY: Demls creates its own Provider which causes keys to be fragmented in different storage providers.
// The key registry does not support a method to namespace keys with the same identity. When the key is pulled down it cannot
// guarentee it was the one created with demls owned provider, resulting in failure.
// This workaround prefixes the ID used to store the keys, such that they do not conflict.
let namespaced =
NamespacedIdentity::new(&*service_ctx.mls_identity, DEMLS_KEYPACKAGE_NAMESPACE);
service_ctx
.registry
.register(&namespaced, kp.as_bytes().to_vec())
.map_err(ChatError::generic)?;
Ok(GroupV2Convo {
convo_id: String::new(),
setup,
conversation: None,
pending_invites: vec![],
})
}
/// Joiner side: ingest a de-mls welcome handed over the InboxV2 1-1
/// channel. `from_welcome` attaches MLS and applies the bundled
/// `ConversationSync` in one call; we then subscribe to the
/// conversation address and flush the join broadcast.
#[instrument(name = "groupv2.accept_welcome", skip_all, fields(user_id = %service_ctx.mls_identity.display_name()))]
pub fn accept_welcome<S: ExternalServices>(
&mut self,
service_ctx: &mut ServiceContext<S>,
welcome: &MemberWelcome,
) -> Result<(), ChatError> {
let conv = Conversation::from_welcome(self.setup.deps(), welcome)?
.ok_or_else(|| ChatError::generic("welcome not addressed to this member"))?;
self.convo_id = conv.id().to_string();
self.conversation = Some(conv);
self.init(service_ctx)?; // subscribe
self.after_op(service_ctx)?; // flush join broadcast + schedule wakeup
Ok(())
}
fn delivery_address_from_id(convo_id: &str) -> String {
let hash = Blake2b::<U6>::new()
.chain_update("delivery_addr|")
.chain_update(convo_id)
.finalize();
hex::encode(hash)
}
fn init<S: ExternalServices>(
&self,
service_ctx: &mut ServiceContext<S>,
) -> Result<(), ChatError> {
// Configure the delivery service to listen for the required delivery addresses.
service_ctx
.ds
.subscribe(&Self::delivery_address_from_id(&self.convo_id))
.map_err(ChatError::generic)?;
Ok(())
}
pub fn id(&self) -> ConversationIdRef<'_> {
&self.convo_id
}
}
impl<S> Convo<S> for GroupV2Convo
where
S: ExternalServices,
{
#[instrument(name = "groupv2.send_content", skip_all, fields(user_id = %service_ctx.mls_identity.display_name(), content))]
fn send_content(
&mut self,
service_ctx: &mut super::ServiceContext<S>,
content: &[u8],
) -> Result<(), ChatError> {
let conv = self
.conversation
.as_mut()
.ok_or_else(|| ChatError::generic("conversation not found"))?;
conv.send_message(content.to_vec())?;
self.after_op(service_ctx)?;
Ok(())
}
#[instrument(name = "groupv2.handle_frame", skip_all, fields(user_id = %service_ctx.mls_identity.display_name()))]
fn handle_frame(
&mut self,
service_ctx: &mut super::ServiceContext<S>,
encoded_payload: EncryptedPayload,
) -> Result<ConvoOutcome, ChatError> {
let bytes = match encoded_payload.encryption {
Some(encrypted_payload::Encryption::Plaintext(pt)) => pt.payload,
_ => {
return Err(ChatError::generic("Expected plaintext"));
}
};
let frame = GroupV2Frame::decode(bytes.as_ref()).map_err(ChatError::generic)?;
let inner = match frame.payload {
Some(GroupV2Payload::DeMlsWrapper(b)) => b.to_vec(),
_ => return Ok(ConvoOutcome::empty(self.convo_id.clone())),
};
let conv = self
.conversation
.as_mut()
.ok_or_else(|| ChatError::generic("no conversation"))?;
conv.process_inbound(&frame.sender_app_id, &inner)?;
conv.poll();
let events = self.after_op(service_ctx)?; // route + publish + re-arm, returns events
match self.events_to_content(&events) {
Some(o) => Ok(o),
None => {
warn!("returning None as ConvoOutcome");
Ok(ConvoOutcome::empty(self.convo_id.to_string()))
}
}
}
#[instrument(name = "groupv2.wakeup", skip_all, fields(user_id = %ctx.mls_identity.display_name()))]
fn wakeup(&mut self, ctx: &mut ServiceContext<S>) -> Result<(), ChatError> {
info!(convo = %self.convo_id, "Wakeup");
let Some(conv) = self.conversation.as_mut() else {
return Ok(()); // pending joiner: no deadlines exist yet
};
let outcome = conv.poll();
if outcome.leave_requested {
// Commit ejected us (or join expired). Real handling - drops
// this convo from its map;
tracing::warn!(convo = %self.convo_id, "conversation requested teardown");
}
self.after_op(ctx)?; // publish what poll produced + re-arm alarm
Ok(())
}
}
impl<S> GroupConvo<S> for GroupV2Convo
where
S: ExternalServices,
{
fn id(&self) -> ConversationIdRef<'_> {
&self.convo_id
}
#[instrument(name = "groupv2.add_member", skip_all, fields(user_id = %service_ctx.mls_identity.display_name()))]
fn add_member(
&mut self,
service_ctx: &mut ServiceContext<S>,
members: &[IdentIdRef],
) -> Result<(), ChatError> {
// Record who WE invited before touching the conversation: after_op
// forwards a welcome only to joiners in pending_invites (member-id
// bytes == account name bytes for LocalDemlsMember).
let mut kps = Vec::with_capacity(members.len());
for member in members {
let device_id = NamespacedIdentity::prefix(member, DEMLS_KEYPACKAGE_NAMESPACE);
let kp_bytes = service_ctx
.registry
.retrieve(&device_id)
.map_err(ChatError::generic)?
.ok_or_else(|| ChatError::generic("No key package"))?;
self.pending_invites
.push(member.as_str().as_bytes().to_vec());
kps.push(kp_bytes);
}
let conv = self
.conversation
.as_mut()
.ok_or_else(|| ChatError::generic("no conversation"))?;
for kp_bytes in &kps {
conv.add_member(kp_bytes)?;
}
self.after_op(service_ctx)?;
Ok(())
}
// fn conversation_state(&self) -> Result<ConversationState, ChatError> {
// Ok(self
// .conversation
// .as_ref()
// .map(|c| c.state())
// .unwrap_or(ConversationState::PendingJoin))
// }
}
impl GroupV2Convo {
fn after_op<S: ExternalServices>(
&mut self,
service_ctx: &mut ServiceContext<S>,
) -> Result<Vec<ConversationEvent>, ChatError> {
let Some(conv) = self.conversation.as_ref() else {
return Ok(Vec::new()); // still pending join — nothing buffered
};
// Pull everything first (these are &self, take-all):
let events = conv.drain_events();
let outbound = conv.drain_outbound(); // Vec<de_mls::session::Outbound>
let wakeup = conv.next_wakeup_in();
// 1. Route welcomes for joiners WE invited (event fires on every member now).
for evt in &events {
if let ConversationEvent::WelcomeReady { welcome, .. } = evt {
for joiner in &welcome.joiner_identities {
if let Some(i) = self.pending_invites.iter().position(|p| p == joiner) {
self.pending_invites.remove(i);
let name = String::from_utf8(joiner.clone()).map_err(ChatError::generic)?;
crate::inbox_v2::invite_user_v2(
&mut service_ctx.ds,
&IdentId::new(name),
welcome,
)?;
}
}
}
}
// 2. Publish
for out in outbound {
let frame = GroupV2Frame {
payload: Some(GroupV2Payload::DeMlsWrapper(out.payload.into())),
sender_app_id: out.sender, // was pkt.app_id
};
let payload = AddressedEncryptedPayload {
delivery_address: Self::delivery_address_from_id(&out.conversation_id),
data: EncryptedPayload {
encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext {
payload: frame.encode_to_vec().into(),
})),
},
};
service_ctx
.ds
.publish(payload.into_envelope(out.conversation_id))
.map_err(ChatError::generic)?;
}
// 3. Re-arm the alarm with the conversation's earliest deadline.
if let Some(d) = wakeup {
service_ctx
.wakeup_service
.wakeup_in(d, self.convo_id.clone());
}
Ok(events)
}
fn events_to_content(&self, events: &[ConversationEvent]) -> Option<ConvoOutcome> {
events.iter().find_map(|evt| match evt {
ConversationEvent::AppMessage(AppMessageProto {
payload: Some(app_message::Payload::ConversationMessage(cm)),
}) => Some(ConvoOutcome {
convo_id: self.convo_id.clone(),
content: Some(Content {
bytes: cm.message.clone(),
}),
}),
_ => None,
})
}
}
use prost::{Oneof, bytes::Bytes};
#[derive(Clone, PartialEq, Message)]
pub struct GroupV2Frame {
#[prost(oneof = "GroupV2Payload", tags = "2, 3")]
pub payload: Option<GroupV2Payload>,
#[prost(bytes = "vec", tag = "4")]
pub sender_app_id: Vec<u8>,
}
#[derive(Clone, PartialEq, Oneof)]
pub enum GroupV2Payload {
#[prost(message, tag = "2")]
DeMlsWrapper(Bytes),
#[prost(message, tag = "3")]
MlsCommitMessage(Bytes),
}

View File

@ -275,6 +275,10 @@ impl<S: ExternalServices> Convo<S> for PrivateV1Convo {
content,
})
}
fn wakeup(&mut self, _: &mut ServiceContext<S>) -> Result<(), ChatError> {
Ok(())
}
}
impl Debug for PrivateV1Convo {

View File

@ -1,8 +1,9 @@
use crate::causal_history::{CausalHistoryStore, MissingMessage};
use crate::conversation::{ConversationIdRef, GroupV1Convo, GroupV2Convo, PrivateV1Convo};
use crate::service_context::{ExternalServices, ServiceContext};
use crate::{DeliveryService, IdentityProvider, RegistrationService};
use crate::{DeliveryService, IdentityProvider, RegistrationService, WakeupService};
use crate::{
conversation::{Convo, GroupConvo, GroupV1Convo, PrivateV1Convo},
conversation::{Convo, GroupConvo},
errors::ChatError,
inbox::Inbox,
inbox_v2::{InboxV2, MlsEphemeralPqProvider, MlsIdentityProvider},
@ -10,9 +11,11 @@ use crate::{
proto::{EncryptedPayload, EnvelopeV1, Message},
};
use crypto::{Identity, PublicKey};
use openmls::prelude::GroupId;
use openmls::group::GroupId;
use shared_traits::IdentIdRef;
use std::collections::HashMap;
use storage::{ChatStore, ConversationKind, ConversationStore};
use tracing::{info, instrument};
pub use crate::conversation::ConversationId;
pub use crate::inbox::Introduction;
@ -27,15 +30,18 @@ pub struct Core<S: ExternalServices> {
services: ServiceContext<S>,
inbox: Inbox,
pq_inbox: InboxV2,
// Cache of loaded conversations
cached_convos: HashMap<String, ConvoTypeOwned<S>>,
}
// Constructors live on the `(DS, RS, CS)` form: `S` can't be inferred backwards
// through `S::DS`, so the bundle is built from the three args here.
impl<IP, DS, RS, CS> Core<(IP, DS, RS, CS)>
impl<IP, DS, RS, WS, CS> Core<(IP, DS, RS, WS, CS)>
where
IP: IdentityProvider + 'static,
DS: DeliveryService + 'static,
RS: RegistrationService + 'static,
WS: WakeupService + 'static,
CS: ChatStore + 'static,
{
/// Opens or creates a `Core` with the given storage configuration.
@ -46,6 +52,7 @@ where
ident: IP,
delivery: DS,
registration: RS,
wakeup_service: WS,
mut store: CS,
) -> Result<Self, ChatError> {
let identity = if let Some(identity) = store.load_identity()? {
@ -56,7 +63,14 @@ where
identity
};
Self::assemble(ident, identity, delivery, registration, store)
Self::assemble(
ident,
identity,
delivery,
registration,
wakeup_service,
store,
)
}
/// Creates a new in-memory `Core` (for testing).
@ -66,10 +80,18 @@ where
ident: IP,
delivery: DS,
registration: RS,
wakeup_service: WS,
store: CS,
) -> Result<Self, ChatError> {
let identity = Identity::new(ident.id().as_str().to_string());
let mut core = Self::assemble(ident, identity, delivery, registration, store)?;
let mut core = Self::assemble(
ident,
identity,
delivery,
registration,
wakeup_service,
store,
)?;
core.register_keypackage()?;
core.register_account_bundle()?;
@ -83,6 +105,7 @@ where
identity: Identity,
mut delivery: DS,
registration: RS,
wakeup_service: WS,
store: CS,
) -> Result<Self, ChatError> {
let inbox = Inbox::new(&identity);
@ -109,9 +132,11 @@ where
mls_provider,
causal,
identity,
wakeup_service,
},
inbox,
pq_inbox,
cached_convos: HashMap::new(),
})
}
}
@ -180,6 +205,13 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
pub fn create_group_convo(
&mut self,
participants: &[IdentIdRef],
) -> Result<ConversationId, ChatError> {
self.create_group_convo_v2(participants)
}
pub fn create_group_convo_v1(
&mut self,
participants: &[IdentIdRef],
) -> Result<ConversationId, ChatError> {
// TODO: (P1) Ensure errors are handled properly. This is a high chance for
// desynchronized state: MlsGroup persistence, conversation persistence, and
@ -193,7 +225,27 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
kind: ConversationKind::GroupV1,
})?;
convo.add_member(&mut self.services, participants)?;
Ok(convo.id().to_string())
let convo_id = convo.id().to_string();
self.register_convo(ConvoTypeOwned::Group(Box::new(convo)))?;
Ok(convo_id)
}
pub fn create_group_convo_v2(
&mut self,
participants: &[IdentIdRef],
) -> Result<ConversationId, ChatError> {
// TODO: (P1) Ensure errors are handled properly. This is a high chance for
// desynchronized state: MlsGroup persistence, conversation persistence, and
// invite delivery all happen separately.
let mut convo = GroupV2Convo::new(&mut self.services)?;
convo.add_member(&mut self.services, participants)?;
let convo_id = convo.id().to_string();
self.register_convo(ConvoTypeOwned::Group(Box::new(convo)))?;
Ok(convo_id)
}
/// Add members to an existing group conversation.
@ -202,13 +254,38 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
convo_id: &str,
members: &[IdentIdRef],
) -> Result<(), ChatError> {
let mut convo = self.load_group_convo(convo_id)?;
convo.add_member(&mut self.services, members)
if self.cached_convos.contains_key(convo_id) {
let convo = self
.cached_convos
.get_mut(convo_id)
.ok_or_else(|| ChatError::NoConvo(convo_id.to_string()))?;
match convo {
ConvoTypeOwned::Group(group_convo) => {
group_convo.add_member(&mut self.services, members)
}
}
} else {
let mut convo = self.load_group_convo(convo_id)?;
convo.add_member(&mut self.services, members)
}
}
pub fn list_conversations(&self) -> Result<Vec<ConversationId>, ChatError> {
// Check Legacy load_convo store
let records = self.services.store.load_conversations()?;
Ok(records.into_iter().map(|r| r.local_convo_id).collect())
let mut convos: Vec<ConversationId> =
records.into_iter().map(|r| r.local_convo_id).collect();
// Add cached mls convos
for convo in self.cached_convos.keys() {
convos.push(convo.to_string());
}
// Conversations may use both storage mechanisms.
// Remove duplicates
convos.dedup();
Ok(convos)
}
pub fn take_missing_messages(&self) -> Vec<MissingMessage> {
@ -217,11 +294,20 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
/// Encrypt and publish `content` to an existing conversation.
pub fn send_content(&mut self, convo_id: &str, content: &[u8]) -> Result<(), ChatError> {
let mut convo = self.load_convo(convo_id)?;
convo.send_content(&mut self.services, content)
if self.cached_convos.contains_key(convo_id) {
let convo = self
.cached_convos
.get_mut(convo_id)
.ok_or_else(|| ChatError::NoConvo(convo_id.to_string()))?;
convo.send_content(&mut self.services, content)
} else {
let mut convo = self.load_convo(convo_id)?;
convo.send_content(&mut self.services, content)
}
}
// Decode bytes and send to protocol for processing.
#[instrument(name = "core.handle_frame", skip_all, fields(user_id = %self.services.mls_identity.display_name()))]
pub fn handle_payload(&mut self, payload: &[u8]) -> Result<PayloadOutcome, ChatError> {
let env = EnvelopeV1::decode(payload)?;
@ -230,7 +316,10 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
match convo_id {
c if c == self.inbox.id() => self.dispatch_to_inbox(&env.payload).map(Into::into),
c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload).map(Into::into),
c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload),
c if self.cached_convos.contains_key(&c) => {
self.dispatch_to_convo(&c, &env.payload).map(Into::into)
}
c if self.services.store.has_conversation(&c)? => {
self.dispatch_to_convo(&c, &env.payload).map(Into::into)
}
@ -250,8 +339,22 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
}
// Dispatch encrypted payload to the post-quantum inbox.
fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result<InboxOutcome, ChatError> {
self.pq_inbox.handle_frame(payload, &mut self.services)
fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result<PayloadOutcome, ChatError> {
if let Some(convo) = self.pq_inbox.handle_frame(&mut self.services, payload)? {
let convo_id = convo.id().to_string();
// Cache convos created by InboxV2
self.register_convo(ConvoTypeOwned::Group(convo))?;
Ok(PayloadOutcome::Inbox(InboxOutcome {
new_conversation: crate::NewConversation {
convo_id,
class: crate::ConversationClass::Group,
},
initial: None,
}))
} else {
Ok(PayloadOutcome::Empty)
}
}
// Dispatch encrypted payload to its corresponding conversation.
@ -261,8 +364,49 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
enc_payload_bytes: &[u8],
) -> Result<ConvoOutcome, ChatError> {
let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?;
let mut convo = self.load_convo(convo_id)?;
convo.handle_frame(&mut self.services, enc_payload)
if self.cached_convos.contains_key(convo_id) {
let convo_type = self
.cached_convos
.get_mut(convo_id)
.ok_or_else(|| ChatError::NoConvo(convo_id.to_string()))?;
convo_type.handle_frame(&mut self.services, enc_payload)
} else {
let mut convo = self.load_convo(convo_id)?;
convo.handle_frame(&mut self.services, enc_payload)
}
}
pub fn wakeup(&mut self, convo_id: ConversationIdRef) -> Result<(), ChatError> {
info!(convos = ?self.cached_convos.keys().collect::<Vec<_>>(), id = ?self.services.mls_identity.id(), "Cached Convos");
match convo_id {
c if c == self.pq_inbox.id() => todo!(),
c if self.cached_convos.contains_key(c) => self.wakeup_convo(c),
_ => Ok(()),
}
}
// Dispatch encrypted payload to its corresponding conversation
fn wakeup_convo(&mut self, convo_id: ConversationIdRef) -> Result<(), ChatError> {
let Some(convo) = self.cached_convos.get_mut(convo_id) else {
return Err(ChatError::generic("No Convo Found"));
};
let convo = match convo {
ConvoTypeOwned::Group(c) => c.as_mut(),
};
convo.wakeup(&mut self.services)
}
fn register_convo(&mut self, convo: ConvoTypeOwned<S>) -> Result<(), ChatError> {
let res = self.cached_convos.insert(convo.id().to_string(), convo);
match res {
Some(_) => Err(ChatError::generic("Convo already exists. Cannot save")),
None => Ok(()),
}
}
/// Rebuilds a conversation from storage — the one site that branches on
@ -319,3 +463,45 @@ impl<'a, S: ExternalServices + 'static> Core<S> {
.ok_or_else(|| ChatError::NoConvo(convo_id.into()))
}
}
#[derive(Debug)]
enum ConvoTypeOwned<S: ExternalServices> {
// Pairwise(Box<dyn BaseConvo<S>>),
Group(Box<dyn GroupConvo<S>>),
}
impl<'a, S: ExternalServices> ConvoTypeOwned<S> {
pub fn id(&'a self) -> ConversationIdRef<'a> {
match self {
ConvoTypeOwned::Group(group_convo) => group_convo.id(),
}
}
}
impl<S: ExternalServices> Convo<S> for ConvoTypeOwned<S> {
fn send_content(
&mut self,
cx: &mut ServiceContext<S>,
content: &[u8],
) -> Result<(), ChatError> {
match self {
ConvoTypeOwned::Group(group_convo) => group_convo.send_content(cx, content),
}
}
fn handle_frame(
&mut self,
cx: &mut ServiceContext<S>,
enc: EncryptedPayload,
) -> Result<ConvoOutcome, ChatError> {
match self {
ConvoTypeOwned::Group(group_convo) => group_convo.handle_frame(cx, enc),
}
}
fn wakeup(&mut self, service_ctx: &mut ServiceContext<S>) -> Result<(), ChatError> {
match self {
ConvoTypeOwned::Group(group_convo) => group_convo.wakeup(service_ctx),
}
}
}

View File

@ -1,3 +1,4 @@
use de_mls::{mls_crypto::MlsError, session::ConversationError};
use openmls::{framing::errors::MlsMessageError, prelude::tls_codec};
pub use thiserror::Error;
@ -37,6 +38,10 @@ pub enum ChatError {
KeyPackage(#[from] openmls::prelude::KeyPackageVerifyError),
#[error("Delivery: {0}")]
Delivery(String),
#[error("mls error: {0}")]
MlsError(#[from] MlsError),
#[error("demls error: {0}")]
DeMlsError(#[from] ConversationError),
}
impl ChatError {

View File

@ -1,31 +1,36 @@
mod identity;
mod mls_provider;
use crypto::Ed25519VerifyingKey;
pub use identity::MlsIdentityProvider;
pub(crate) use mls_provider::MlsEphemeralPqProvider;
use shared_traits::IdentId;
use shared_traits::IdentIdRef;
use chat_proto::logoschat::envelope::EnvelopeV1;
use crypto::Ed25519VerifyingKey;
use de_mls::protos::de_mls::messages::v1::MemberWelcome;
use openmls::prelude::tls_codec::Serialize;
use openmls::prelude::*;
use prost::{Message, Oneof};
use std::cell::RefCell;
use storage::{ConversationKind, ConversationMeta, ConversationStore};
use tracing::info;
use tracing::instrument;
pub use identity::MlsIdentityProvider;
pub(crate) use mls_provider::MlsEphemeralPqProvider;
use crate::ChatError;
use crate::DeliveryService;
use crate::IdentityProvider;
use crate::RegistrationService;
use crate::conversation::ConversationId;
use crate::conversation::GroupConvo;
use crate::conversation::GroupV1Convo;
use crate::outcomes::{ConversationClass, InboxOutcome, NewConversation};
use crate::conversation::GroupV2Convo;
use crate::service_context::{ExternalServices, ServiceContext};
use crate::utils::{blake2b_hex, hash_size};
use crate::{
AccountAuthority, AccountDirectory, AddressedEnvelope, SignedDeviceBundle,
encode_bundle_payload,
};
use crate::{IdentId, IdentIdRef, IdentityProvider};
// Downgraded from MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519 until demls accepts an external provider
const CIPHER_SUITE: Ciphersuite = Ciphersuite::MLS_128_DHKEMX25519_AES128GCM_SHA256_Ed25519;
// Define unique Identifiers derivations used in InboxV2
fn delivery_address_for(ident_id: IdentIdRef) -> String {
@ -47,17 +52,43 @@ pub trait MlsProvider: OpenMlsProvider {
) -> Result<(), ChatError>;
}
/// Deliver a de-mls welcome to `account_id` over its InboxV2 1-1 channel.
/// Function mirroring the GroupV1 `invite_user` path, but carrying a de-mls `MemberWelcome`.
pub fn invite_user_v2<DS: DeliveryService>(
ds: &mut DS,
account_id: IdentIdRef,
welcome: &MemberWelcome,
) -> Result<(), ChatError> {
let frame = InboxV2Frame {
payload: Some(InviteType::GroupV2(welcome.encode_to_vec())),
};
let envelope = EnvelopeV1 {
conversation_hint: conversation_id_for(account_id),
salt: 0,
payload: frame.encode_to_vec().into(),
};
ds.publish(AddressedEnvelope {
delivery_address: delivery_address_for(account_id),
data: envelope.encode_to_vec(),
})
.map_err(ChatError::generic)
}
/// An PQ focused Conversation initializer.
/// InboxV2 Incorporates an Account based identity system to support PQ based conversation protocols
/// such as MLS.
pub struct InboxV2 {
// Account_id field is an owned value, so it can be returned via reference.
ident_id: IdentId,
pending_demls: RefCell<Option<GroupV2Convo>>,
}
impl InboxV2 {
pub fn new(ident_id: IdentId) -> Self {
Self { ident_id }
Self {
ident_id,
pending_demls: RefCell::new(None),
}
}
pub fn ident_id(&self) -> IdentIdRef<'_> {
@ -66,7 +97,7 @@ impl InboxV2 {
/// Submit MlsKeypackage to registration service
pub fn register<S: ExternalServices>(
&self,
&mut self,
cx: &mut ServiceContext<S>,
) -> Result<(), ChatError> {
let keypackage_bytes = Self::create_keypackage(cx)?.tls_serialize_detached()?;
@ -75,7 +106,15 @@ impl InboxV2 {
// "LastResort" package or publish multiple
cx.registry
.register(&cx.mls_identity, keypackage_bytes)
.map_err(ChatError::generic)
.map_err(ChatError::generic)?;
// de-mls (GroupV2) joiner: build a conversation-less User and register
// its de-mls key package under the same account name. This shadows the
// OpenMLS key package above in the registry; GroupV2 is the path the
// de-mls integration exercises.
*self.pending_demls.borrow_mut() = Some(GroupV2Convo::new_pending(cx)?);
Ok(())
}
pub fn delivery_address(&self) -> String {
@ -86,20 +125,38 @@ impl InboxV2 {
conversation_id_for(&self.ident_id)
}
#[instrument(name = "inboxV2.handle_frame", skip_all, fields(user_id = %service_ctx.mls_identity.display_name()))]
pub fn handle_frame<S: ExternalServices>(
&self,
service_ctx: &mut ServiceContext<S>,
payload_bytes: &[u8],
cx: &mut ServiceContext<S>,
) -> Result<InboxOutcome, ChatError> {
let inbox_frame = InboxV2Frame::decode(payload_bytes)?;
) -> Result<Option<Box<dyn GroupConvo<S>>>, ChatError> {
// On a broadcast transport the inbox address also receives traffic
// that isn't an invite (or that prost decodes into an empty frame).
// Treat anything we can't interpret as "not for us" and skip it,
// rather than failing the whole poll cycle.
let Ok(inbox_frame) = InboxV2Frame::decode(payload_bytes) else {
return Ok(None);
};
let Some(payload) = inbox_frame.payload else {
return Err(ChatError::BadParsing("InboxV2Payload missing"));
return Ok(None);
};
match payload {
InviteType::GroupV1(group_v1_heavy_invite) => {
self.handle_heavy_invite(group_v1_heavy_invite, cx)
InviteType::GroupV1(inv) => {
Ok(Some(Box::new(self.handle_heavy_invite(service_ctx, inv)?)))
}
InviteType::GroupV2(welcome_bytes) => {
info!("Process WelcomeMessage");
let mut convo = self
.pending_demls
.borrow_mut()
.take()
.ok_or_else(|| ChatError::generic("no pending de-mls convo"))?;
let mw =
MemberWelcome::decode(welcome_bytes.as_slice()).map_err(ChatError::generic)?;
convo.accept_welcome(service_ctx, &mw)?;
Ok(Some(Box::new(convo)))
}
}
}
@ -123,9 +180,9 @@ impl InboxV2 {
fn handle_heavy_invite<S: ExternalServices>(
&self,
invite: GroupV1HeavyInvite,
cx: &mut ServiceContext<S>,
) -> Result<InboxOutcome, ChatError> {
invite: GroupV1HeavyInvite,
) -> Result<GroupV1Convo, ChatError> {
let (msg_in, _rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?;
let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else {
@ -136,30 +193,22 @@ impl InboxV2 {
};
let convo = GroupV1Convo::new_from_welcome(cx, welcome)?;
let convo_id: ConversationId = convo.id().to_string();
self.persist_convo(&convo, cx)?;
Ok(InboxOutcome {
new_conversation: NewConversation {
convo_id,
class: ConversationClass::Group,
},
initial: None,
})
Ok(convo)
}
fn create_keypackage<S: ExternalServices>(
cx: &ServiceContext<S>,
) -> Result<KeyPackage, ChatError> {
let capabilities = Capabilities::builder()
.ciphersuites(vec![
Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519,
])
.ciphersuites(vec![CIPHER_SUITE])
.extensions(vec![ExtensionType::ApplicationId])
.build();
let a = KeyPackage::builder()
.leaf_node_capabilities(capabilities)
.build(
Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519,
CIPHER_SUITE,
&cx.mls_provider,
&cx.mls_identity,
cx.mls_identity.get_credential(),
@ -244,7 +293,7 @@ impl InboxV2 {
#[derive(Clone, PartialEq, Message)]
pub struct InboxV2Frame {
#[prost(oneof = "InviteType", tags = "1")]
#[prost(oneof = "InviteType", tags = "1, 2")]
pub payload: Option<InviteType>,
}
@ -252,6 +301,8 @@ pub struct InboxV2Frame {
pub enum InviteType {
#[prost(message, tag = "1")]
GroupV1(GroupV1HeavyInvite),
#[prost(bytes, tag = "2")]
GroupV2(Vec<u8>),
}
#[derive(Clone, PartialEq, Message)]

View File

@ -27,8 +27,8 @@ pub use outcomes::{
Content, ConversationClass, ConvoOutcome, InboxOutcome, NewConversation, PayloadOutcome,
};
pub use service_context::ExternalServices;
pub use service_traits::{DeliveryService, RegistrationService};
pub use shared_traits::IdentityProvider;
pub use service_traits::{DeliveryService, RegistrationService, WakeupService};
pub use shared_traits::{IdentId, IdentIdRef, IdentityProvider};
pub use storage::ConversationKind;
pub use types::AddressedEnvelope;
pub use utils::hex_trunc;

View File

@ -6,6 +6,7 @@ use storage::ChatStore;
use crate::IdentityProvider;
use crate::causal_history::CausalHistoryStore;
use crate::inbox_v2::{MlsEphemeralPqProvider, MlsIdentityProvider};
use crate::service_traits::WakeupService;
use crate::{DeliveryService, RegistrationService};
/// Bundles the external service types (`DS`, `RS`, `CS`) behind one `S`. The
@ -14,19 +15,22 @@ pub trait ExternalServices {
type IP: IdentityProvider;
type DS: DeliveryService;
type RS: RegistrationService;
type WS: WakeupService;
type CS: ChatStore;
}
impl<IP, DS, RS, CS> ExternalServices for (IP, DS, RS, CS)
impl<IP, DS, RS, WS, CS> ExternalServices for (IP, DS, RS, WS, CS)
where
IP: IdentityProvider,
DS: DeliveryService,
RS: RegistrationService,
WS: WakeupService,
CS: ChatStore,
{
type IP = IP;
type DS = DS;
type RS = RS;
type WS = WS;
type CS = CS;
}
@ -39,6 +43,7 @@ pub(crate) struct ServiceContext<S: ExternalServices> {
pub(crate) mls_provider: MlsEphemeralPqProvider,
pub(crate) causal: CausalHistoryStore,
pub(crate) identity: Identity,
pub(crate) wakeup_service: S::WS,
}
#[cfg(test)]
@ -106,7 +111,16 @@ mod test_support {
}
}
impl<IP: IdentityProvider, CS: ChatStore> ServiceContext<(IP, NoopDelivery, NoopRegistration, CS)> {
#[derive(Debug)]
pub(crate) struct NoopWakeups;
impl WakeupService for NoopWakeups {
fn wakeup_in(&mut self, _: std::time::Duration, _: crate::ConversationId) {}
}
impl<IP: IdentityProvider, CS: ChatStore>
ServiceContext<(IP, NoopDelivery, NoopRegistration, NoopWakeups, CS)>
{
/// Builds a context around a real store, stubbing other services.
pub(crate) fn for_test(ident: IP, store: CS) -> Result<Self, ChatError> {
let name = ident.id().as_str().to_string();
@ -118,6 +132,7 @@ mod test_support {
mls_provider: MlsEphemeralPqProvider::new().map_err(ChatError::generic)?,
causal: CausalHistoryStore::new(),
identity: Identity::new(name),
wakeup_service: NoopWakeups {},
})
}
}

View File

@ -2,9 +2,12 @@
/// platform clients. Platforms can alter the behaviour of the chat core by supplying
/// different implementations.
use shared_traits::IdentityProvider;
use std::{fmt::Debug, fmt::Display};
use std::{
fmt::{Debug, Display},
time::Duration,
};
use crate::{AccountDirectory, types::AddressedEnvelope};
use crate::{AccountDirectory, ConversationId, types::AddressedEnvelope};
/// A Delivery service is responsible for payload transport.
/// This interface allows Conversations to send payloads on the wire as well as
@ -62,3 +65,7 @@ impl<T: RegistrationService> KeyPackageProvider for T {
RegistrationService::retrieve(self, device_id)
}
}
pub trait WakeupService: Debug {
fn wakeup_in(&mut self, duration: Duration, convo_id: ConversationId);
}

View File

@ -6,13 +6,22 @@ edition = "2024"
# [[test]]
# name = "integration_tests_core"
[dev-dependencies]
[dependencies]
# Workspace dependencies (sorted)
chat-sqlite = { workspace = true }
components = { workspace = true }
libchat = { workspace = true }
logos-account = { workspace = true , features = ["dev"]}
logos-account = { workspace = true, features = ["dev"]}
shared-traits = { workspace = true }
# External dependencies (sorted)
tracing = "0.1"
[dev-dependencies]
chat-sqlite = { workspace = true }
storage = { workspace = true }
# External dependencies (sorted)
tempfile = "3"
tracing = "0.1.44"
tracing-subscriber = "0.3"

View File

@ -1 +1,4 @@
mod test_client;
mod wakeup;
pub use test_client::TestHarness;

View File

@ -0,0 +1,335 @@
use libchat::{ConversationId, Core, IdentityProvider, PayloadOutcome};
use logos_account::TestLogosAccount;
use shared_traits::IdentId;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::time::Duration;
use tracing::{info, warn};
use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
use crate::wakeup::{TestWakeupProvider, TestWakeupService, WakeupRecord};
type OnMessageCallback = dyn Fn(&TestClient, PayloadOutcome);
type WS = TestWakeupService;
type WP = TestWakeupProvider;
const SARO: usize = 0;
const RAYA: usize = 1;
const PAX: usize = 2;
const MIRA: usize = 3;
// type ClientType = CoreClient<TestLogosAccount, LocalBroadcaster, EphemeralRegistry, WP, MemStore>;
type ClientType = Core<(
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
WP,
MemStore,
)>;
#[derive(Debug)]
pub struct ReceivedMessage<T> {
pub convo_id: ConversationId,
pub contents: T,
}
pub struct TestClient {
inner: ClientType,
received_messages: Vec<ReceivedMessage<Vec<u8>>>,
}
impl TestClient {
fn init(client: ClientType) -> Self {
Self {
inner: client,
received_messages: vec![],
}
}
pub fn addr(&self) -> IdentId {
self.inner.ident_id().clone()
}
fn drain_outcomes(&mut self) -> Vec<PayloadOutcome> {
let mut messages = vec![];
while let Some(data) = self.inner.ds().poll() {
messages.push(data);
}
let mut outcomes = vec![];
for data in messages {
let outcome = self.inner.handle_payload(&data).unwrap();
warn!(id= ?self.ident_id(),?outcome, "DRAIN CLIENT");
// Copy Convo Messages to received buffer
match &outcome {
PayloadOutcome::Empty => continue,
PayloadOutcome::Convo(convo_outcome) => {
if let Some(data) = &convo_outcome.content {
info!(
content = String::from_utf8_lossy(&data.bytes).to_string(),
"COT"
);
self.received_messages.push(ReceivedMessage {
convo_id: convo_outcome.convo_id.clone(),
contents: data.bytes.clone(),
});
}
}
PayloadOutcome::Inbox(_) => {}
}
if !matches!(outcome, PayloadOutcome::Empty) {
outcomes.push(outcome);
}
}
outcomes
}
pub fn received_messages(&self) -> &[ReceivedMessage<Vec<u8>>] {
&self.received_messages
}
pub fn check(&self, convo_id: &str, content: &[u8]) -> bool {
for msg in &self.received_messages {
if msg.convo_id == convo_id && msg.contents == content {
return true;
}
}
false
}
pub fn convo_count(&self) -> usize {
self.list_conversations().map_or(0, |v| v.len())
}
}
impl Deref for TestClient {
type Target = ClientType;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for TestClient {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
#[allow(unused)]
pub struct Observation {
ident: IdentId,
outcome: PayloadOutcome,
}
#[allow(unused)]
pub struct TestHarness<const N: usize> {
addresses: HashMap<usize, IdentId>,
clients: Vec<TestClient>,
wakeup_service: WS,
cb: Box<OnMessageCallback>,
// List of outcomes that were detected across all clients.
pub observed_outcomes: Vec<Observation>,
}
impl<const N: usize> TestHarness<N> {
pub fn new(cb: impl Fn(&TestClient, PayloadOutcome) + 'static) -> Self {
const { assert!(N > 0, "TestHarness requires at least one client") };
const { assert!(N <= 4, "Only 4 clients are supported(Soft Limit") };
let mut clients = vec![];
let mut addresses = HashMap::new();
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let ws = TestWakeupService::new();
for i in 0..N {
let wp = ws.new_provider(i);
let ident = TestLogosAccount::new(Self::names(i));
addresses.insert(i, ident.id().clone());
let core_client =
ClientType::new_with_name(ident, ds.clone(), rs.clone(), wp, MemStore::new())
.unwrap();
let client = TestClient::init(core_client);
clients.push(client);
}
dbg!(&rs);
Self {
addresses,
clients,
wakeup_service: ws,
cb: Box::new(cb),
observed_outcomes: vec![],
}
}
pub fn client(&mut self, i: usize) -> &TestClient {
&self.clients[i]
}
pub fn client_mut(&mut self, i: usize) -> &mut TestClient {
&mut self.clients[i]
}
fn names(i: usize) -> &'static str {
match i {
SARO => "saro",
RAYA => "raya",
PAX => "pax",
MIRA => "mira",
_ => "unnamed",
}
}
pub fn process(&mut self, duration: Duration) {
self.process_payloads();
let records = self.wakeup_service.advance_time(duration);
self.process_records(records);
}
pub fn process_until(&mut self, predicate: impl Fn(&mut TestHarness<N>) -> bool) {
let timeout = Duration::from_mins(1);
let step = Duration::from_millis(50);
let mut elapsed = Duration::ZERO;
while !predicate(self) {
if elapsed >= timeout {
panic!("process_until timed out after {:?}", timeout);
}
self.process(step);
elapsed += step;
}
}
pub fn process_until_label(
&mut self,
label: &str,
predicate: impl Fn(&mut TestHarness<N>) -> bool,
) {
info!(label, "Process Until");
self.process_until(predicate);
}
fn process_payloads(&mut self) {
// Process existing payloads for all clients.
for client in self.clients.iter_mut() {
for outcome in client.drain_outcomes() {
info!(id = ?client.ident_id(), ?outcome, "Process drain");
self.observed_outcomes.push(Observation {
ident: client.ident_id().clone(),
outcome: outcome.clone(),
});
info!(id = ?client.ident_id(), ?outcome, "Process drain");
(self.cb)(client, outcome)
}
}
}
fn process_records(&mut self, records: Vec<WakeupRecord>) {
for record in records {
self.clients[record.client_index]
.wakeup(&record.convo_id)
.expect("Error During wakeup");
}
}
}
// Avoid Developer confusion by gating access functions
// based on the number of clients in the harness
impl TestHarness<1> {
pub fn saro(&mut self) -> &mut TestClient {
&mut self.clients[SARO]
}
}
impl TestHarness<2> {
pub fn saro(&mut self) -> &mut TestClient {
&mut self.clients[SARO]
}
pub fn raya(&mut self) -> &mut TestClient {
&mut self.clients[RAYA]
}
}
impl TestHarness<3> {
pub fn saro(&mut self) -> &mut TestClient {
&mut self.clients[SARO]
}
pub fn raya(&mut self) -> &mut TestClient {
&mut self.clients[RAYA]
}
pub fn pax(&mut self) -> &mut TestClient {
&mut self.clients[PAX]
}
}
impl TestHarness<4> {
pub fn saro(&mut self) -> &mut TestClient {
&mut self.clients[SARO]
}
pub fn raya(&mut self) -> &mut TestClient {
&mut self.clients[RAYA]
}
pub fn pax(&mut self) -> &mut TestClient {
&mut self.clients[PAX]
}
pub fn mira(&mut self) -> &mut TestClient {
&mut self.clients[MIRA]
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
let mut harness = TestHarness::<2>::new(|client, outcome| {
info!( id=?&client.ident_id(), outcome = ?outcome, "Result");
});
//Create Convo
let particpants = &[&harness.raya().addr()];
let convo_id = harness
.saro()
.create_group_convo(particpants)
.expect("saro create group");
harness.process_until_label("Raya Join", |h| h.raya().convo_count() == 1);
assert_eq!(harness.raya().convo_count(), 1, "raya did not join");
harness
.saro()
.send_content(convo_id.as_str(), b"Hello")
.expect("raya send");
harness.process(Duration::from_millis(200));
assert!(harness.raya().check(&convo_id, b"Hello"))
}
}

View File

@ -0,0 +1,176 @@
use libchat::{ConversationId, WakeupService};
use std::cell::RefCell;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::rc::Rc;
use std::time::Duration;
use tracing::{info, trace};
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
pub(crate) struct WakeupRecord {
pub expiry: Duration,
pub client_index: usize,
pub convo_id: String,
}
pub struct TestWakeupProvider {
service: Rc<RefCell<InnerWakeupService>>,
client_index: usize,
}
impl Debug for TestWakeupProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestWakeupProvider")
.field("client_index", &self.client_index)
.finish()
}
}
impl TestWakeupProvider {
pub fn new(service: Rc<RefCell<InnerWakeupService>>, id: usize) -> Self {
Self {
service,
client_index: id,
}
}
}
impl WakeupService for TestWakeupProvider {
fn wakeup_in(&mut self, duration: Duration, convo_id: ConversationId) {
info!(?duration, convo_id, "Wakeup In");
self.service
.borrow_mut()
.register_wakeup(duration, self.client_index, convo_id);
}
}
pub struct InnerWakeupService {
now: Duration,
pending: BinaryHeap<Reverse<WakeupRecord>>,
}
impl InnerWakeupService {
pub fn new() -> Self {
Self {
now: Duration::new(0, 0),
pending: BinaryHeap::new(),
}
}
pub fn register_wakeup(&mut self, wake_in: Duration, client_index: usize, convo_id: String) {
info!(%client_index, ?wake_in, "ask for wake up");
self.pending.push(Reverse(WakeupRecord {
expiry: self.now + wake_in,
client_index,
convo_id,
}));
}
fn get_expired(&mut self) -> Vec<WakeupRecord> {
trace!("Get Expired");
let mut fired = vec![];
while self
.pending
.peek()
.is_some_and(|Reverse(w)| w.expiry <= self.now)
{
let Reverse(w) = self.pending.pop().unwrap();
info!(now = self.now.as_secs(), w.convo_id, "Popping");
fired.push(w);
}
fired
}
}
pub struct TestWakeupService {
inner: Rc<RefCell<InnerWakeupService>>,
}
impl Debug for TestWakeupService {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let srv = self.inner.borrow_mut();
f.debug_struct("TestWakeupService")
.field("heap", &srv.pending)
.finish()
}
}
impl TestWakeupService {
pub fn new() -> Self {
Self {
inner: Rc::new(RefCell::new(InnerWakeupService::new())),
}
}
pub fn new_provider(&self, id: usize) -> TestWakeupProvider {
TestWakeupProvider {
service: self.inner.clone(),
client_index: id,
}
}
// Returns the ConvoIDs that triggered in order
pub fn advance_time(&mut self, duration: Duration) -> Vec<WakeupRecord> {
let mut srv = self.inner.borrow_mut();
trace!(?duration, "Advanced");
// de-mls deadlines are real wall-clock; sleep so the millisecond-scale
// commit/consensus timers actually elapse between poll cycles
// Note: This is error prone as WakeupService tracks its own `now` variable. Does not account for processing time.
std::thread::sleep(duration);
srv.now = srv.now.checked_add(duration).unwrap();
srv.get_expired()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_wakeup_service() {
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
let mut ws = TestWakeupService::new();
let mut p1 = ws.new_provider(1);
let mut p2 = ws.new_provider(2);
p1.wakeup_in(Duration::from_secs(2), "convo1".into());
p1.wakeup_in(Duration::from_secs(4), "convo1".into());
p2.wakeup_in(Duration::from_secs(5), "convo1".into());
p2.wakeup_in(Duration::from_secs(4), "convo1".into());
{
let batch = ws.advance_time(Duration::from_secs(2));
assert_eq!(batch.len(), 1, "too many records");
assert_eq!(batch[0].client_index, 1, "client mismatch");
}
{
let batch = ws.advance_time(Duration::from_secs(2));
assert_eq!(batch.len(), 2, "too many records");
assert_eq!(
batch[0].client_index, 1,
"client 1 shoudld be first, as it was entered first"
);
assert_eq!(batch[1].client_index, 2, "client 2 should be second");
}
{
let batch = ws.advance_time(Duration::from_secs(1));
assert_eq!(batch.len(), 1, "too many records");
assert_eq!(batch[0].client_index, 2, "client mismatch");
}
{
let batch = ws.advance_time(Duration::from_secs(1));
assert_eq!(batch.len(), 0, "records should be completely drained");
}
}
}

View File

@ -7,13 +7,21 @@
use std::ops::{Deref, DerefMut};
use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
use libchat::{Core, MissingMessage};
use libchat::{Core, MissingMessage, WakeupService};
use logos_account::TestLogosAccount;
#[derive(Debug)]
struct NoopWakeupService {}
impl WakeupService for NoopWakeupService {
fn wakeup_in(&mut self, _: std::time::Duration, _: libchat::ConversationId) {}
}
struct Client {
inner: Core<(
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
NoopWakeupService,
MemStore,
)>,
}
@ -24,6 +32,7 @@ impl Client {
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
NoopWakeupService,
MemStore,
)>,
) -> Self {
@ -54,6 +63,7 @@ impl Deref for Client {
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
NoopWakeupService,
MemStore,
)>;
fn deref(&self) -> &Self::Target {
@ -73,19 +83,31 @@ fn missing_group_message_is_detected() {
let rs = EphemeralRegistry::new();
let saro_account = TestLogosAccount::new("saro");
let saro_ctx =
Core::new_with_name(saro_account, ds.new_consumer(), rs.clone(), MemStore::new()).unwrap();
let saro_ctx = Core::new_with_name(
saro_account,
ds.new_consumer(),
rs.clone(),
NoopWakeupService {},
MemStore::new(),
)
.unwrap();
let raya_account = TestLogosAccount::new("raya");
let raya_ctx =
Core::new_with_name(raya_account, ds.clone(), rs.clone(), MemStore::new()).unwrap();
let raya_ctx = Core::new_with_name(
raya_account,
ds.clone(),
rs.clone(),
NoopWakeupService {},
MemStore::new(),
)
.unwrap();
let mut saro = Client::init(saro_ctx);
let mut raya = Client::init(raya_ctx);
// Saro creates a group with Raya.
let raya_id = raya.ident_id().clone();
let convo_id = saro.create_group_convo(&[&raya_id]).unwrap().to_string();
let convo_id = saro.create_group_convo_v1(&[&raya_id]).unwrap().to_string();
// Raya joins (processes the Welcome + commit).
raya.process_messages();

View File

@ -1,212 +1,60 @@
use std::ops::{Deref, DerefMut};
use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
use libchat::{
Content, ConversationClass, ConvoOutcome, Core, NewConversation, PayloadOutcome, hex_trunc,
};
use logos_account::TestLogosAccount;
type ResultCallback = Box<dyn Fn(&PayloadOutcome)>;
// Simple client Functionality for testing
struct Client {
inner: Core<(
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
MemStore,
)>,
on_result: Option<ResultCallback>,
new_conversations: Vec<NewConversation>,
received_messages: Vec<(libchat::ConversationId, Content)>,
}
impl Client {
fn init(
core: Core<(
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
MemStore,
)>,
cb: Option<impl Fn(&PayloadOutcome) + 'static>,
) -> Self {
Client {
inner: core,
on_result: cb.map(|f| Box::new(f) as ResultCallback),
new_conversations: Vec::new(),
received_messages: Vec::new(),
}
}
fn process_messages(&mut self) {
let payloads: Vec<_> = {
let ds = self.ds();
std::iter::from_fn(|| ds.poll()).collect()
};
for data in payloads {
let result = self.handle_payload(&data).unwrap();
if let Some(cb) = &self.on_result {
cb(&result);
}
match result {
PayloadOutcome::Empty => {}
PayloadOutcome::Convo(co) => self.absorb_convo_outcome(co),
PayloadOutcome::Inbox(io) => {
self.new_conversations.push(io.new_conversation);
if let Some(initial) = io.initial {
self.absorb_convo_outcome(initial);
}
}
}
}
}
fn absorb_convo_outcome(&mut self, outcome: ConvoOutcome) {
if let Some(content) = outcome.content {
self.received_messages.push((outcome.convo_id, content));
}
}
}
impl Deref for Client {
type Target = Core<(
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
MemStore,
)>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for Client {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
// Higher order function to handle printing
fn pretty_print(prefix: impl Into<String>) -> ResultCallback {
let prefix = prefix.into();
Box::new(move |result: &PayloadOutcome| match result {
PayloadOutcome::Empty => {}
PayloadOutcome::Inbox(io) => {
let cid = hex_trunc(io.new_conversation.convo_id.as_bytes());
println!(
"{prefix} ({cid:?}) [conversation started: {:?}]",
io.new_conversation.class
);
if let Some(initial) = &io.initial {
print_contents(&prefix, initial);
}
}
PayloadOutcome::Convo(co) => print_contents(&prefix, co),
})
}
fn print_contents(prefix: &str, outcome: &ConvoOutcome) {
let cid = hex_trunc(outcome.convo_id.as_bytes());
if let Some(content) = &outcome.content {
let text = String::from_utf8_lossy(&content.bytes);
println!("{prefix} ({cid:?}) {text}");
}
}
fn process(clients: &mut Vec<Client>) {
for client in clients {
client.process_messages();
}
}
use integration_tests_core::TestHarness;
use std::time::Duration;
#[test]
fn create_group() {
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
let saro_ident = TestLogosAccount::new("saro");
let saro =
Core::new_with_name(saro_ident, ds.new_consumer(), rs.clone(), MemStore::new()).unwrap();
let mut harness = TestHarness::<3>::new(|_, _| {});
let raya_ident = TestLogosAccount::new("raya");
let raya = Core::new_with_name(raya_ident, ds.clone(), rs.clone(), MemStore::new()).unwrap();
let raya_id = harness.raya().ident_id().clone();
let pax_id = harness.pax().ident_id().clone();
let mut clients = vec![
Client::init(saro, Some(pretty_print(" Saro "))),
Client::init(raya, Some(pretty_print(" Raya "))),
];
const M_R1: &[u8; 12] = b"Hi From Raya";
const M_P1: &[u8; 13] = b"Hey it's Pax!";
const SARO: usize = 0;
const RAYA: usize = 1;
// Step: Saro Create Convo with Raya
let raya_id = clients[RAYA].ident_id().clone();
let convo_id = clients[SARO]
.create_group_convo(&[&raya_id])
.unwrap()
.to_string();
let convo_id = harness
.saro()
.create_group_convo_v1(&[&raya_id])
.expect("Saro invite Raya ");
harness.process_until(|h| h.raya().list_conversations().unwrap().len() == 1);
// Raya can read this message because
// 1) It was sent after add_members was committed, and
// 2) LocalBroadcaster provides historical messages.
// Step: Raya Send Content
clients[SARO]
.send_content(&convo_id, b"ok who broke the group chat again")
.unwrap();
harness
.raya()
.send_content(&convo_id, M_R1)
.expect("Raya send Msg");
process(&mut clients);
harness.process_until(|h| h.saro().received_messages().len() == 1);
// Raya should observe exactly one new Group conversation from the
// welcome, even though no initial content arrives with it.
let raya_started = clients[RAYA]
.new_conversations
.iter()
.filter(|nc| matches!(nc.class, ConversationClass::Group))
.count();
assert_eq!(
raya_started, 1,
"Raya should have observed exactly one new Group conversation for the welcome"
);
// Step: Saro add Pax
clients[RAYA]
.send_content(&convo_id, b"it was literally working five minutes ago")
.unwrap();
process(&mut clients);
let pax_ident = TestLogosAccount::new("pax");
let pax = Core::new_with_name(pax_ident, ds, rs, MemStore::new()).unwrap();
clients.push(Client::init(pax, Some(pretty_print(" Pax"))));
const PAX: usize = 2;
let pax_id = clients[PAX].ident_id().clone();
clients[SARO]
harness
.saro()
.group_add_member(&convo_id, &[&pax_id])
.unwrap();
.expect("Saro invite pax");
harness.process_until(|h| h.pax().list_conversations().unwrap().len() == 1);
process(&mut clients);
// Step: Pax send Content
let pax_started = clients[PAX]
.new_conversations
.iter()
.filter(|nc| matches!(nc.class, ConversationClass::Group))
.count();
assert_eq!(
pax_started, 1,
"Pax should have observed exactly one new Group conversation for the welcome"
);
harness
.pax()
.send_content(&convo_id, M_P1)
.expect("Pax send");
harness.process(Duration::from_millis(500));
clients[PAX]
.send_content(&convo_id, b"ngl the key rotation is cooked")
.unwrap();
assert!(harness.saro().check(&convo_id, M_R1));
assert!(harness.saro().check(&convo_id, M_P1));
process(&mut clients);
assert!(!harness.raya().check(&convo_id, M_R1));
assert!(harness.raya().check(&convo_id, M_P1));
clients[SARO]
.send_content(&convo_id, b"bro we literally just added you to the group ")
.unwrap();
process(&mut clients);
assert!(!harness.pax().check(&convo_id, M_R1));
assert!(!harness.pax().check(&convo_id, M_P1));
}

View File

@ -1,15 +1,22 @@
use chat_sqlite::{ChatStorage, StorageConfig};
use libchat::{ConversationClass, Core, Introduction, PayloadOutcome};
use libchat::{ConversationClass, Core, Introduction, PayloadOutcome, WakeupService};
use logos_account::TestLogosAccount;
use storage::{ConversationStore, IdentityStore};
use tempfile::tempdir;
use components::{EphemeralRegistry, LocalBroadcaster};
#[derive(Debug)]
struct NoopWakeupService {}
impl WakeupService for NoopWakeupService {
fn wakeup_in(&mut self, _: std::time::Duration, _: libchat::ConversationId) {}
}
type PrivateCore = Core<(
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
NoopWakeupService,
ChatStorage,
)>;
@ -60,11 +67,19 @@ fn ctx_integration() {
saro_account,
ds.clone(),
rs.clone(),
NoopWakeupService {},
ChatStorage::in_memory(),
)
.unwrap();
let raya_account = TestLogosAccount::new("raya");
let mut raya = Core::new_with_name(raya_account, ds, rs, ChatStorage::in_memory()).unwrap();
let mut raya = Core::new_with_name(
raya_account,
ds,
rs,
NoopWakeupService {},
ChatStorage::in_memory(),
)
.unwrap();
// Raya creates intro bundle and sends to Saro
let bundle = raya.create_intro_bundle().unwrap();
@ -107,7 +122,7 @@ fn identity_persistence() {
let rs = EphemeralRegistry::new();
let store1 = ChatStorage::new(StorageConfig::InMemory).unwrap();
let alice_account = TestLogosAccount::new("alice");
let ctx1 = Core::new_with_name(alice_account, ds, rs, store1).unwrap();
let ctx1 = Core::new_with_name(alice_account, ds, rs, NoopWakeupService {}, store1).unwrap();
let pubkey1 = ctx1.identity().public_key();
let name1 = ctx1.installation_name().to_string();
@ -127,7 +142,7 @@ fn open_persists_new_identity() {
let rs = EphemeralRegistry::new();
let store = ChatStorage::new(StorageConfig::File(db_path.clone())).unwrap();
let alice_account = TestLogosAccount::new("alice");
let core = Core::new_from_store(alice_account, ds, rs, store).unwrap();
let core = Core::new_from_store(alice_account, ds, rs, NoopWakeupService {}, store).unwrap();
let pubkey = core.identity().public_key();
drop(core);
@ -147,11 +162,19 @@ fn conversation_metadata_persistence() {
alice_account,
ds.clone(),
rs.clone(),
NoopWakeupService {},
ChatStorage::in_memory(),
)
.unwrap();
let bob_account = TestLogosAccount::new("bob");
let mut bob = Core::new_with_name(bob_account, ds, rs, ChatStorage::in_memory()).unwrap();
let mut bob = Core::new_with_name(
bob_account,
ds,
rs,
NoopWakeupService {},
ChatStorage::in_memory(),
)
.unwrap();
let bundle = alice.create_intro_bundle().unwrap();
let intro = Introduction::try_from(bundle.as_slice()).unwrap();
@ -180,11 +203,19 @@ fn conversation_full_flow() {
alice_account,
ds.clone(),
rs.clone(),
NoopWakeupService {},
ChatStorage::in_memory(),
)
.unwrap();
let bob_account = TestLogosAccount::new("bob");
let mut bob = Core::new_with_name(bob_account, ds, rs, ChatStorage::in_memory()).unwrap();
let mut bob = Core::new_with_name(
bob_account,
ds,
rs,
NoopWakeupService {},
ChatStorage::in_memory(),
)
.unwrap();
let bundle = alice.create_intro_bundle().unwrap();
let intro = Introduction::try_from(bundle.as_slice()).unwrap();

View File

@ -0,0 +1,178 @@
use integration_tests_core::TestHarness;
use tracing::info;
#[test]
fn groupv2_2way_roundtrip() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
const S_M1: &[u8] = b"aaaaa";
const R_M1: &[u8] = b"Hello";
// Initialize TestHarness with 2 clients
let mut harness = TestHarness::<2>::new(|_, _| {});
//Saro Create Convo
let particpants = &[&harness.raya().addr()];
let convo_id = harness
.saro()
.create_group_convo_v2(particpants)
.expect("saro create group");
// Carry the invite through (commit, WelcomeReady, routing to Raya's inbox,
// accept_welcome); settle until Raya has joined.
harness.process_until_label("Saro Send", |h| h.raya().convo_count() == 1);
// Saro sends a message; settle until Raya receives it.
info!(target: "chat", "Saro -> sending: {S_M1:?}");
harness
.saro()
.send_content(&convo_id, S_M1)
.expect("saro send");
harness.process_until(|h| h.raya().check(&convo_id, S_M1));
// Raya replies; settle until Saro receives it.
info!(target: "chat", "Raya -> sending:{R_M1:?}");
harness.raya().send_content(&convo_id, R_M1).unwrap();
harness.process_until(|h| h.saro().check(&convo_id, R_M1));
}
#[test]
fn core_client() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
const S_M1: &[u8] = b"HI";
const R_M1: &[u8] = b"hi back";
const S_M2: &[u8] = b"EPOCHCHK";
let mut harness = TestHarness::<3>::new(|_, _| {});
let particpants = &[&harness.raya().addr()];
let convo_id = harness
.saro()
.create_group_convo_v2(particpants)
.expect("Saro create");
// Carry the invite through (commit, WelcomeReady, routing to Raya's inbox,
// accept_welcome); settle until Raya has joined.
harness.process_until_label("saro create", |h| h.raya().convo_count() == 1);
// Saro sends a message; settle until Raya receives it.
info!(target: "chat", "Saro -> sending: {S_M1:?}");
harness
.saro()
.send_content(&convo_id, S_M1)
.expect("saro send");
harness.process_until_label("Recv S_M1", |h| h.raya().check(&convo_id, S_M1));
// Raya replies; settle until Saro receives it.
info!(target: "chat", "Raya -> sending: {R_M1:?}");
harness
.raya()
.send_content(&convo_id, R_M1)
.expect("raya send");
harness.process_until_label("Recv R_M1", |h| h.saro().check(&convo_id, R_M1));
// Raya (a non-creator) invites Pax; settle until Pax has joined.
let particpants = &[&harness.pax().addr()];
harness
.raya()
.group_add_member(&convo_id, particpants)
.expect("Raya add Pax");
harness.process_until_label("Raya add Pax", |h| h.pax().convo_count() == 1);
// Everyone must be at the SAME epoch after Pax joined: a marker Saro sends
// now decrypts only for members that applied the Add commit.
info!(target: "chat", "Saro -> sending: EPOCHCHK");
harness.saro().send_content(&convo_id, S_M2).unwrap();
harness.process_until_label("epoch check", |h| {
h.raya().check(&convo_id, S_M2) && h.pax().check(&convo_id, S_M2)
});
}
#[test]
fn core_client_batch_add() {
// Saro creates the group and adds BOTH Raya and Pax at the same time: one
// Add commit producing a single welcome that names both joiners.
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer();
let mut harness = TestHarness::<3>::new(|_, _| {});
let particpants = &[&harness.raya().addr(), &harness.pax().addr()];
harness
.saro()
.create_group_convo_v2(particpants)
.expect("Saro create");
// Carry the invite through (commit, WelcomeReady, routing to Raya's inbox,
// accept_welcome); settle until Raya has joined.
harness.process_until_label("saro create", |h| {
h.raya().convo_count() == 1 && h.pax().convo_count() == 1
});
}
#[test]
fn core_client_four_members_two_epochs() {
// Epoch 1: Saro creates and batch-adds Raya + Pax (3 members). Epoch 2: Raya
// (a non-creator) adds a 4th member, Mira. Afterwards every member must be
// at the same epoch (each can decrypt a freshly-sent message) and settled
// back in Working (the >sn_max election that the 4th member triggers must
// have completed — no one stuck in Freezing/Selection/Reelection).
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
const MSG: &[u8] = b"CONVERGED";
let mut harness = TestHarness::<4>::new(|_, _| {});
let particpants = &[&harness.raya().addr(), &harness.pax().addr()];
let convo_id = harness
.saro()
.create_group_convo_v2(particpants)
.expect("Saro create");
// Carry the invite through (commit, WelcomeReady, routing to Raya's inbox,
// accept_welcome); settle until Raya has joined.
harness.process_until_label("Raya + Pax join", |h| {
h.raya().convo_count() == 1 && h.pax().convo_count() == 1
});
// Epoch 2: Raya adds the 4th member; settle until Mira has joined and the
// >sn_max election has returned everyone to Working.
let members = &[&harness.mira().addr()];
harness
.raya()
.group_add_member(&convo_id, members)
.expect("Add Mira");
// TODO: Add State == Working for all clients
harness.process_until_label("Mira join", |h| h.mira().convo_count() == 1);
// Same epoch: a message Saro sends now must reach all three peers.
harness
.saro()
.send_content(&convo_id, MSG)
.expect("Saro send");
harness.process_until_label("all chats converge", |h| {
h.raya().check(&convo_id, MSG)
&& h.pax().check(&convo_id, MSG)
&& h.mira().check(&convo_id, MSG)
});
}

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use components::EphemeralRegistry;
use components::{EphemeralRegistry, ThreadedWakeupService, WakeupEvent};
use crossbeam_channel::{Receiver, Sender, select};
use libchat::{
ChatError, ChatStorage, ConversationId, ConvoOutcome, Core, DeliveryService, InboxOutcome,
@ -13,7 +13,7 @@ use parking_lot::Mutex;
use crate::errors::ClientError;
use crate::event::Event;
type ClientCore<T, R> = Core<(TestLogosAccount, T, R, ChatStorage)>;
type ClientCore<T, R> = Core<(TestLogosAccount, T, R, ThreadedWakeupService, ChatStorage)>;
/// The transport as the client sees it: a [`DeliveryService`] for outbound
/// publishing plus the inbound payload stream the worker drains. One object owns
@ -52,14 +52,17 @@ impl<T: Transport> ChatClient<T, EphemeralRegistry> {
pub fn new(name: impl Into<String>, mut transport: T) -> (Self, Receiver<Event>) {
let inbound = transport.inbound();
let ident = TestLogosAccount::new(name);
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
let core = Core::new_with_name(
ident,
transport,
EphemeralRegistry::new(),
wakeup_service,
ChatStorage::in_memory(),
)
.unwrap();
Self::spawn(core, inbound)
Self::spawn(core, inbound, wakeup_rx)
}
/// Open or create a persistent client backed by `StorageConfig`.
@ -74,8 +77,16 @@ impl<T: Transport> ChatClient<T, EphemeralRegistry> {
let store = ChatStorage::new(config).map_err(ChatError::from)?;
let inbound = transport.inbound();
let ident = TestLogosAccount::new(name);
let core = Core::new_from_store(ident, transport, EphemeralRegistry::new(), store)?;
Ok(Self::spawn(core, inbound))
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
let core = Core::new_from_store(
ident,
transport,
EphemeralRegistry::new(),
wakeup_service,
store,
)?;
Ok(Self::spawn(core, inbound, wakeup_rx))
}
}
@ -106,19 +117,25 @@ where
let store = ChatStorage::new(config).map_err(ChatError::from)?;
let inbound = transport.inbound();
let ident = TestLogosAccount::new(name);
let mut core = Core::new_from_store(ident, transport, registry, store)?;
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
let mut core = Core::new_from_store(ident, transport, registry, wakeup_service, store)?;
core.register_keypackage()?;
Ok(Self::spawn(core, inbound))
Ok(Self::spawn(core, inbound, wakeup_rx))
}
fn spawn(core: ClientCore<T, R>, inbound: Receiver<Vec<u8>>) -> (Self, Receiver<Event>) {
fn spawn(
core: ClientCore<T, R>,
inbound: Receiver<Vec<u8>>,
wakeup_events: Receiver<WakeupEvent>,
) -> (Self, Receiver<Event>) {
let core = Arc::new(Mutex::new(core));
let (event_tx, event_rx) = crossbeam_channel::unbounded();
let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded::<()>(0);
let worker = thread::spawn({
let core = Arc::clone(&core);
move || worker_loop(core, inbound, shutdown_rx, event_tx)
move || worker_loop(core, inbound, wakeup_events, shutdown_rx, event_tx)
});
(
@ -187,6 +204,7 @@ impl<T: DeliveryService, R: RegistrationService> Drop for ChatClient<T, R> {
fn worker_loop<T, R>(
core: Arc<Mutex<ClientCore<T, R>>>,
inbound: Receiver<Vec<u8>>,
wakeup_events: Receiver<WakeupEvent>,
shutdown: Receiver<()>,
event_tx: Sender<Event>,
) where
@ -217,6 +235,14 @@ fn worker_loop<T, R>(
}
}
}
recv(wakeup_events) -> msg => {
let Ok(WakeupEvent { convo_id }) = msg else {
return; // wakeup service's sender dropped
};
if let Err(e) = core.lock().wakeup(&convo_id) {
tracing::warn!("wakeup failed: {e:?}");
}
}
recv(shutdown) -> _ => return,
}
}

View File

@ -11,6 +11,7 @@ storage = { workspace = true }
# External dependencies (sorted)
base64 = "0.22"
crossbeam-channel = { workspace = true }
hex = "0.4.3"
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }

View File

@ -6,6 +6,7 @@ use std::{
};
use libchat::{AddressedEnvelope, DeliveryService};
use tracing::info;
#[derive(Debug)]
struct BroadcasterShared<T> {
@ -107,6 +108,7 @@ impl DeliveryService for LocalBroadcaster {
type Error = String;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error> {
info!(?envelope.delivery_address, len=envelope.data.len(), "DS:Publish");
self.outbound_msgs.push(Self::msg_id(&envelope));
self.shared.borrow_mut().messages.push_back(envelope);
@ -114,6 +116,7 @@ impl DeliveryService for LocalBroadcaster {
}
fn subscribe(&mut self, delivery_address: &str) -> Result<(), Self::Error> {
info!(delivery_address, "DS:Subscribe");
// Strict temporal ordering of subscriptions is not enforced.
// Subscriptions are evaluated on polling, not when the message is published
self.subscriptions.insert(delivery_address.to_string());

View File

@ -1,8 +1,10 @@
mod contact_registry;
mod delivery;
mod storage;
mod wakeup;
pub use contact_registry::EphemeralRegistry;
pub use contact_registry::http::{HttpRegistry, HttpRegistryError};
pub use delivery::*;
pub use storage::*;
pub use wakeup::*;

View File

@ -0,0 +1,133 @@
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
use crossbeam_channel::Sender;
use libchat::{ConversationId, WakeupService};
#[derive(Debug, Eq, PartialEq)]
struct WakeupRecord {
expiry: Instant,
convo_id: ConversationId,
}
impl Ord for WakeupRecord {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.expiry.cmp(&other.expiry)
}
}
impl PartialOrd for WakeupRecord {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
/// Sent to the wakeup queue when a previously registered timer expires.
#[derive(Debug, Clone)]
pub struct WakeupEvent {
pub convo_id: ConversationId,
}
struct Shared {
pending: Mutex<BinaryHeap<Reverse<WakeupRecord>>>,
condvar: Condvar,
running: AtomicBool,
}
/// A [`WakeupService`] backed by a background thread that sleeps until the
/// nearest pending deadline, then emits a [`WakeupEvent`] on `events`.
pub struct ThreadedWakeupService {
shared: Arc<Shared>,
thread: Option<JoinHandle<()>>,
}
impl fmt::Debug for ThreadedWakeupService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ThreadedWakeupService").finish()
}
}
impl ThreadedWakeupService {
pub fn new(events: Sender<WakeupEvent>) -> Self {
let shared = Arc::new(Shared {
pending: Mutex::new(BinaryHeap::new()),
condvar: Condvar::new(),
running: AtomicBool::new(true),
});
let thread = thread::spawn({
let shared = Arc::clone(&shared);
move || run(shared, events)
});
Self {
shared,
thread: Some(thread),
}
}
}
impl WakeupService for ThreadedWakeupService {
fn wakeup_in(&mut self, duration: Duration, convo_id: ConversationId) {
let mut pending = self.shared.pending.lock().unwrap();
pending.push(Reverse(WakeupRecord {
expiry: Instant::now() + duration,
convo_id,
}));
// The worker may be sleeping until a later deadline; wake it so it
// can recompute the time until the new nearest deadline.
self.shared.condvar.notify_one();
}
}
impl Drop for ThreadedWakeupService {
fn drop(&mut self) {
self.shared.running.store(false, Ordering::SeqCst);
self.shared.condvar.notify_one();
if let Some(thread) = self.thread.take() {
let _ = thread.join();
}
}
}
/// Background loop: sleep until the nearest deadline (or forever if the heap
/// is empty), then drain and emit any expired records.
fn run(shared: Arc<Shared>, events: Sender<WakeupEvent>) {
loop {
let mut pending = shared.pending.lock().unwrap();
if !shared.running.load(Ordering::SeqCst) {
return;
}
let Some(Reverse(next)) = pending.peek() else {
// Nothing scheduled: wait until a registration or shutdown wakes us.
drop(shared.condvar.wait(pending).unwrap());
continue;
};
let now = Instant::now();
if next.expiry > now {
let timeout = next.expiry - now;
drop(shared.condvar.wait_timeout(pending, timeout).unwrap());
continue;
}
let Reverse(record) = pending.pop().unwrap();
drop(pending);
if events
.send(WakeupEvent {
convo_id: record.convo_id,
})
.is_err()
{
return;
}
}
}

View File

@ -48,6 +48,7 @@
pkgs.pkg-config
pkgs.cmake
pkgs.perl
pkgs.protobuf
];
};
}