mirror of
https://github.com/logos-messaging/libchat.git
synced 2026-06-28 03:59:27 +00:00
Refactor de-mls integration: enhance InboxV2 for GroupV2 handling, and improve BufferDs for welcome event processing.
This commit is contained in:
parent
9377021a9b
commit
0bfdf70a7c
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -1881,7 +1881,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "de-mls"
|
||||
version = "3.0.0"
|
||||
source = "git+https://github.com/vacp2p/de-mls?branch=main#deafb714d2bfc0c98af3f661161b92f5d621c306"
|
||||
source = "git+https://github.com/vacp2p/de-mls?branch=main#b59183e1d92fdd08b99bb5e3ba1389ca9b60d68a"
|
||||
dependencies = [
|
||||
"hashgraph-like-consensus",
|
||||
"indexmap 2.14.0",
|
||||
|
||||
@ -33,6 +33,7 @@ use libchat::WakeupService;
|
||||
use prost::Message;
|
||||
use rand::{self, Rng};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
use tracing::info;
|
||||
|
||||
use crate::AccountId;
|
||||
@ -42,6 +43,7 @@ use crate::{
|
||||
AddressedEncryptedPayload, ContentData, DeliveryService, RegistrationService,
|
||||
conversation::{BaseConvo, BaseGroupConvo, ChatError, Id},
|
||||
};
|
||||
use libchat::IdentityProvider;
|
||||
|
||||
/// This is a Test Wrapper of Demls MemberId Trait
|
||||
/// Libchat has its own trait that will need to be intergrated at somepoint.
|
||||
@ -73,29 +75,11 @@ impl MemberId for LocalDemlsMember {
|
||||
// All methods in Convo must call drain, to ensure that messages go out.
|
||||
pub struct BufferDs {
|
||||
queue: Vec<OutboundPacket>,
|
||||
welcomes: Vec<MemberWelcome>,
|
||||
}
|
||||
|
||||
impl BufferDs {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
queue: vec![],
|
||||
welcomes: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
/// Lift welcomes out of session events into the welcome queue.
|
||||
/// Other event variants are ignored — they're not "things to send."
|
||||
fn retrive_welcome_event(&mut self, events: &[SessionEvent]) {
|
||||
for evt in events {
|
||||
info!(event = format!("{:?}", evt), "Event Loop");
|
||||
|
||||
if let SessionEvent::WelcomeReady(w) = evt {
|
||||
self.welcomes.push(w.clone());
|
||||
}
|
||||
|
||||
dbg!(&self.welcomes);
|
||||
}
|
||||
Self { queue: vec![] }
|
||||
}
|
||||
|
||||
// Warn: Messages are not sent untill drain is called, which is after the return from User.
|
||||
@ -122,6 +106,7 @@ impl BufferDs {
|
||||
// All Payloads leaving GroupV2 are a GroupV2Frame
|
||||
let frame = GroupV2Frame {
|
||||
payload: Some(GroupV2Payload::DeMlsWrapper(pkt.payload.into())),
|
||||
sender_app_id: pkt.app_id.clone(), // pkt.app_id is the sender's User app_id
|
||||
};
|
||||
|
||||
// Wrap in EncryptedPayload
|
||||
@ -135,26 +120,11 @@ impl BufferDs {
|
||||
},
|
||||
};
|
||||
|
||||
// TODO(libchat: "Verify payloads routing"):
|
||||
// GroupV2Frame currently drops sender app_id. de-mls's
|
||||
// process_inbound_packet uses app_id to filter self-messages — without
|
||||
// round-tripping the sender's id through the frame, every inbound packet
|
||||
// looks like a self-echo and gets dropped.
|
||||
|
||||
let env = payload.into_envelope(pkt.conversation_id.clone());
|
||||
|
||||
service_ctx.ds.publish(env).map_err(ChatError::generic)?;
|
||||
}
|
||||
|
||||
// TODO: build proper convertion ao welcome bundle
|
||||
// for w in self.welcomes.drain(..) {
|
||||
// let envelope = build_inbox_welcome_envelope(w);
|
||||
// service_ctx
|
||||
// .ds
|
||||
// .publish(envelope)
|
||||
// .map_err(ChatError::generic)?;
|
||||
// }
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -180,6 +150,11 @@ pub struct GroupV2Convo {
|
||||
// Use a wrapper for now, and then look at refactoring.
|
||||
buffer_ds: Arc<Mutex<BufferDs>>,
|
||||
app_id: String,
|
||||
/// Inviter side: accounts we called `add_member` for, awaiting their
|
||||
/// `WelcomeReady`. Each `WelcomeReady` is routed to one popped entry — see
|
||||
/// the correlation caveat in `after_op` (only safe for one outstanding
|
||||
/// invite today).
|
||||
pending_invites: Vec<AccountId>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for GroupV2Convo {
|
||||
@ -199,36 +174,60 @@ fn rand_string(n: usize) -> String {
|
||||
}
|
||||
|
||||
impl GroupV2Convo {
|
||||
pub fn new<S: ExternalServices>(
|
||||
service_ctx: &mut ServiceContext<S>,
|
||||
) -> Result<Self, ChatError> {
|
||||
// Create new instances of all the dependencies that User needs.
|
||||
// Once working, these can be moved to be shared across different convo instances.
|
||||
let convo_id = rand_string(5);
|
||||
let signer = PrivateKeySigner::random();
|
||||
// let identity = WalletIdentity::from_wallet(signer.address());
|
||||
let identity = LocalDemlsMember::new(signer.address().to_string());
|
||||
|
||||
/// Build a de-mls `User` (plugins + BufferDs transport) without starting
|
||||
/// any conversation. Shared by `new` (creator) and `new_pending` (joiner).
|
||||
fn build_demls(
|
||||
identity_name: String,
|
||||
) -> Result<
|
||||
(
|
||||
User<DefaultConsensusPlugin, DefaultConversationPluginsFactory>,
|
||||
Arc<Mutex<BufferDs>>,
|
||||
String,
|
||||
),
|
||||
ChatError,
|
||||
> {
|
||||
let identity = LocalDemlsMember::new(identity_name);
|
||||
let credentials =
|
||||
Arc::new(MlsCredentials::from_member_id(&identity).map_err(ChatError::generic)?);
|
||||
let storage = Arc::new(MemoryDeMlsStorage::new());
|
||||
let conversation_plugins = DefaultConversationPluginsFactory::new(storage, credentials);
|
||||
|
||||
let consensus_signer = EthereumConsensusSigner::new(signer);
|
||||
let consensus_signer = EthereumConsensusSigner::new(PrivateKeySigner::random());
|
||||
let consensus = ConsensusContext::<DefaultConsensusPlugin>::new(consensus_signer);
|
||||
|
||||
// TODO(config): TEST-ONLY millisecond timers. de-mls deadlines are real
|
||||
// wall-clock, so the default 60s timers never fire under fast virtual
|
||||
// time. Production needs a real config injected from the caller, not
|
||||
// these hardcoded values.
|
||||
let conversation_config = ConversationConfig {
|
||||
commit_inactivity_duration: Duration::from_millis(50),
|
||||
freeze_duration: Duration::from_millis(20),
|
||||
voting_delay: Duration::from_millis(30),
|
||||
election_voting_delay: Duration::from_millis(30),
|
||||
consensus_timeout: Duration::from_millis(150),
|
||||
proposal_expiration: Duration::from_millis(2000),
|
||||
..ConversationConfig::default()
|
||||
};
|
||||
|
||||
let plugins = UserPlugins {
|
||||
conversation_plugins,
|
||||
consensus,
|
||||
default_conversation_config: ConversationConfig::default(),
|
||||
default_conversation_config: conversation_config,
|
||||
default_scoring_config: ScoringConfig::default(),
|
||||
default_steward_list_config: StewardListConfig::default(),
|
||||
};
|
||||
|
||||
let ds = BufferDs::new();
|
||||
let transport = Arc::new(Mutex::new(ds));
|
||||
let transport = Arc::new(Mutex::new(BufferDs::new()));
|
||||
let user = User::new_with_plugins(Box::new(identity), plugins, transport.clone());
|
||||
Ok((user, transport, rand_string(5)))
|
||||
}
|
||||
|
||||
let mut user = User::new_with_plugins(Box::new(identity), plugins, transport.clone());
|
||||
pub fn new<S: ExternalServices>(
|
||||
service_ctx: &mut ServiceContext<S>,
|
||||
) -> Result<Self, ChatError> {
|
||||
let convo_id = rand_string(5);
|
||||
let identity_name = service_ctx.identity_provider.friendly_name();
|
||||
let (mut user, transport, app_id) = Self::build_demls(identity_name)?;
|
||||
|
||||
run_async!(
|
||||
user.start_conversation(convo_id.as_str(), true)
|
||||
@ -243,10 +242,66 @@ impl GroupV2Convo {
|
||||
convo_id,
|
||||
user,
|
||||
buffer_ds: transport,
|
||||
app_id: rand_string(5),
|
||||
app_id,
|
||||
pending_invites: vec![],
|
||||
})
|
||||
}
|
||||
|
||||
/// Joiner side: build a de-mls `User` and register its key package under
|
||||
/// the account name, but do NOT start a conversation. `convo_id` stays
|
||||
/// empty until [`Self::accept_welcome`] fills it.
|
||||
pub fn new_pending<S: ExternalServices>(
|
||||
service_ctx: &mut ServiceContext<S>,
|
||||
) -> Result<Self, ChatError> {
|
||||
let name = service_ctx.identity_provider.friendly_name();
|
||||
let (user, transport, app_id) = Self::build_demls(name.clone())?;
|
||||
|
||||
let kp = user.generate_key_package().map_err(ChatError::generic)?;
|
||||
service_ctx
|
||||
.rs
|
||||
.register(&name, kp.as_bytes().to_vec())
|
||||
.map_err(ChatError::generic)?;
|
||||
|
||||
Ok(Self {
|
||||
convo_id: String::new(),
|
||||
user,
|
||||
buffer_ds: transport,
|
||||
app_id,
|
||||
pending_invites: vec![],
|
||||
})
|
||||
}
|
||||
|
||||
/// Joiner side: ingest a de-mls welcome handed over the InboxV2 1-1
|
||||
/// channel. Attaches MLS (filling `convo_id`), replays the bundled
|
||||
/// `ConversationSync`, then subscribes to the conversation address.
|
||||
pub fn accept_welcome<S: ExternalServices>(
|
||||
&mut self,
|
||||
service_ctx: &mut ServiceContext<S>,
|
||||
welcome: &MemberWelcome,
|
||||
) -> Result<(), ChatError> {
|
||||
let (convo_id, tick) = run_async!(self.user.accept_welcome(&welcome.welcome_bytes).await)
|
||||
.map_err(ChatError::generic)?;
|
||||
self.convo_id = convo_id;
|
||||
|
||||
if !welcome.conversation_sync_bytes.is_empty() {
|
||||
let pkt = InboundPacket::new(
|
||||
welcome.conversation_sync_bytes.clone(),
|
||||
APP_MSG_SUBTOPIC,
|
||||
&self.convo_id,
|
||||
self.user.app_id().to_vec(),
|
||||
0,
|
||||
);
|
||||
run_async!(self.user.process_inbound_packet(pkt).await).map_err(ChatError::generic)?;
|
||||
}
|
||||
|
||||
let events = self
|
||||
.user
|
||||
.drain_events(&self.convo_id)
|
||||
.map_err(ChatError::generic)?;
|
||||
self.init(service_ctx)?;
|
||||
self.after_op(service_ctx, tick, &events)
|
||||
}
|
||||
|
||||
fn delivery_address_from_id(convo_id: &str) -> String {
|
||||
let hash = Blake2b::<U6>::new()
|
||||
.chain_update("delivery_addr|")
|
||||
@ -330,13 +385,18 @@ where
|
||||
return Err(ChatError::generic("Expected plaintext"));
|
||||
}
|
||||
};
|
||||
let frame = GroupV2Frame::decode(bytes.as_ref()).map_err(ChatError::generic)?;
|
||||
let inner = match frame.payload {
|
||||
Some(GroupV2Payload::DeMlsWrapper(b)) => b.to_vec(),
|
||||
_ => return Ok(None),
|
||||
};
|
||||
|
||||
// Fake a InboundPacket
|
||||
let packet = InboundPacket {
|
||||
payload: bytes.to_vec(),
|
||||
payload: inner,
|
||||
subtopic: APP_MSG_SUBTOPIC.to_string(), // Assume APP TOPIC, Welcome Messages go to InboxV2
|
||||
conversation_id: self.convo_id.to_string(),
|
||||
app_id: self.app_id().as_bytes().to_vec(),
|
||||
app_id: frame.sender_app_id,
|
||||
timestamp: 0,
|
||||
};
|
||||
|
||||
@ -381,9 +441,9 @@ where
|
||||
.ok_or_else(|| ChatError::generic("No key package"))?;
|
||||
last_tick = run_async!(self.user.add_member(&self.convo_id, &kp_bytes).await)
|
||||
.map_err(ChatError::generic)?;
|
||||
// TODO(libchat: "Parse welcomes and create GroupV2"):
|
||||
// remember `member` so we can route the eventual WelcomeReady
|
||||
// event to its delivery_address. Needs API decision with libchat.
|
||||
// Remember who we invited so after_op can route their
|
||||
// WelcomeReady to their InboxV2 channel (FIFO).
|
||||
self.pending_invites.push((*member).clone());
|
||||
}
|
||||
let events = self
|
||||
.user
|
||||
@ -395,16 +455,30 @@ where
|
||||
|
||||
impl GroupV2Convo {
|
||||
fn after_op<S: ExternalServices>(
|
||||
&self,
|
||||
&mut self,
|
||||
service_ctx: &mut ServiceContext<S>,
|
||||
tick: SessionTick,
|
||||
events: &[SessionEvent],
|
||||
) -> Result<(), ChatError> {
|
||||
let mut buf = self.buffer_ds.lock().unwrap();
|
||||
buf.retrive_welcome_event(events);
|
||||
buf.drain(service_ctx)?;
|
||||
drop(buf);
|
||||
// Route any welcome our commit produced to the matching invitee over
|
||||
// their InboxV2 1-1 channel.
|
||||
//
|
||||
// TODO(chat): welcome→invitee routing is positional, so only safe for
|
||||
// one outstanding invite. `MemberWelcome` doesn't identify its joiner;
|
||||
// batch invites need a real welcome→account match.
|
||||
for evt in events {
|
||||
if let SessionEvent::WelcomeReady(welcome) = evt
|
||||
&& let Some(account) = self.pending_invites.pop()
|
||||
{
|
||||
crate::inbox_v2::invite_user_v2(&mut service_ctx.ds, &account, welcome)?;
|
||||
}
|
||||
}
|
||||
|
||||
self.buffer_ds.lock().unwrap().drain(service_ctx)?;
|
||||
if let Some(d) = tick.next_wakeup_in {
|
||||
// TODO(chat): WakeupService is second-granularity but de-mls
|
||||
// deadlines are sub-second; `as_secs().max(1)` floors them up to 1s,
|
||||
// silently over-waiting. Needs a millisecond-capable wakeup.
|
||||
service_ctx
|
||||
.wakeup_service
|
||||
.wakeup_in(d.as_secs().max(1) as u32, &self.convo_id);
|
||||
@ -440,8 +514,10 @@ use prost::{Oneof, bytes::Bytes};
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
pub struct GroupV2Frame {
|
||||
#[prost(oneof = "GroupV2Payload", tags = "1")]
|
||||
#[prost(oneof = "GroupV2Payload", tags = "2, 3")]
|
||||
pub payload: Option<GroupV2Payload>,
|
||||
#[prost(bytes = "vec", tag = "4")]
|
||||
pub sender_app_id: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Oneof)]
|
||||
|
||||
@ -284,7 +284,6 @@ where
|
||||
// Dispatch encrypted payload to Inbox, and register the created Conversation
|
||||
fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result<Option<ContentData>, ChatError> {
|
||||
if let Some(convo) = self.pq_inbox.handle_frame(&mut self.service_ctx, payload)? {
|
||||
let convo: Box<dyn BaseGroupConvo<S>> = Box::new(convo);
|
||||
self.register_convo(ConvoTypeOwned::Group(convo))?;
|
||||
}
|
||||
Ok(None)
|
||||
|
||||
@ -3,6 +3,7 @@ use std::ops::Deref;
|
||||
use std::rc::Rc;
|
||||
|
||||
use chat_proto::logoschat::envelope::EnvelopeV1;
|
||||
use de_mls::protos::de_mls::messages::v1::MemberWelcome;
|
||||
use openmls::prelude::tls_codec::Serialize;
|
||||
use openmls::prelude::*;
|
||||
use openmls_libcrux_crypto::CryptoProvider as LibcruxCryptoProvider;
|
||||
@ -20,6 +21,7 @@ use crate::DeliveryService;
|
||||
use crate::IdentityProvider;
|
||||
use crate::RegistrationService;
|
||||
use crate::conversation::BaseConvo;
|
||||
use crate::conversation::BaseGroupConvo;
|
||||
use crate::conversation::ExternalServices;
|
||||
use crate::conversation::GroupV2Convo;
|
||||
use crate::conversation::ServiceContext;
|
||||
@ -94,6 +96,28 @@ pub trait MlsProvider: OpenMlsProvider {
|
||||
) -> Result<(), ChatError>;
|
||||
}
|
||||
|
||||
/// Deliver a de-mls welcome to `account_id` over its InboxV2 1-1 channel.
|
||||
/// Function mirroring the GroupV1 `invite_user` path, but carrying a de-mls `MemberWelcome`.
|
||||
pub fn invite_user_v2<DS: DeliveryService>(
|
||||
ds: &mut DS,
|
||||
account_id: &AccountId,
|
||||
welcome: &MemberWelcome,
|
||||
) -> Result<(), ChatError> {
|
||||
let frame = InboxV2Frame {
|
||||
payload: Some(InviteType::GroupV2(welcome.encode_to_vec())),
|
||||
};
|
||||
let envelope = EnvelopeV1 {
|
||||
conversation_hint: conversation_id_for(account_id),
|
||||
salt: 0,
|
||||
payload: frame.encode_to_vec().into(),
|
||||
};
|
||||
ds.publish(AddressedEnvelope {
|
||||
delivery_address: delivery_address_for(account_id),
|
||||
data: envelope.encode_to_vec(),
|
||||
})
|
||||
.map_err(ChatError::generic)
|
||||
}
|
||||
|
||||
/// This is a PQ based provider that uses in memory storage.
|
||||
pub struct MlsEphemeralPqProvider {
|
||||
crypto: LibcruxCryptoProvider,
|
||||
@ -165,6 +189,7 @@ pub struct InboxV2<CS> {
|
||||
account_id: AccountId,
|
||||
_store: Rc<RefCell<CS>>,
|
||||
mls_provider: Rc<RefCell<MlsEphemeralPqProvider>>,
|
||||
pending_demls: RefCell<Option<GroupV2Convo>>,
|
||||
}
|
||||
|
||||
impl<CS: ChatStore> InboxV2<CS> {
|
||||
@ -179,6 +204,7 @@ impl<CS: ChatStore> InboxV2<CS> {
|
||||
account_id,
|
||||
_store,
|
||||
mls_provider: Rc::new(RefCell::new(provider)),
|
||||
pending_demls: RefCell::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
@ -212,7 +238,15 @@ impl<CS: ChatStore> InboxV2<CS> {
|
||||
&service_ctx.identity_provider.friendly_name(),
|
||||
keypackage_bytes,
|
||||
)
|
||||
.map_err(ChatError::generic)
|
||||
.map_err(ChatError::generic)?;
|
||||
|
||||
// de-mls (GroupV2) joiner: build a conversation-less User and register
|
||||
// its de-mls key package under the same account name. This shadows the
|
||||
// OpenMLS key package above in the registry; GroupV2 is the path the
|
||||
// de-mls integration exercises.
|
||||
*self.pending_demls.borrow_mut() = Some(GroupV2Convo::new_pending(service_ctx)?);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
@ -261,17 +295,33 @@ impl<CS: ChatStore> InboxV2<CS> {
|
||||
&self,
|
||||
service_ctx: &mut ServiceContext<S>,
|
||||
payload_bytes: &[u8],
|
||||
) -> Result<Option<GroupV1Convo<MlsEphemeralPqProvider>>, ChatError> {
|
||||
let inbox_frame = InboxV2Frame::decode(payload_bytes)?;
|
||||
|
||||
) -> Result<Option<Box<dyn BaseGroupConvo<S>>>, ChatError> {
|
||||
// On a broadcast transport the inbox address also receives traffic
|
||||
// that isn't an invite (or that prost decodes into an empty frame).
|
||||
// Treat anything we can't interpret as "not for us" and skip it,
|
||||
// rather than failing the whole poll cycle.
|
||||
let Ok(inbox_frame) = InboxV2Frame::decode(payload_bytes) else {
|
||||
return Ok(None);
|
||||
};
|
||||
let Some(payload) = inbox_frame.payload else {
|
||||
return Err(ChatError::Generic("InboxV2Payload missing".into()));
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
match payload {
|
||||
InviteType::GroupV1(group_v1_heavy_invite) => self
|
||||
.handle_heavy_invite(service_ctx, group_v1_heavy_invite)
|
||||
.map(Some),
|
||||
InviteType::GroupV1(inv) => {
|
||||
Ok(Some(Box::new(self.handle_heavy_invite(service_ctx, inv)?)))
|
||||
}
|
||||
InviteType::GroupV2(welcome_bytes) => {
|
||||
let mut convo = self
|
||||
.pending_demls
|
||||
.borrow_mut()
|
||||
.take()
|
||||
.ok_or_else(|| ChatError::generic("no pending de-mls convo"))?;
|
||||
let mw =
|
||||
MemberWelcome::decode(welcome_bytes.as_slice()).map_err(ChatError::generic)?;
|
||||
convo.accept_welcome(service_ctx, &mw)?;
|
||||
Ok(Some(Box::new(convo)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -307,7 +357,7 @@ impl<CS: ChatStore> InboxV2<CS> {
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
pub struct InboxV2Frame {
|
||||
#[prost(oneof = "InviteType", tags = "1")]
|
||||
#[prost(oneof = "InviteType", tags = "1, 2")]
|
||||
pub payload: Option<InviteType>,
|
||||
}
|
||||
|
||||
@ -315,6 +365,8 @@ pub struct InboxV2Frame {
|
||||
pub enum InviteType {
|
||||
#[prost(message, tag = "1")]
|
||||
GroupV1(GroupV1HeavyInvite),
|
||||
#[prost(bytes, tag = "2")]
|
||||
GroupV2(Vec<u8>),
|
||||
}
|
||||
|
||||
#[derive(Clone, PartialEq, Message)]
|
||||
|
||||
@ -125,6 +125,10 @@ fn process(clients: &mut Vec<PollableClient>, wakeups: &mut Vec<WakeupProvider>,
|
||||
for client in clients.as_mut_slice() {
|
||||
client.process_messages();
|
||||
}
|
||||
|
||||
// de-mls deadlines are real wall-clock; sleep so the millisecond-scale
|
||||
// commit/consensus timers actually elapse between poll cycles.
|
||||
std::thread::sleep(std::time::Duration::from_millis(60));
|
||||
}
|
||||
}
|
||||
|
||||
@ -203,8 +207,10 @@ fn pretty_print(prefix: impl Into<String>) -> Box<dyn Fn(ContentData)> {
|
||||
let prefix = prefix.into();
|
||||
Box::new(move |c: ContentData| {
|
||||
let cid = hex_trunc(c.conversation_id.as_bytes());
|
||||
let content = String::from_utf8(c.data).unwrap();
|
||||
println!("{} ({:?}) {}", prefix, cid, content)
|
||||
let content = String::from_utf8_lossy(&c.data);
|
||||
// Log via tracing (not println!) so received messages appear inline in
|
||||
// the same INFO stream as the de-mls events, without needing --nocapture.
|
||||
info!(target: "chat", convo = ?cid, "{prefix} received: {content}");
|
||||
})
|
||||
}
|
||||
|
||||
@ -369,22 +375,34 @@ fn core_client() {
|
||||
.create_group_convo(&[&clients[RAYA].account_id()])
|
||||
.unwrap();
|
||||
|
||||
// Manaully process the DS
|
||||
process_all(&mut clients, &mut wakeups);
|
||||
process_all(&mut clients, &mut wakeups);
|
||||
// s_convo.send_content(b"HI").unwrap();
|
||||
// Bounded driver: de-mls reschedules its steward poll every tick, so a
|
||||
// drain-until-empty loop (`process_all`) never terminates. Step a fixed
|
||||
// number of seconds instead, like the de-mls integration tests do.
|
||||
//
|
||||
// This carries the commit through, fires `WelcomeReady`, routes the
|
||||
// welcome to Raya's InboxV2 1-1 channel, and lets her `accept_welcome`.
|
||||
// Run extra cycles afterward so Raya polls her inbox and joins after the
|
||||
// welcome is published.
|
||||
process(&mut clients, &mut wakeups, 80);
|
||||
|
||||
// Manaully process the DS
|
||||
process_all(&mut clients, &mut wakeups);
|
||||
// Raya joined via the invite path.
|
||||
let raya_convos = clients[RAYA].list_conversations().unwrap();
|
||||
assert!(
|
||||
!raya_convos.is_empty(),
|
||||
"Raya should have joined the conversation via the welcome invite"
|
||||
);
|
||||
|
||||
// // TODO: Needs Invite path working first
|
||||
// let convo_id = clients[RAYA].list_conversations().unwrap().pop().unwrap();
|
||||
// let r_convo = clients[RAYA].convo(&convo_id).expect("Convo exists");
|
||||
// process(&mut clients, &mut wakeups, 10);
|
||||
// r_convo.send_content(b"PEW").unwrap();
|
||||
// process(&mut clients, &mut wakeups, 10);
|
||||
// Saro sends a message; Raya receives it (look for "Raya received: HI"
|
||||
// in the log).
|
||||
info!(target: "chat", "Saro -> sending: HI");
|
||||
s_convo.send_content(b"HI").unwrap();
|
||||
process(&mut clients, &mut wakeups, 20);
|
||||
|
||||
// s_convo.send_content(b"SARO again").unwrap();
|
||||
// process(&mut clients, &mut wakeups, 10);
|
||||
println!("Hello");
|
||||
// Raya replies; Saro receives it (look for "Saro received: hi back").
|
||||
let raya_convo = clients[RAYA]
|
||||
.convo(&raya_convos[0])
|
||||
.expect("Raya must have a usable conversation handle");
|
||||
info!(target: "chat", "Raya -> sending: hi back");
|
||||
raya_convo.send_content(b"hi back").unwrap();
|
||||
process(&mut clients, &mut wakeups, 20);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user