remove async, fix issue with add tests, implement additional tests

This commit is contained in:
seemenkina 2026-06-04 11:50:55 +07:00
parent 4def589051
commit 0a21462432
No known key found for this signature in database
9 changed files with 395 additions and 133 deletions

12
Cargo.lock generated
View File

@ -1610,7 +1610,6 @@ dependencies = [
"rand 0.9.4",
"storage 0.1.0",
"thiserror 2.0.18",
"tokio",
"tracing",
]
@ -1881,7 +1880,7 @@ dependencies = [
[[package]]
name = "de-mls"
version = "3.0.0"
source = "git+https://github.com/vacp2p/de-mls?branch=main#b59183e1d92fdd08b99bb5e3ba1389ca9b60d68a"
source = "git+https://github.com/vacp2p/de-mls?branch=main#fe926935f338bf9998d58483fc5bcbecbb287bd4"
dependencies = [
"hashgraph-like-consensus",
"indexmap 2.14.0",
@ -2647,20 +2646,17 @@ dependencies = [
[[package]]
name = "hashgraph-like-consensus"
version = "0.4.0"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d846ed9b2da5aa98872bbc7a0c143ccb7a4b2145d41bb4d5f9ad298030b38d5"
checksum = "b6505a8e7032d4060a54c22578807de3f3ef5eedf1fde62fc3b58354d42e91c1"
dependencies = [
"alloy",
"alloy-signer",
"async-stream",
"futures",
"parking_lot",
"prost 0.13.5",
"prost-build",
"sha2",
"thiserror 2.0.18",
"tokio",
"tracing",
"uuid",
]
@ -6050,9 +6046,7 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.61.2",

View File

@ -4,8 +4,6 @@ version = "0.1.0"
edition = "2024"
[dependencies]
# Workspace dependencies (sorted)
blake2 = { workspace = true }
@ -16,9 +14,9 @@ storage = { workspace = true }
# External dependencies (sorted)
alloy = "1.8.3"
chat-proto = { git = "https://github.com/logos-messaging/chat_proto", branch ="version_rollback" }
de-mls = { git = "https://github.com/vacp2p/de-mls", branch = "main"}
hashgraph-like-consensus = "0.4.0"
chat-proto = { git = "https://github.com/logos-messaging/chat_proto", branch = "version_rollback" }
de-mls = { git = "https://github.com/vacp2p/de-mls", branch = "main" }
hashgraph-like-consensus = "0.5.0"
hex = "0.4.3"
openmls = "0.8.1"
openmls_libcrux_crypto = "0.3.1"
@ -28,5 +26,4 @@ openmls_traits = "0.5.0"
prost = "0.13.5"
rand = "0.9"
thiserror = "2.0.18"
tokio = "1.52.3"
tracing = "0.1.44"
tracing = "0.1.44"

View File

@ -3,6 +3,7 @@ mod group_v2;
use crate::{AccountId, ContentData, DeliveryService, RegistrationService};
use chat_proto::logoschat::encryption::EncryptedPayload;
use de_mls::core::ConversationState;
use libchat::{IdentityProvider, WakeupService};
use std::fmt::Debug;
@ -70,4 +71,6 @@ pub trait BaseGroupConvo<S: ExternalServices>: BaseConvo<S> {
service_ctx: &mut ServiceContext<S>,
members: &[&AccountId],
) -> Result<(), ChatError>;
fn conversation_state(&self) -> Result<ConversationState, ChatError>;
}

View File

@ -7,6 +7,7 @@ use std::rc::Rc;
use blake2::{Blake2b, Digest, digest::consts::U6};
use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload};
use de_mls::app::ConversationState;
use openmls::prelude::tls_codec::Deserialize;
use openmls::prelude::*;
@ -300,6 +301,11 @@ where
.publish(env)
.map_err(|e| ChatError::Generic(format!("Publish: {e}")))
}
// GroupV1 is not a de-mls conversation; it has no de-mls phase, setup working rn.
fn conversation_state(&self) -> Result<ConversationState, ChatError> {
Ok(ConversationState::Working)
}
}
impl<MP: MlsProvider> GroupV1Convo<MP> {

View File

@ -1,24 +1,12 @@
// This Implementation is a Quick and Dirty Integration of DeMLS into libchat.
// DeMLS and Libchat have different execution models, trait definitions and ownership/lifetimes of objects.
// The easies path is to do a Spike to see what it would take, gather the friction points and then iterate.
//
// Since de-mls::user contains the state-machine and is Async the easiest path is to generate async runtimes
// for each call. This is inefficient but requres the lease amount of effort.
// Expect this branch to not be merged.
macro_rules! run_async {
($expr:expr) => {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { $expr })
};
}
use alloy::signers::local::PrivateKeySigner;
use blake2::{Blake2b, Digest, digest::consts::U6};
use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload};
use de_mls::app::{ConsensusContext, ConversationConfig, SessionTick, User, UserPlugins};
use de_mls::core::{ScoringConfig, SessionEvent, StewardListConfig};
use de_mls::core::{ConversationState, ScoringConfig, SessionEvent, StewardListConfig};
use de_mls::defaults::{
DefaultConsensusPlugin, DefaultConversationPluginsFactory, MemoryDeMlsStorage,
};
@ -150,11 +138,6 @@ pub struct GroupV2Convo {
// Use a wrapper for now, and then look at refactoring.
buffer_ds: Arc<Mutex<BufferDs>>,
app_id: String,
/// Inviter side: accounts we called `add_member` for, awaiting their
/// `WelcomeReady`. Each `WelcomeReady` is routed to one popped entry — see
/// the correlation caveat in `after_op` (only safe for one outstanding
/// invite today).
pending_invites: Vec<AccountId>,
}
impl std::fmt::Debug for GroupV2Convo {
@ -187,8 +170,7 @@ impl GroupV2Convo {
ChatError,
> {
let identity = LocalDemlsMember::new(identity_name);
let credentials =
Arc::new(MlsCredentials::from_member_id(&identity).map_err(ChatError::generic)?);
let credentials = Arc::new(MlsCredentials::from_member_id(&identity)?);
let storage = Arc::new(MemoryDeMlsStorage::new());
let conversation_plugins = DefaultConversationPluginsFactory::new(storage, credentials);
@ -229,11 +211,7 @@ impl GroupV2Convo {
let identity_name = service_ctx.identity_provider.friendly_name();
let (mut user, transport, app_id) = Self::build_demls(identity_name)?;
run_async!(
user.start_conversation(convo_id.as_str(), true)
.await
.unwrap()
);
user.start_conversation(convo_id.as_str(), true)?;
// Ensure that the BufferDs gets drained
transport.lock().unwrap().drain(service_ctx)?;
@ -243,7 +221,6 @@ impl GroupV2Convo {
user,
buffer_ds: transport,
app_id,
pending_invites: vec![],
})
}
@ -256,7 +233,7 @@ impl GroupV2Convo {
let name = service_ctx.identity_provider.friendly_name();
let (user, transport, app_id) = Self::build_demls(name.clone())?;
let kp = user.generate_key_package().map_err(ChatError::generic)?;
let kp = user.generate_key_package()?;
service_ctx
.rs
.register(&name, kp.as_bytes().to_vec())
@ -267,7 +244,6 @@ impl GroupV2Convo {
user,
buffer_ds: transport,
app_id,
pending_invites: vec![],
})
}
@ -279,8 +255,7 @@ impl GroupV2Convo {
service_ctx: &mut ServiceContext<S>,
welcome: &MemberWelcome,
) -> Result<(), ChatError> {
let (convo_id, tick) = run_async!(self.user.accept_welcome(&welcome.welcome_bytes).await)
.map_err(ChatError::generic)?;
let (convo_id, tick) = self.user.accept_welcome(&welcome.welcome_bytes)?;
self.convo_id = convo_id;
if !welcome.conversation_sync_bytes.is_empty() {
@ -291,13 +266,10 @@ impl GroupV2Convo {
self.user.app_id().to_vec(),
0,
);
run_async!(self.user.process_inbound_packet(pkt).await).map_err(ChatError::generic)?;
self.user.process_inbound_packet(pkt)?;
}
let events = self
.user
.drain_events(&self.convo_id)
.map_err(ChatError::generic)?;
let events = self.user.drain_events(&self.convo_id)?;
self.init(service_ctx)?;
self.after_op(service_ctx, tick, &events)
}
@ -364,12 +336,9 @@ where
) -> Result<(), ChatError> {
let _signer = MlsIdentityProvider(&service_ctx.identity_provider);
let tick = run_async!(
self.user
.send_app_message(&self.convo_id, content.to_vec())
.await
.unwrap()
);
let tick = self
.user
.send_app_message(&self.convo_id, content.to_vec())?;
// Ensure that the BufferDs gets drained - done inside after_op
self.after_op(service_ctx, tick, &vec![])?;
Ok(())
@ -403,11 +372,8 @@ where
};
info!(len = packet.payload.len(), "Inbound Pkt");
let tick = run_async!(self.user.process_inbound_packet(packet).await.unwrap());
let events = self
.user
.drain_events(&self.convo_id)
.map_err(ChatError::generic)?;
let tick = self.user.process_inbound_packet(packet)?;
let events = self.user.drain_events(&self.convo_id)?;
let out = self.events_to_content(events.clone());
self.after_op(service_ctx, tick, &events)?;
Ok(out)
@ -416,11 +382,8 @@ where
#[instrument(name = "groupv2.wakeup", skip_all, fields(user_id = %ctx.identity_provider.friendly_name()))]
fn wakeup(&mut self, ctx: &mut ServiceContext<S>) -> Result<(), ChatError> {
info!(app = self.app_id(), "Wakeup");
let tick = run_async!(self.user.poll_session(&self.convo_id).await.unwrap());
let events = self
.user
.drain_events(&self.convo_id)
.map_err(ChatError::generic)?;
let tick = self.user.poll_session(&self.convo_id)?;
let events = self.user.drain_events(&self.convo_id)?;
self.after_op(ctx, tick, &events)
}
}
@ -444,18 +407,17 @@ where
.retrieve(member)
.map_err(ChatError::generic)?
.ok_or_else(|| ChatError::generic("No key package"))?;
last_tick = run_async!(self.user.add_member(&self.convo_id, &kp_bytes).await)
.map_err(ChatError::generic)?;
// Remember who we invited so after_op can route their
// WelcomeReady to their InboxV2 channel (FIFO).
self.pending_invites.push((*member).clone());
last_tick = self.user.add_member(&self.convo_id, &kp_bytes)?;
}
let events = self
.user
.drain_events(&self.convo_id)
.map_err(ChatError::generic)?;
let events = self.user.drain_events(&self.convo_id)?;
self.after_op(service_ctx, last_tick, &events)
}
fn conversation_state(&self) -> Result<ConversationState, ChatError> {
self.user
.get_conversation_state(&self.convo_id)
.map_err(ChatError::DeMlsGeneric)
}
}
impl GroupV2Convo {
@ -465,17 +427,17 @@ impl GroupV2Convo {
tick: SessionTick,
events: &[SessionEvent],
) -> Result<(), ChatError> {
// Route any welcome our commit produced to the matching invitee over
// their InboxV2 1-1 channel.
//
// TODO(chat): welcome→invitee routing is positional, so only safe for
// one outstanding invite. `MemberWelcome` doesn't identify its joiner;
// batch invites need a real welcome→account match.
// Route each welcome to the joiners it names over their InboxV2 1-1
// channel. The welcome carries `joiner_identities` (member-id bytes =
// account name), so any node that commits an Add can address delivery
// — no local invite tracking, and batch Adds route correctly.
for evt in events {
if let SessionEvent::WelcomeReady(welcome) = evt
&& let Some(account) = self.pending_invites.pop()
{
crate::inbox_v2::invite_user_v2(&mut service_ctx.ds, &account, welcome)?;
if let SessionEvent::WelcomeReady(welcome) = evt {
for joiner in &welcome.joiner_identities {
let name = String::from_utf8(joiner.clone()).map_err(ChatError::generic)?;
let account = AccountId::new(name);
crate::inbox_v2::invite_user_v2(&mut service_ctx.ds, &account, welcome)?;
}
}
}

View File

@ -12,6 +12,7 @@ use crate::{AccountId, errors::ChatError};
use crate::{DeliveryService, IdentityProvider, RegistrationService};
use chat_proto::logoschat::encryption::EncryptedPayload;
use chat_proto::logoschat::envelope::EnvelopeV1;
use de_mls::core::ConversationState;
use libchat::{ContentData, WakeupService};
use prost::Message;
use storage::ChatStore;
@ -53,6 +54,12 @@ where
let mut client = self.client.borrow_mut();
client.add_member(&self.convo_id, participants)
}
pub fn conversation_state(&self) -> Result<ConversationState, ChatError> {
self.client
.borrow()
.conversation_state(self.convo_id.as_str())
}
}
// This allows the ExternalServices trait to be converted from a tuple.
@ -287,6 +294,16 @@ where
convo.add_member(&mut self.service_ctx, members)
}
pub fn conversation_state(
&self,
convo_id: ConversationIdRef,
) -> Result<ConversationState, ChatError> {
match self.cached_convos.get(convo_id) {
Some(ConvoTypeOwned::Group(c)) => c.conversation_state(),
None => Err(ChatError::generic("conversation not found")),
}
}
// Decode bytes and send to protocol for processing.
pub fn handle_payload(&mut self, payload: &[u8]) -> Result<Option<ContentData>, ChatError> {
let env = EnvelopeV1::decode(payload)?;

View File

@ -1,4 +1,4 @@
use de_mls::mls_crypto::MlsError;
use de_mls::{app::UserError, mls_crypto::MlsError};
use openmls::prelude::tls_codec;
pub use thiserror::Error;
@ -14,6 +14,8 @@ pub enum ChatError {
Delivery(String),
#[error("Demls: {0}")]
DemlsWrapped(#[from] MlsError),
#[error("Demls generic: {0}")]
DeMlsGeneric(#[from] UserError),
}
impl ChatError {

View File

@ -10,4 +10,5 @@ pub use libchat::{
};
pub use core_client::{CoreClient, GroupConvo};
pub use de_mls::core::ConversationState;
pub use errors::ChatError;

View File

@ -6,7 +6,7 @@ use tracing::{debug, info, warn};
use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
use core_client::CoreClient;
use core_client::{ConversationState, CoreClient};
use libchat::{ContentData, WakeupService, hex_trunc};
use logos_account::TestLogosAccount;
@ -89,6 +89,56 @@ fn process(clients: &mut Vec<PollableClient>, wakeups: &mut Vec<WakeupProvider>,
}
}
/// Pump the event loop until `done` holds, re-checking between fixed slices.
/// This is the settle barrier between test actions: do an action, call
/// `process_until(<expected post-condition>)`, then do the next action. It
/// waits for the actual outcome rather than a guessed cycle count, so it
/// absorbs consensus retries and the ms-timer jitter. Fails loudly if the
/// condition isn't reached within `max_ms`.
fn process_until(
clients: &mut Vec<PollableClient>,
wakeups: &mut Vec<WakeupProvider>,
label: &str,
mut done: impl FnMut(&[PollableClient]) -> bool,
max_ms: u32,
) {
let slice = 200;
let mut elapsed = 0;
while elapsed < max_ms {
if done(clients) {
return;
}
process(clients, wakeups, slice);
elapsed += slice;
}
assert!(
done(clients),
"process_until({label}): not settled within {max_ms}ms"
);
}
/// True once `client` has joined (has a conversation).
fn joined(client: &PollableClient) -> bool {
client
.list_conversations()
.map(|c| !c.is_empty())
.unwrap_or(false)
}
/// True once `client`'s (first) conversation is back in `Working`.
fn is_working(client: &PollableClient) -> bool {
let Ok(convos) = client.list_conversations() else {
return false;
};
let Some(id) = convos.first() else {
return false;
};
client
.convo(id)
.map(|h| h.conversation_state().unwrap() == ConversationState::Working)
.unwrap_or(false)
}
use std::cmp::Reverse;
use std::collections::BinaryHeap;
@ -159,15 +209,18 @@ impl WakeupService for ManualWakeupService {
}
}
// Higher order function to handle printing
fn pretty_print(prefix: impl Into<String>) -> Box<dyn Fn(ContentData)> {
/// Per-client `on_content` callback: log each received message and record it into `sink` so a
/// test can assert who decrypted it — i.e. who is at the current epoch.
fn pretty_print(
prefix: impl Into<String>,
sink: Rc<RefCell<Vec<String>>>,
) -> Box<dyn Fn(ContentData)> {
let prefix = prefix.into();
Box::new(move |c: ContentData| {
let cid = hex_trunc(c.conversation_id.as_bytes());
let content = String::from_utf8_lossy(&c.data);
// Log via tracing (not println!) so received messages appear inline in
// the same INFO stream as the de-mls events, without needing --nocapture.
let content = String::from_utf8_lossy(&c.data).to_string();
warn!(target: "chat", convo = ?cid, "{prefix} received: {content}");
sink.borrow_mut().push(content);
})
}
@ -322,10 +375,14 @@ fn core_client() {
.unwrap();
pwp.fill_slot(&pax);
let saro_rx = Rc::new(RefCell::new(Vec::<String>::new()));
let raya_rx = Rc::new(RefCell::new(Vec::<String>::new()));
let pax_rx = Rc::new(RefCell::new(Vec::<String>::new()));
let mut clients = vec![
PollableClient::init(saro, Some(pretty_print(" Saro "))),
PollableClient::init(raya, Some(pretty_print(" Raya "))),
PollableClient::init(pax, Some(pretty_print(" Pax "))),
PollableClient::init(saro, Some(pretty_print(" Saro ", saro_rx.clone()))),
PollableClient::init(raya, Some(pretty_print(" Raya ", raya_rx.clone()))),
PollableClient::init(pax, Some(pretty_print(" Pax ", pax_rx.clone()))),
];
let mut wakeups = vec![swp, rwp];
@ -334,43 +391,47 @@ fn core_client() {
const RAYA: usize = 1;
const PAX: usize = 2;
let wait_time_ms: u32 = 400;
let saro_convo = clients[SARO]
.create_group_convo(&[&clients[RAYA].account_id()])
.unwrap();
// Bounded driver: de-mls reschedules its steward poll every tick, so a
// drain-until-empty loop (`process_all`) never terminates. Step a fixed
// number of seconds instead, like the de-mls integration tests do.
//
// This carries the commit through, fires `WelcomeReady`, routes the
// welcome to Raya's InboxV2 1-1 channel, and lets her `accept_welcome`.
// Run extra cycles afterward so Raya polls her inbox and joins after the
// welcome is published.
process(&mut clients, &mut wakeups, wait_time_ms);
// Raya joined via the invite path.
let raya_convos = clients[RAYA].list_conversations().unwrap();
assert!(
!raya_convos.is_empty(),
"Raya should have joined the conversation via the welcome invite"
// Carry the invite through (commit, WelcomeReady, routing to Raya's inbox,
// accept_welcome); settle until Raya has joined.
process_until(
&mut clients,
&mut wakeups,
"raya joins",
|c| joined(&c[RAYA]),
6000,
);
// Saro sends a message; Raya receives it (look for "Raya received: HI"
// in the log).
let raya_convo = clients[RAYA]
.convo(&clients[RAYA].list_conversations().unwrap()[0])
.expect("Raya must have a usable conversation handle");
// Saro sends a message; settle until Raya receives it.
info!(target: "chat", "Saro -> sending: HI");
saro_convo.send_content(b"HI").unwrap();
process(&mut clients, &mut wakeups, wait_time_ms);
process_until(
&mut clients,
&mut wakeups,
"raya receives HI",
|_| raya_rx.borrow().iter().any(|m| m == "HI"),
4000,
);
// Raya replies; Saro receives it (look for "Saro received: hi back").
let raya_convo = clients[RAYA]
.convo(&raya_convos[0])
.expect("Raya must have a usable conversation handle");
// Raya replies; settle until Saro receives it.
info!(target: "chat", "Raya -> sending: hi back");
raya_convo.send_content(b"hi back").unwrap();
process(&mut clients, &mut wakeups, wait_time_ms);
process_until(
&mut clients,
&mut wakeups,
"saro receives hi back",
|_| saro_rx.borrow().iter().any(|m| m == "hi back"),
4000,
);
// Raya (a non-creator) invites Pax; settle until Pax has joined.
if RAYA_INVITE {
&raya_convo
} else {
@ -378,17 +439,236 @@ fn core_client() {
}
.add_member(&[&clients[PAX].account_id()])
.unwrap();
process_until(
&mut clients,
&mut wakeups,
"pax joins",
|c| joined(&c[PAX]),
8000,
);
process(&mut clients, &mut wakeups, wait_time_ms);
process(&mut clients, &mut wakeups, wait_time_ms);
process(&mut clients, &mut wakeups, wait_time_ms);
let pax_convos = clients[PAX].list_conversations().unwrap();
let pax_convo = clients[PAX]
.convo(&pax_convos[0])
.expect("PAX must have a usable conversation handle");
info!(target: "chat", "Pax -> sending: hi back");
raya_convo.send_content(b"hi yall").unwrap();
pax_convo.send_content(b"Hey I'm PAX").unwrap();
process(&mut clients, &mut wakeups, wait_time_ms);
// Everyone must be at the SAME epoch after Pax joined: a marker Saro sends
// now decrypts only for members that applied the Add commit.
info!(target: "chat", "Saro -> sending: EPOCHCHK");
saro_convo.send_content(b"EPOCHCHK").unwrap();
process_until(
&mut clients,
&mut wakeups,
"raya+pax receive EPOCHCHK",
|_| {
raya_rx.borrow().iter().any(|m| m == "EPOCHCHK")
&& pax_rx.borrow().iter().any(|m| m == "EPOCHCHK")
},
4000,
);
}
#[test]
fn core_client_batch_add() {
// Saro creates the group and adds BOTH Raya and Pax at the same time: one
// Add commit producing a single welcome that names both joiners.
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
let swp = WakeupProvider::new();
let rwp = WakeupProvider::new();
let pwp = WakeupProvider::new();
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let saro = CoreClient::new(
TestLogosAccount::new("saro"),
ds.clone(),
rs.clone(),
swp.create_wakeup_service(),
MemStore::new(),
)
.unwrap();
swp.fill_slot(&saro);
let raya = CoreClient::new(
TestLogosAccount::new("raya"),
ds.clone(),
rs.clone(),
rwp.create_wakeup_service(),
MemStore::new(),
)
.unwrap();
rwp.fill_slot(&raya);
let pax = CoreClient::new(
TestLogosAccount::new("pax"),
ds.clone(),
rs.clone(),
pwp.create_wakeup_service(),
MemStore::new(),
)
.unwrap();
pwp.fill_slot(&pax);
// This test asserts only on joins, not message receipt — discard the sinks.
let mut clients = vec![
PollableClient::init(
saro,
Some(pretty_print(
" Saro ",
Rc::new(RefCell::new(vec![])),
)),
),
PollableClient::init(
raya,
Some(pretty_print(
" Raya ",
Rc::new(RefCell::new(vec![])),
)),
),
PollableClient::init(
pax,
Some(pretty_print(
" Pax ",
Rc::new(RefCell::new(vec![])),
)),
),
];
let mut wakeups = vec![swp, rwp];
const SARO: usize = 0;
const RAYA: usize = 1;
const PAX: usize = 2;
clients[SARO]
.create_group_convo(&[&clients[RAYA].account_id(), &clients[PAX].account_id()])
.unwrap();
// One welcome names both joiners; settle until both have joined.
process_until(
&mut clients,
&mut wakeups,
"raya+pax join via batch welcome",
|c| joined(&c[RAYA]) && joined(&c[PAX]),
6000,
);
}
#[test]
fn core_client_four_members_two_epochs() {
// Epoch 1: Saro creates and batch-adds Raya + Pax (3 members). Epoch 2: Raya
// (a non-creator) adds a 4th member, Mira. Afterwards every member must be
// at the same epoch (each can decrypt a freshly-sent message) and settled
// back in Working (the >sn_max election that the 4th member triggers must
// have completed — no one stuck in Freezing/Selection/Reelection).
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
let swp = WakeupProvider::new();
let rwp = WakeupProvider::new();
let pwp = WakeupProvider::new();
let mwp = WakeupProvider::new();
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let saro_rx = Rc::new(RefCell::new(Vec::<String>::new()));
let raya_rx = Rc::new(RefCell::new(Vec::<String>::new()));
let pax_rx = Rc::new(RefCell::new(Vec::<String>::new()));
let mira_rx = Rc::new(RefCell::new(Vec::<String>::new()));
let saro = CoreClient::new(
TestLogosAccount::new("saro"),
ds.clone(),
rs.clone(),
swp.create_wakeup_service(),
MemStore::new(),
)
.unwrap();
swp.fill_slot(&saro);
let raya = CoreClient::new(
TestLogosAccount::new("raya"),
ds.clone(),
rs.clone(),
rwp.create_wakeup_service(),
MemStore::new(),
)
.unwrap();
rwp.fill_slot(&raya);
let pax = CoreClient::new(
TestLogosAccount::new("pax"),
ds.clone(),
rs.clone(),
pwp.create_wakeup_service(),
MemStore::new(),
)
.unwrap();
pwp.fill_slot(&pax);
let mira = CoreClient::new(
TestLogosAccount::new("mira"),
ds.clone(),
rs.clone(),
mwp.create_wakeup_service(),
MemStore::new(),
)
.unwrap();
mwp.fill_slot(&mira);
let mut clients = vec![
PollableClient::init(saro, Some(pretty_print(" Saro ", saro_rx.clone()))),
PollableClient::init(raya, Some(pretty_print(" Raya ", raya_rx.clone()))),
PollableClient::init(pax, Some(pretty_print(" Pax ", pax_rx.clone()))),
PollableClient::init(
mira,
Some(pretty_print(" Mira ", mira_rx.clone())),
),
];
let mut wakeups = vec![swp, rwp, pwp, mwp];
const SARO: usize = 0;
const RAYA: usize = 1;
const PAX: usize = 2;
const MIRA: usize = 3;
// Epoch 1: batch-add Raya and Pax; settle until both have joined.
let saro_convo = clients[SARO]
.create_group_convo(&[&clients[RAYA].account_id(), &clients[PAX].account_id()])
.unwrap();
process_until(
&mut clients,
&mut wakeups,
"raya+pax join",
|c| joined(&c[RAYA]) && joined(&c[PAX]),
6000,
);
let raya_convo = clients[RAYA]
.convo(&clients[RAYA].list_conversations().unwrap()[0])
.expect("Raya must have a usable conversation handle");
// Epoch 2: Raya adds the 4th member; settle until Mira has joined and the
// >sn_max election has returned everyone to Working.
raya_convo
.add_member(&[&clients[MIRA].account_id()])
.unwrap();
process_until(
&mut clients,
&mut wakeups,
"mira joins + all working",
|c| joined(&c[MIRA]) && [SARO, RAYA, PAX, MIRA].iter().all(|&i| is_working(&c[i])),
10000,
);
// Same epoch: a message Saro sends now must reach all three peers.
saro_convo.send_content(b"CONVERGED").unwrap();
process_until(
&mut clients,
&mut wakeups,
"everyone receives CONVERGED",
|_| {
[&raya_rx, &pax_rx, &mira_rx]
.iter()
.all(|rx| rx.borrow().iter().any(|m| m == "CONVERGED"))
},
4000,
);
}