diff --git a/Cargo.lock b/Cargo.lock index 5c70c00..69d998c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1610,7 +1610,6 @@ dependencies = [ "rand 0.9.4", "storage 0.1.0", "thiserror 2.0.18", - "tokio", "tracing", ] @@ -1881,7 +1880,7 @@ dependencies = [ [[package]] name = "de-mls" version = "3.0.0" -source = "git+https://github.com/vacp2p/de-mls?branch=main#b59183e1d92fdd08b99bb5e3ba1389ca9b60d68a" +source = "git+https://github.com/vacp2p/de-mls?branch=main#fe926935f338bf9998d58483fc5bcbecbb287bd4" dependencies = [ "hashgraph-like-consensus", "indexmap 2.14.0", @@ -2647,20 +2646,17 @@ dependencies = [ [[package]] name = "hashgraph-like-consensus" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d846ed9b2da5aa98872bbc7a0c143ccb7a4b2145d41bb4d5f9ad298030b38d5" +checksum = "b6505a8e7032d4060a54c22578807de3f3ef5eedf1fde62fc3b58354d42e91c1" dependencies = [ "alloy", "alloy-signer", - "async-stream", - "futures", "parking_lot", "prost 0.13.5", "prost-build", "sha2", "thiserror 2.0.18", - "tokio", "tracing", "uuid", ] @@ -6050,9 +6046,7 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", "pin-project-lite", - "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", diff --git a/core/core_client/Cargo.toml b/core/core_client/Cargo.toml index 6199b58..9c6b410 100644 --- a/core/core_client/Cargo.toml +++ b/core/core_client/Cargo.toml @@ -4,8 +4,6 @@ version = "0.1.0" edition = "2024" - - [dependencies] # Workspace dependencies (sorted) blake2 = { workspace = true } @@ -16,9 +14,9 @@ storage = { workspace = true } # External dependencies (sorted) alloy = "1.8.3" -chat-proto = { git = "https://github.com/logos-messaging/chat_proto", branch ="version_rollback" } -de-mls = { git = "https://github.com/vacp2p/de-mls", branch = "main"} -hashgraph-like-consensus = "0.4.0" +chat-proto = { git = "https://github.com/logos-messaging/chat_proto", branch = "version_rollback" } +de-mls = { git = "https://github.com/vacp2p/de-mls", branch = "main" } +hashgraph-like-consensus = "0.5.0" hex = "0.4.3" openmls = "0.8.1" openmls_libcrux_crypto = "0.3.1" @@ -28,5 +26,4 @@ openmls_traits = "0.5.0" prost = "0.13.5" rand = "0.9" thiserror = "2.0.18" -tokio = "1.52.3" -tracing = "0.1.44" \ No newline at end of file +tracing = "0.1.44" diff --git a/core/core_client/src/conversation.rs b/core/core_client/src/conversation.rs index 0ad7d8d..42c42d6 100644 --- a/core/core_client/src/conversation.rs +++ b/core/core_client/src/conversation.rs @@ -3,6 +3,7 @@ mod group_v2; use crate::{AccountId, ContentData, DeliveryService, RegistrationService}; use chat_proto::logoschat::encryption::EncryptedPayload; +use de_mls::core::ConversationState; use libchat::{IdentityProvider, WakeupService}; use std::fmt::Debug; @@ -70,4 +71,6 @@ pub trait BaseGroupConvo: BaseConvo { service_ctx: &mut ServiceContext, members: &[&AccountId], ) -> Result<(), ChatError>; + + fn conversation_state(&self) -> Result; } diff --git a/core/core_client/src/conversation/group_v1.rs b/core/core_client/src/conversation/group_v1.rs index 3c87c17..fd4310c 100644 --- a/core/core_client/src/conversation/group_v1.rs +++ b/core/core_client/src/conversation/group_v1.rs @@ -7,6 +7,7 @@ use std::rc::Rc; use blake2::{Blake2b, Digest, digest::consts::U6}; use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload}; +use de_mls::app::ConversationState; use openmls::prelude::tls_codec::Deserialize; use openmls::prelude::*; @@ -300,6 +301,11 @@ where .publish(env) .map_err(|e| ChatError::Generic(format!("Publish: {e}"))) } + + // GroupV1 is not a de-mls conversation; it has no de-mls phase, setup working rn. + fn conversation_state(&self) -> Result { + Ok(ConversationState::Working) + } } impl GroupV1Convo { diff --git a/core/core_client/src/conversation/group_v2.rs b/core/core_client/src/conversation/group_v2.rs index f4b8d33..d69c367 100644 --- a/core/core_client/src/conversation/group_v2.rs +++ b/core/core_client/src/conversation/group_v2.rs @@ -1,24 +1,12 @@ // This Implementation is a Quick and Dirty Integration of DeMLS into libchat. // DeMLS and Libchat have different execution models, trait definitions and ownership/lifetimes of objects. // The easies path is to do a Spike to see what it would take, gather the friction points and then iterate. -// -// Since de-mls::user contains the state-machine and is Async the easiest path is to generate async runtimes -// for each call. This is inefficient but requres the lease amount of effort. -// Expect this branch to not be merged. - -macro_rules! run_async { - ($expr:expr) => { - tokio::runtime::Runtime::new() - .unwrap() - .block_on(async { $expr }) - }; -} use alloy::signers::local::PrivateKeySigner; use blake2::{Blake2b, Digest, digest::consts::U6}; use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload}; use de_mls::app::{ConsensusContext, ConversationConfig, SessionTick, User, UserPlugins}; -use de_mls::core::{ScoringConfig, SessionEvent, StewardListConfig}; +use de_mls::core::{ConversationState, ScoringConfig, SessionEvent, StewardListConfig}; use de_mls::defaults::{ DefaultConsensusPlugin, DefaultConversationPluginsFactory, MemoryDeMlsStorage, }; @@ -150,11 +138,6 @@ pub struct GroupV2Convo { // Use a wrapper for now, and then look at refactoring. buffer_ds: Arc>, app_id: String, - /// Inviter side: accounts we called `add_member` for, awaiting their - /// `WelcomeReady`. Each `WelcomeReady` is routed to one popped entry — see - /// the correlation caveat in `after_op` (only safe for one outstanding - /// invite today). - pending_invites: Vec, } impl std::fmt::Debug for GroupV2Convo { @@ -187,8 +170,7 @@ impl GroupV2Convo { ChatError, > { let identity = LocalDemlsMember::new(identity_name); - let credentials = - Arc::new(MlsCredentials::from_member_id(&identity).map_err(ChatError::generic)?); + let credentials = Arc::new(MlsCredentials::from_member_id(&identity)?); let storage = Arc::new(MemoryDeMlsStorage::new()); let conversation_plugins = DefaultConversationPluginsFactory::new(storage, credentials); @@ -229,11 +211,7 @@ impl GroupV2Convo { let identity_name = service_ctx.identity_provider.friendly_name(); let (mut user, transport, app_id) = Self::build_demls(identity_name)?; - run_async!( - user.start_conversation(convo_id.as_str(), true) - .await - .unwrap() - ); + user.start_conversation(convo_id.as_str(), true)?; // Ensure that the BufferDs gets drained transport.lock().unwrap().drain(service_ctx)?; @@ -243,7 +221,6 @@ impl GroupV2Convo { user, buffer_ds: transport, app_id, - pending_invites: vec![], }) } @@ -256,7 +233,7 @@ impl GroupV2Convo { let name = service_ctx.identity_provider.friendly_name(); let (user, transport, app_id) = Self::build_demls(name.clone())?; - let kp = user.generate_key_package().map_err(ChatError::generic)?; + let kp = user.generate_key_package()?; service_ctx .rs .register(&name, kp.as_bytes().to_vec()) @@ -267,7 +244,6 @@ impl GroupV2Convo { user, buffer_ds: transport, app_id, - pending_invites: vec![], }) } @@ -279,8 +255,7 @@ impl GroupV2Convo { service_ctx: &mut ServiceContext, welcome: &MemberWelcome, ) -> Result<(), ChatError> { - let (convo_id, tick) = run_async!(self.user.accept_welcome(&welcome.welcome_bytes).await) - .map_err(ChatError::generic)?; + let (convo_id, tick) = self.user.accept_welcome(&welcome.welcome_bytes)?; self.convo_id = convo_id; if !welcome.conversation_sync_bytes.is_empty() { @@ -291,13 +266,10 @@ impl GroupV2Convo { self.user.app_id().to_vec(), 0, ); - run_async!(self.user.process_inbound_packet(pkt).await).map_err(ChatError::generic)?; + self.user.process_inbound_packet(pkt)?; } - let events = self - .user - .drain_events(&self.convo_id) - .map_err(ChatError::generic)?; + let events = self.user.drain_events(&self.convo_id)?; self.init(service_ctx)?; self.after_op(service_ctx, tick, &events) } @@ -364,12 +336,9 @@ where ) -> Result<(), ChatError> { let _signer = MlsIdentityProvider(&service_ctx.identity_provider); - let tick = run_async!( - self.user - .send_app_message(&self.convo_id, content.to_vec()) - .await - .unwrap() - ); + let tick = self + .user + .send_app_message(&self.convo_id, content.to_vec())?; // Ensure that the BufferDs gets drained - done inside after_op self.after_op(service_ctx, tick, &vec![])?; Ok(()) @@ -403,11 +372,8 @@ where }; info!(len = packet.payload.len(), "Inbound Pkt"); - let tick = run_async!(self.user.process_inbound_packet(packet).await.unwrap()); - let events = self - .user - .drain_events(&self.convo_id) - .map_err(ChatError::generic)?; + let tick = self.user.process_inbound_packet(packet)?; + let events = self.user.drain_events(&self.convo_id)?; let out = self.events_to_content(events.clone()); self.after_op(service_ctx, tick, &events)?; Ok(out) @@ -416,11 +382,8 @@ where #[instrument(name = "groupv2.wakeup", skip_all, fields(user_id = %ctx.identity_provider.friendly_name()))] fn wakeup(&mut self, ctx: &mut ServiceContext) -> Result<(), ChatError> { info!(app = self.app_id(), "Wakeup"); - let tick = run_async!(self.user.poll_session(&self.convo_id).await.unwrap()); - let events = self - .user - .drain_events(&self.convo_id) - .map_err(ChatError::generic)?; + let tick = self.user.poll_session(&self.convo_id)?; + let events = self.user.drain_events(&self.convo_id)?; self.after_op(ctx, tick, &events) } } @@ -444,18 +407,17 @@ where .retrieve(member) .map_err(ChatError::generic)? .ok_or_else(|| ChatError::generic("No key package"))?; - last_tick = run_async!(self.user.add_member(&self.convo_id, &kp_bytes).await) - .map_err(ChatError::generic)?; - // Remember who we invited so after_op can route their - // WelcomeReady to their InboxV2 channel (FIFO). - self.pending_invites.push((*member).clone()); + last_tick = self.user.add_member(&self.convo_id, &kp_bytes)?; } - let events = self - .user - .drain_events(&self.convo_id) - .map_err(ChatError::generic)?; + let events = self.user.drain_events(&self.convo_id)?; self.after_op(service_ctx, last_tick, &events) } + + fn conversation_state(&self) -> Result { + self.user + .get_conversation_state(&self.convo_id) + .map_err(ChatError::DeMlsGeneric) + } } impl GroupV2Convo { @@ -465,17 +427,17 @@ impl GroupV2Convo { tick: SessionTick, events: &[SessionEvent], ) -> Result<(), ChatError> { - // Route any welcome our commit produced to the matching invitee over - // their InboxV2 1-1 channel. - // - // TODO(chat): welcome→invitee routing is positional, so only safe for - // one outstanding invite. `MemberWelcome` doesn't identify its joiner; - // batch invites need a real welcome→account match. + // Route each welcome to the joiners it names over their InboxV2 1-1 + // channel. The welcome carries `joiner_identities` (member-id bytes = + // account name), so any node that commits an Add can address delivery + // — no local invite tracking, and batch Adds route correctly. for evt in events { - if let SessionEvent::WelcomeReady(welcome) = evt - && let Some(account) = self.pending_invites.pop() - { - crate::inbox_v2::invite_user_v2(&mut service_ctx.ds, &account, welcome)?; + if let SessionEvent::WelcomeReady(welcome) = evt { + for joiner in &welcome.joiner_identities { + let name = String::from_utf8(joiner.clone()).map_err(ChatError::generic)?; + let account = AccountId::new(name); + crate::inbox_v2::invite_user_v2(&mut service_ctx.ds, &account, welcome)?; + } } } diff --git a/core/core_client/src/core_client.rs b/core/core_client/src/core_client.rs index 3775be0..f8d95aa 100644 --- a/core/core_client/src/core_client.rs +++ b/core/core_client/src/core_client.rs @@ -12,6 +12,7 @@ use crate::{AccountId, errors::ChatError}; use crate::{DeliveryService, IdentityProvider, RegistrationService}; use chat_proto::logoschat::encryption::EncryptedPayload; use chat_proto::logoschat::envelope::EnvelopeV1; +use de_mls::core::ConversationState; use libchat::{ContentData, WakeupService}; use prost::Message; use storage::ChatStore; @@ -53,6 +54,12 @@ where let mut client = self.client.borrow_mut(); client.add_member(&self.convo_id, participants) } + + pub fn conversation_state(&self) -> Result { + self.client + .borrow() + .conversation_state(self.convo_id.as_str()) + } } // This allows the ExternalServices trait to be converted from a tuple. @@ -287,6 +294,16 @@ where convo.add_member(&mut self.service_ctx, members) } + pub fn conversation_state( + &self, + convo_id: ConversationIdRef, + ) -> Result { + match self.cached_convos.get(convo_id) { + Some(ConvoTypeOwned::Group(c)) => c.conversation_state(), + None => Err(ChatError::generic("conversation not found")), + } + } + // Decode bytes and send to protocol for processing. pub fn handle_payload(&mut self, payload: &[u8]) -> Result, ChatError> { let env = EnvelopeV1::decode(payload)?; diff --git a/core/core_client/src/errors.rs b/core/core_client/src/errors.rs index 0310704..cf620d0 100644 --- a/core/core_client/src/errors.rs +++ b/core/core_client/src/errors.rs @@ -1,4 +1,4 @@ -use de_mls::mls_crypto::MlsError; +use de_mls::{app::UserError, mls_crypto::MlsError}; use openmls::prelude::tls_codec; pub use thiserror::Error; @@ -14,6 +14,8 @@ pub enum ChatError { Delivery(String), #[error("Demls: {0}")] DemlsWrapped(#[from] MlsError), + #[error("Demls generic: {0}")] + DeMlsGeneric(#[from] UserError), } impl ChatError { diff --git a/core/core_client/src/lib.rs b/core/core_client/src/lib.rs index c63baab..e128da7 100644 --- a/core/core_client/src/lib.rs +++ b/core/core_client/src/lib.rs @@ -10,4 +10,5 @@ pub use libchat::{ }; pub use core_client::{CoreClient, GroupConvo}; +pub use de_mls::core::ConversationState; pub use errors::ChatError; diff --git a/core/integration_tests_core/tests/dev_tests.rs b/core/integration_tests_core/tests/dev_tests.rs index 52a09f5..70c1bae 100644 --- a/core/integration_tests_core/tests/dev_tests.rs +++ b/core/integration_tests_core/tests/dev_tests.rs @@ -6,7 +6,7 @@ use tracing::{debug, info, warn}; use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; -use core_client::CoreClient; +use core_client::{ConversationState, CoreClient}; use libchat::{ContentData, WakeupService, hex_trunc}; use logos_account::TestLogosAccount; @@ -89,6 +89,56 @@ fn process(clients: &mut Vec, wakeups: &mut Vec, } } +/// Pump the event loop until `done` holds, re-checking between fixed slices. +/// This is the settle barrier between test actions: do an action, call +/// `process_until()`, then do the next action. It +/// waits for the actual outcome rather than a guessed cycle count, so it +/// absorbs consensus retries and the ms-timer jitter. Fails loudly if the +/// condition isn't reached within `max_ms`. +fn process_until( + clients: &mut Vec, + wakeups: &mut Vec, + label: &str, + mut done: impl FnMut(&[PollableClient]) -> bool, + max_ms: u32, +) { + let slice = 200; + let mut elapsed = 0; + while elapsed < max_ms { + if done(clients) { + return; + } + process(clients, wakeups, slice); + elapsed += slice; + } + assert!( + done(clients), + "process_until({label}): not settled within {max_ms}ms" + ); +} + +/// True once `client` has joined (has a conversation). +fn joined(client: &PollableClient) -> bool { + client + .list_conversations() + .map(|c| !c.is_empty()) + .unwrap_or(false) +} + +/// True once `client`'s (first) conversation is back in `Working`. +fn is_working(client: &PollableClient) -> bool { + let Ok(convos) = client.list_conversations() else { + return false; + }; + let Some(id) = convos.first() else { + return false; + }; + client + .convo(id) + .map(|h| h.conversation_state().unwrap() == ConversationState::Working) + .unwrap_or(false) +} + use std::cmp::Reverse; use std::collections::BinaryHeap; @@ -159,15 +209,18 @@ impl WakeupService for ManualWakeupService { } } -// Higher order function to handle printing -fn pretty_print(prefix: impl Into) -> Box { +/// Per-client `on_content` callback: log each received message and record it into `sink` so a +/// test can assert who decrypted it — i.e. who is at the current epoch. +fn pretty_print( + prefix: impl Into, + sink: Rc>>, +) -> Box { let prefix = prefix.into(); Box::new(move |c: ContentData| { let cid = hex_trunc(c.conversation_id.as_bytes()); - let content = String::from_utf8_lossy(&c.data); - // Log via tracing (not println!) so received messages appear inline in - // the same INFO stream as the de-mls events, without needing --nocapture. + let content = String::from_utf8_lossy(&c.data).to_string(); warn!(target: "chat", convo = ?cid, "{prefix} received: {content}"); + sink.borrow_mut().push(content); }) } @@ -322,10 +375,14 @@ fn core_client() { .unwrap(); pwp.fill_slot(&pax); + let saro_rx = Rc::new(RefCell::new(Vec::::new())); + let raya_rx = Rc::new(RefCell::new(Vec::::new())); + let pax_rx = Rc::new(RefCell::new(Vec::::new())); + let mut clients = vec![ - PollableClient::init(saro, Some(pretty_print(" Saro "))), - PollableClient::init(raya, Some(pretty_print(" Raya "))), - PollableClient::init(pax, Some(pretty_print(" Pax "))), + PollableClient::init(saro, Some(pretty_print(" Saro ", saro_rx.clone()))), + PollableClient::init(raya, Some(pretty_print(" Raya ", raya_rx.clone()))), + PollableClient::init(pax, Some(pretty_print(" Pax ", pax_rx.clone()))), ]; let mut wakeups = vec![swp, rwp]; @@ -334,43 +391,47 @@ fn core_client() { const RAYA: usize = 1; const PAX: usize = 2; - let wait_time_ms: u32 = 400; - let saro_convo = clients[SARO] .create_group_convo(&[&clients[RAYA].account_id()]) .unwrap(); - // Bounded driver: de-mls reschedules its steward poll every tick, so a - // drain-until-empty loop (`process_all`) never terminates. Step a fixed - // number of seconds instead, like the de-mls integration tests do. - // - // This carries the commit through, fires `WelcomeReady`, routes the - // welcome to Raya's InboxV2 1-1 channel, and lets her `accept_welcome`. - // Run extra cycles afterward so Raya polls her inbox and joins after the - // welcome is published. - process(&mut clients, &mut wakeups, wait_time_ms); - - // Raya joined via the invite path. - let raya_convos = clients[RAYA].list_conversations().unwrap(); - assert!( - !raya_convos.is_empty(), - "Raya should have joined the conversation via the welcome invite" + // Carry the invite through (commit, WelcomeReady, routing to Raya's inbox, + // accept_welcome); settle until Raya has joined. + process_until( + &mut clients, + &mut wakeups, + "raya joins", + |c| joined(&c[RAYA]), + 6000, ); - // Saro sends a message; Raya receives it (look for "Raya received: HI" - // in the log). + let raya_convo = clients[RAYA] + .convo(&clients[RAYA].list_conversations().unwrap()[0]) + .expect("Raya must have a usable conversation handle"); + + // Saro sends a message; settle until Raya receives it. info!(target: "chat", "Saro -> sending: HI"); saro_convo.send_content(b"HI").unwrap(); - process(&mut clients, &mut wakeups, wait_time_ms); + process_until( + &mut clients, + &mut wakeups, + "raya receives HI", + |_| raya_rx.borrow().iter().any(|m| m == "HI"), + 4000, + ); - // Raya replies; Saro receives it (look for "Saro received: hi back"). - let raya_convo = clients[RAYA] - .convo(&raya_convos[0]) - .expect("Raya must have a usable conversation handle"); + // Raya replies; settle until Saro receives it. info!(target: "chat", "Raya -> sending: hi back"); raya_convo.send_content(b"hi back").unwrap(); - process(&mut clients, &mut wakeups, wait_time_ms); + process_until( + &mut clients, + &mut wakeups, + "saro receives hi back", + |_| saro_rx.borrow().iter().any(|m| m == "hi back"), + 4000, + ); + // Raya (a non-creator) invites Pax; settle until Pax has joined. if RAYA_INVITE { &raya_convo } else { @@ -378,17 +439,236 @@ fn core_client() { } .add_member(&[&clients[PAX].account_id()]) .unwrap(); + process_until( + &mut clients, + &mut wakeups, + "pax joins", + |c| joined(&c[PAX]), + 8000, + ); - process(&mut clients, &mut wakeups, wait_time_ms); - process(&mut clients, &mut wakeups, wait_time_ms); - process(&mut clients, &mut wakeups, wait_time_ms); - - let pax_convos = clients[PAX].list_conversations().unwrap(); - let pax_convo = clients[PAX] - .convo(&pax_convos[0]) - .expect("PAX must have a usable conversation handle"); - info!(target: "chat", "Pax -> sending: hi back"); - raya_convo.send_content(b"hi yall").unwrap(); - pax_convo.send_content(b"Hey I'm PAX").unwrap(); - process(&mut clients, &mut wakeups, wait_time_ms); + // Everyone must be at the SAME epoch after Pax joined: a marker Saro sends + // now decrypts only for members that applied the Add commit. + info!(target: "chat", "Saro -> sending: EPOCHCHK"); + saro_convo.send_content(b"EPOCHCHK").unwrap(); + process_until( + &mut clients, + &mut wakeups, + "raya+pax receive EPOCHCHK", + |_| { + raya_rx.borrow().iter().any(|m| m == "EPOCHCHK") + && pax_rx.borrow().iter().any(|m| m == "EPOCHCHK") + }, + 4000, + ); +} + +#[test] +fn core_client_batch_add() { + // Saro creates the group and adds BOTH Raya and Pax at the same time: one + // Add commit producing a single welcome that names both joiners. + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_test_writer() + .try_init(); + + let swp = WakeupProvider::new(); + let rwp = WakeupProvider::new(); + let pwp = WakeupProvider::new(); + + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + + let saro = CoreClient::new( + TestLogosAccount::new("saro"), + ds.clone(), + rs.clone(), + swp.create_wakeup_service(), + MemStore::new(), + ) + .unwrap(); + swp.fill_slot(&saro); + let raya = CoreClient::new( + TestLogosAccount::new("raya"), + ds.clone(), + rs.clone(), + rwp.create_wakeup_service(), + MemStore::new(), + ) + .unwrap(); + rwp.fill_slot(&raya); + let pax = CoreClient::new( + TestLogosAccount::new("pax"), + ds.clone(), + rs.clone(), + pwp.create_wakeup_service(), + MemStore::new(), + ) + .unwrap(); + pwp.fill_slot(&pax); + + // This test asserts only on joins, not message receipt — discard the sinks. + let mut clients = vec![ + PollableClient::init( + saro, + Some(pretty_print( + " Saro ", + Rc::new(RefCell::new(vec![])), + )), + ), + PollableClient::init( + raya, + Some(pretty_print( + " Raya ", + Rc::new(RefCell::new(vec![])), + )), + ), + PollableClient::init( + pax, + Some(pretty_print( + " Pax ", + Rc::new(RefCell::new(vec![])), + )), + ), + ]; + let mut wakeups = vec![swp, rwp]; + + const SARO: usize = 0; + const RAYA: usize = 1; + const PAX: usize = 2; + + clients[SARO] + .create_group_convo(&[&clients[RAYA].account_id(), &clients[PAX].account_id()]) + .unwrap(); + + // One welcome names both joiners; settle until both have joined. + process_until( + &mut clients, + &mut wakeups, + "raya+pax join via batch welcome", + |c| joined(&c[RAYA]) && joined(&c[PAX]), + 6000, + ); +} + +#[test] +fn core_client_four_members_two_epochs() { + // Epoch 1: Saro creates and batch-adds Raya + Pax (3 members). Epoch 2: Raya + // (a non-creator) adds a 4th member, Mira. Afterwards every member must be + // at the same epoch (each can decrypt a freshly-sent message) and settled + // back in Working (the >sn_max election that the 4th member triggers must + // have completed — no one stuck in Freezing/Selection/Reelection). + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_test_writer() + .try_init(); + + let swp = WakeupProvider::new(); + let rwp = WakeupProvider::new(); + let pwp = WakeupProvider::new(); + let mwp = WakeupProvider::new(); + + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + + let saro_rx = Rc::new(RefCell::new(Vec::::new())); + let raya_rx = Rc::new(RefCell::new(Vec::::new())); + let pax_rx = Rc::new(RefCell::new(Vec::::new())); + let mira_rx = Rc::new(RefCell::new(Vec::::new())); + + let saro = CoreClient::new( + TestLogosAccount::new("saro"), + ds.clone(), + rs.clone(), + swp.create_wakeup_service(), + MemStore::new(), + ) + .unwrap(); + swp.fill_slot(&saro); + let raya = CoreClient::new( + TestLogosAccount::new("raya"), + ds.clone(), + rs.clone(), + rwp.create_wakeup_service(), + MemStore::new(), + ) + .unwrap(); + rwp.fill_slot(&raya); + let pax = CoreClient::new( + TestLogosAccount::new("pax"), + ds.clone(), + rs.clone(), + pwp.create_wakeup_service(), + MemStore::new(), + ) + .unwrap(); + pwp.fill_slot(&pax); + let mira = CoreClient::new( + TestLogosAccount::new("mira"), + ds.clone(), + rs.clone(), + mwp.create_wakeup_service(), + MemStore::new(), + ) + .unwrap(); + mwp.fill_slot(&mira); + + let mut clients = vec![ + PollableClient::init(saro, Some(pretty_print(" Saro ", saro_rx.clone()))), + PollableClient::init(raya, Some(pretty_print(" Raya ", raya_rx.clone()))), + PollableClient::init(pax, Some(pretty_print(" Pax ", pax_rx.clone()))), + PollableClient::init( + mira, + Some(pretty_print(" Mira ", mira_rx.clone())), + ), + ]; + let mut wakeups = vec![swp, rwp, pwp, mwp]; + + const SARO: usize = 0; + const RAYA: usize = 1; + const PAX: usize = 2; + const MIRA: usize = 3; + + // Epoch 1: batch-add Raya and Pax; settle until both have joined. + let saro_convo = clients[SARO] + .create_group_convo(&[&clients[RAYA].account_id(), &clients[PAX].account_id()]) + .unwrap(); + process_until( + &mut clients, + &mut wakeups, + "raya+pax join", + |c| joined(&c[RAYA]) && joined(&c[PAX]), + 6000, + ); + + let raya_convo = clients[RAYA] + .convo(&clients[RAYA].list_conversations().unwrap()[0]) + .expect("Raya must have a usable conversation handle"); + + // Epoch 2: Raya adds the 4th member; settle until Mira has joined and the + // >sn_max election has returned everyone to Working. + raya_convo + .add_member(&[&clients[MIRA].account_id()]) + .unwrap(); + process_until( + &mut clients, + &mut wakeups, + "mira joins + all working", + |c| joined(&c[MIRA]) && [SARO, RAYA, PAX, MIRA].iter().all(|&i| is_working(&c[i])), + 10000, + ); + + // Same epoch: a message Saro sends now must reach all three peers. + saro_convo.send_content(b"CONVERGED").unwrap(); + process_until( + &mut clients, + &mut wakeups, + "everyone receives CONVERGED", + |_| { + [&raya_rx, &pax_rx, &mira_rx] + .iter() + .all(|rx| rx.borrow().iter().any(|m| m == "CONVERGED")) + }, + 4000, + ); }