mirror of
https://github.com/logos-messaging/libchat.git
synced 2026-06-27 19:49:31 +00:00
ChatClient migration (#145)
* Simplify client * Fixups * Update Cli-Client to use builder * undeprecate legacy convos * Allow Storage config in builder * bug fixes * Clippy fix * fixes
This commit is contained in:
parent
e1921b944d
commit
a5abefa314
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -3605,6 +3605,7 @@ dependencies = [
|
||||
"logos-account",
|
||||
"parking_lot",
|
||||
"shared-traits",
|
||||
"storage",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"tracing",
|
||||
|
||||
@ -5,7 +5,7 @@ use std::path::{Path, PathBuf};
|
||||
use anyhow::Result;
|
||||
use arboard::Clipboard;
|
||||
use crossbeam_channel::Receiver;
|
||||
use logos_chat::{ChatClient, EphemeralRegistry, Event, RegistrationService, Transport};
|
||||
use logos_chat::{ChatClient, ChatStore, Event, IdentityProvider, RegistrationService, Transport};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::utils::now;
|
||||
@ -41,8 +41,14 @@ pub struct AppState {
|
||||
pub active_chat: Option<String>,
|
||||
}
|
||||
|
||||
pub struct ChatApp<T: Transport, R: RegistrationService = EphemeralRegistry> {
|
||||
pub client: ChatClient<T, R>,
|
||||
pub struct ChatApp<I, T, R, S>
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
T: Transport,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
pub client: ChatClient<I, T, R, S>,
|
||||
events: Receiver<Event>,
|
||||
pub state: AppState,
|
||||
/// Ephemeral command output — not persisted, cleared on chat switch.
|
||||
@ -53,13 +59,15 @@ pub struct ChatApp<T: Transport, R: RegistrationService = EphemeralRegistry> {
|
||||
state_path: PathBuf,
|
||||
}
|
||||
|
||||
impl<T, R> ChatApp<T, R>
|
||||
impl<I, T, R, S> ChatApp<I, T, R, S>
|
||||
where
|
||||
I: IdentityProvider + Send,
|
||||
T: Transport,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send,
|
||||
{
|
||||
pub fn new(
|
||||
client: ChatClient<T, R>,
|
||||
client: ChatClient<I, T, R, S>,
|
||||
events: Receiver<Event>,
|
||||
user_name: &str,
|
||||
data_dir: &Path,
|
||||
|
||||
@ -8,7 +8,10 @@ use std::path::{Path, PathBuf};
|
||||
use anyhow::{Context, Result};
|
||||
use clap::{Parser, ValueEnum};
|
||||
use crossbeam_channel::Receiver;
|
||||
use logos_chat::{ChatClient, Event, HttpRegistry, RegistrationService, StorageConfig, Transport};
|
||||
use logos_chat::{
|
||||
ChatClient, ChatClientBuilder, ChatStore, Event, HttpRegistry, IdentityProvider,
|
||||
RegistrationService, StorageConfig, Transport,
|
||||
};
|
||||
|
||||
use app::ChatApp;
|
||||
|
||||
@ -113,14 +116,20 @@ fn run<T: Transport>(transport: T, cli: &Cli) -> Result<()> {
|
||||
match cli.registry_url.as_deref() {
|
||||
Some(url) => {
|
||||
let registry = HttpRegistry::new(url);
|
||||
let (client, events) =
|
||||
ChatClient::open_with_registry(cli.name.clone(), storage, transport, registry)
|
||||
.map_err(|e| anyhow::anyhow!("{e:?}"))
|
||||
.context("failed to open chat client with HTTP registry")?;
|
||||
let (client, events) = ChatClientBuilder::new()
|
||||
.transport(transport)
|
||||
.storage_config(storage)
|
||||
.registration(registry)
|
||||
.build()
|
||||
.map_err(|e| anyhow::anyhow!("{e:?}"))
|
||||
.context("failed to open chat client with HTTP registry")?;
|
||||
launch_tui(client, events, cli)
|
||||
}
|
||||
None => {
|
||||
let (client, events) = ChatClient::open(cli.name.clone(), storage, transport)
|
||||
let (client, events) = ChatClientBuilder::new()
|
||||
.transport(transport)
|
||||
.storage_config(storage)
|
||||
.build()
|
||||
.map_err(|e| anyhow::anyhow!("{e:?}"))
|
||||
.context("failed to open chat client")?;
|
||||
launch_tui(client, events, cli)
|
||||
@ -128,10 +137,16 @@ fn run<T: Transport>(transport: T, cli: &Cli) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
fn launch_tui<T, R>(client: ChatClient<T, R>, events: Receiver<Event>, cli: &Cli) -> Result<()>
|
||||
fn launch_tui<I, T, R, S>(
|
||||
client: ChatClient<I, T, R, S>,
|
||||
events: Receiver<Event>,
|
||||
cli: &Cli,
|
||||
) -> Result<()>
|
||||
where
|
||||
I: IdentityProvider + Send,
|
||||
T: Transport,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send,
|
||||
{
|
||||
let mut app = ChatApp::new(client, events, &cli.name, &cli.data)?;
|
||||
|
||||
@ -176,18 +191,22 @@ fn run_logos_delivery(cli: Cli) -> Result<()> {
|
||||
.to_str()
|
||||
.context("db path contains non-UTF-8 characters")?
|
||||
.to_string();
|
||||
logos_chat::ChatClient::open(
|
||||
cli.name.clone(),
|
||||
logos_chat::StorageConfig::Encrypted {
|
||||
|
||||
logos_chat::ChatClientBuilder::new()
|
||||
.storage_config(logos_chat::StorageConfig::Encrypted {
|
||||
path: db_str,
|
||||
key: "chat-cli".to_string(),
|
||||
},
|
||||
delivery,
|
||||
)
|
||||
.map_err(|e| anyhow::anyhow!("{e:?}"))
|
||||
.context("failed to open persistent client")?
|
||||
})
|
||||
.transport(delivery)
|
||||
.build()
|
||||
.map_err(|e| anyhow::anyhow!("{e:?}"))
|
||||
.context("failed to open persistent client")?
|
||||
}
|
||||
None => logos_chat::ChatClient::new(cli.name.clone(), delivery),
|
||||
None => logos_chat::ChatClientBuilder::new()
|
||||
.transport(delivery)
|
||||
.build()
|
||||
.map_err(|e| anyhow::anyhow!("{e:?}"))
|
||||
.context("failed to open chat client")?,
|
||||
};
|
||||
|
||||
let mut app = ChatApp::new(client, events, &cli.name, &data_dir)?;
|
||||
@ -209,10 +228,12 @@ fn run_logos_delivery(cli: Cli) -> Result<()> {
|
||||
)
|
||||
}
|
||||
|
||||
fn run_app<T, R>(terminal: &mut ui::Tui, app: &mut ChatApp<T, R>) -> Result<()>
|
||||
fn run_app<I, T, R, S>(terminal: &mut ui::Tui, app: &mut ChatApp<I, T, R, S>) -> Result<()>
|
||||
where
|
||||
I: IdentityProvider + Send,
|
||||
T: Transport,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send,
|
||||
{
|
||||
loop {
|
||||
app.process_incoming()?;
|
||||
|
||||
@ -16,7 +16,7 @@ use ratatui::{
|
||||
widgets::{Block, Borders, List, ListItem, Paragraph, Wrap},
|
||||
};
|
||||
|
||||
use logos_chat::{RegistrationService, Transport};
|
||||
use logos_chat::{ChatStore, IdentityProvider, RegistrationService, Transport};
|
||||
|
||||
use crate::app::ChatApp;
|
||||
|
||||
@ -38,10 +38,13 @@ pub fn restore() -> io::Result<()> {
|
||||
}
|
||||
|
||||
/// Draw the UI.
|
||||
pub fn draw<D: Transport, R: RegistrationService + Send + 'static>(
|
||||
frame: &mut Frame,
|
||||
app: &ChatApp<D, R>,
|
||||
) {
|
||||
pub fn draw<I, D, R, S>(frame: &mut Frame, app: &ChatApp<I, D, R, S>)
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
D: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
let chunks = Layout::default()
|
||||
.direction(Direction::Vertical)
|
||||
.constraints([
|
||||
@ -58,11 +61,13 @@ pub fn draw<D: Transport, R: RegistrationService + Send + 'static>(
|
||||
draw_status(frame, app, chunks[3]);
|
||||
}
|
||||
|
||||
fn draw_header<D: Transport, R: RegistrationService + Send + 'static>(
|
||||
frame: &mut Frame,
|
||||
app: &ChatApp<D, R>,
|
||||
area: Rect,
|
||||
) {
|
||||
fn draw_header<I, D, R, S>(frame: &mut Frame, app: &ChatApp<I, D, R, S>, area: Rect)
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
D: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
let title = match app.current_session() {
|
||||
Some(session) => {
|
||||
let id = &session.chat_id[..8.min(session.chat_id.len())];
|
||||
@ -85,11 +90,13 @@ fn draw_header<D: Transport, R: RegistrationService + Send + 'static>(
|
||||
frame.render_widget(header, area);
|
||||
}
|
||||
|
||||
fn draw_messages<D: Transport, R: RegistrationService + Send + 'static>(
|
||||
frame: &mut Frame,
|
||||
app: &ChatApp<D, R>,
|
||||
area: Rect,
|
||||
) {
|
||||
fn draw_messages<I, D, R, S>(frame: &mut Frame, app: &ChatApp<I, D, R, S>, area: Rect)
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
D: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
let remote_name = app
|
||||
.current_session()
|
||||
.map(|s| s.display_name())
|
||||
@ -175,11 +182,13 @@ fn draw_messages<D: Transport, R: RegistrationService + Send + 'static>(
|
||||
frame.render_stateful_widget(messages_widget, area, &mut list_state);
|
||||
}
|
||||
|
||||
fn draw_input<D: Transport, R: RegistrationService + Send + 'static>(
|
||||
frame: &mut Frame,
|
||||
app: &ChatApp<D, R>,
|
||||
area: Rect,
|
||||
) {
|
||||
fn draw_input<I, D, R, S>(frame: &mut Frame, app: &ChatApp<I, D, R, S>, area: Rect)
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
D: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
// Inner width: area minus borders (2).
|
||||
let inner_width = area.width.saturating_sub(2) as usize;
|
||||
let input_len = app.input.len();
|
||||
@ -206,11 +215,13 @@ fn draw_input<D: Transport, R: RegistrationService + Send + 'static>(
|
||||
frame.set_cursor_position((cursor_x, area.y + 1));
|
||||
}
|
||||
|
||||
fn draw_status<D: Transport, R: RegistrationService + Send + 'static>(
|
||||
frame: &mut Frame,
|
||||
app: &ChatApp<D, R>,
|
||||
area: Rect,
|
||||
) {
|
||||
fn draw_status<I, D, R, S>(frame: &mut Frame, app: &ChatApp<I, D, R, S>, area: Rect)
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
D: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
let status = Paragraph::new(app.status.as_str())
|
||||
.style(Style::default().fg(Color::Gray))
|
||||
.block(Block::default().title(" Status ").borders(Borders::ALL))
|
||||
@ -220,9 +231,13 @@ fn draw_status<D: Transport, R: RegistrationService + Send + 'static>(
|
||||
}
|
||||
|
||||
/// Handle keyboard events.
|
||||
pub fn handle_events<D: Transport, R: RegistrationService + Send + 'static>(
|
||||
app: &mut ChatApp<D, R>,
|
||||
) -> io::Result<bool> {
|
||||
pub fn handle_events<I, D, R, S>(app: &mut ChatApp<I, D, R, S>) -> io::Result<bool>
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
D: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
// Poll for events with a short timeout to allow checking incoming messages
|
||||
if event::poll(std::time::Duration::from_millis(100))?
|
||||
&& let Event::Key(key) = event::read()?
|
||||
|
||||
@ -9,12 +9,15 @@ use openmls::prelude::tls_codec::Deserialize;
|
||||
use openmls::prelude::*;
|
||||
use prost::Message as _;
|
||||
use shared_traits::IdentIdRef;
|
||||
use std::collections::VecDeque;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::account_directory::{AccountDirectory, resolve_device_ids};
|
||||
use crate::conversation::ConversationIdRef;
|
||||
use crate::inbox_v2::MlsProvider;
|
||||
use crate::service_context::{ExternalServices, ServiceContext};
|
||||
|
||||
use crate::utils::{blake2b_hex, hash_size};
|
||||
use crate::{
|
||||
DeliveryService, IdentityProvider,
|
||||
conversation::{ChatError, Convo, GroupConvo, Identified},
|
||||
@ -23,9 +26,13 @@ use crate::{
|
||||
types::AddressedEncryptedPayload,
|
||||
};
|
||||
|
||||
const OUTBOUND_HASH_CACHE_SIZE: usize = 25;
|
||||
|
||||
pub struct GroupV1Convo {
|
||||
mls_group: MlsGroup,
|
||||
convo_id: String,
|
||||
// Cache outbound message Id's to filter out re-entrant messages
|
||||
outbound_msgs: VecDeque<String>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for GroupV1Convo {
|
||||
@ -54,6 +61,7 @@ impl GroupV1Convo {
|
||||
Ok(Self {
|
||||
mls_group,
|
||||
convo_id,
|
||||
outbound_msgs: VecDeque::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@ -76,6 +84,7 @@ impl GroupV1Convo {
|
||||
Ok(Self {
|
||||
mls_group,
|
||||
convo_id,
|
||||
outbound_msgs: VecDeque::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@ -93,6 +102,7 @@ impl GroupV1Convo {
|
||||
Ok(GroupV1Convo {
|
||||
mls_group,
|
||||
convo_id,
|
||||
outbound_msgs: VecDeque::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@ -100,8 +110,6 @@ impl GroupV1Convo {
|
||||
fn subscribe(ds: &mut impl DeliveryService, convo_id: &str) -> Result<(), ChatError> {
|
||||
ds.subscribe(&Self::delivery_address_from_id(convo_id))
|
||||
.map_err(ChatError::generic)?;
|
||||
ds.subscribe(&Self::ctrl_delivery_address_from_id(convo_id))
|
||||
.map_err(ChatError::generic)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -129,18 +137,6 @@ impl GroupV1Convo {
|
||||
Self::delivery_address_from_id(&self.convo_id)
|
||||
}
|
||||
|
||||
fn ctrl_delivery_address_from_id(convo_id: &str) -> String {
|
||||
let hash = Blake2b::<U6>::new()
|
||||
.chain_update("ctrl_delivery_addr|")
|
||||
.chain_update(convo_id)
|
||||
.finalize();
|
||||
hex::encode(hash)
|
||||
}
|
||||
|
||||
fn ctrl_delivery_address(&self) -> String {
|
||||
Self::ctrl_delivery_address_from_id(&self.convo_id)
|
||||
}
|
||||
|
||||
/// Resolve an account to a KeyPackage for *every* device it authorizes.
|
||||
///
|
||||
/// First resolves the account to its device ids through the account
|
||||
@ -178,8 +174,8 @@ impl GroupV1Convo {
|
||||
fn send_message<S: ExternalServices>(
|
||||
&mut self,
|
||||
content: &[u8],
|
||||
cx: &ServiceContext<S>,
|
||||
) -> Result<Vec<AddressedEncryptedPayload>, ChatError> {
|
||||
cx: &mut ServiceContext<S>,
|
||||
) -> Result<(), ChatError> {
|
||||
let sender_id = cx.mls_identity.id().as_str();
|
||||
let reliable = cx.causal.on_send(&self.convo_id, sender_id, content);
|
||||
let wire = reliable.encode_to_vec();
|
||||
@ -189,16 +185,38 @@ impl GroupV1Convo {
|
||||
.create_message(&cx.mls_provider, &cx.mls_identity, &wire)
|
||||
.unwrap();
|
||||
|
||||
let a = AddressedEncryptedPayload {
|
||||
let msg_bytes = mls_message_out.to_bytes().unwrap();
|
||||
self.send_payload(cx, msg_bytes)
|
||||
}
|
||||
|
||||
// Publish outboubound payloads to the DeliveryService
|
||||
fn send_payload<S: ExternalServices>(
|
||||
&mut self,
|
||||
cx: &mut ServiceContext<S>,
|
||||
msg_bytes: Vec<u8>,
|
||||
) -> Result<(), ChatError> {
|
||||
// Hash and Cache to detect inbound messages
|
||||
let msg_hash = blake2b_hex::<hash_size::MessageId>(&[&msg_bytes]);
|
||||
self.outbound_msgs.push_back(msg_hash);
|
||||
if self.outbound_msgs.len() > OUTBOUND_HASH_CACHE_SIZE {
|
||||
let _ = self.outbound_msgs.remove(0);
|
||||
}
|
||||
|
||||
// Wrap in Payload frames
|
||||
let aep = AddressedEncryptedPayload {
|
||||
delivery_address: self.delivery_address(),
|
||||
data: EncryptedPayload {
|
||||
encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext {
|
||||
payload: mls_message_out.to_bytes().unwrap().into(),
|
||||
payload: msg_bytes.into(),
|
||||
})),
|
||||
},
|
||||
};
|
||||
let env = aep.into_envelope(self.convo_id.clone());
|
||||
|
||||
Ok(vec![a])
|
||||
// Send via DS
|
||||
cx.ds
|
||||
.publish(env)
|
||||
.map_err(|e| ChatError::Delivery(e.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
@ -214,13 +232,7 @@ impl<S: ExternalServices> Convo<S> for GroupV1Convo {
|
||||
cx: &mut ServiceContext<S>,
|
||||
content: &[u8],
|
||||
) -> Result<(), ChatError> {
|
||||
let payloads = self.send_message(content, cx)?;
|
||||
for payload in payloads {
|
||||
cx.ds
|
||||
.publish(payload.into_envelope(self.id().into()))
|
||||
.map_err(|e| ChatError::Delivery(e.to_string()))?;
|
||||
}
|
||||
Ok(())
|
||||
self.send_message(content, cx)
|
||||
}
|
||||
|
||||
fn handle_frame(
|
||||
@ -238,7 +250,14 @@ impl<S: ExternalServices> Convo<S> for GroupV1Convo {
|
||||
}
|
||||
};
|
||||
|
||||
let mls_message =
|
||||
// Bail early if we sent this message
|
||||
let msg_hash = blake2b_hex::<hash_size::MessageId>(&[bytes.as_ref()]);
|
||||
if self.outbound_msgs.contains(&msg_hash) {
|
||||
debug!("Dropping message, sent from self");
|
||||
return Ok(ConvoOutcome::empty(self.convo_id.to_string()));
|
||||
}
|
||||
|
||||
let mls_message: MlsMessageIn =
|
||||
MlsMessageIn::tls_deserialize_exact_bytes(&bytes).map_err(ChatError::generic)?;
|
||||
|
||||
let protocol_message: ProtocolMessage = mls_message
|
||||
@ -332,22 +351,6 @@ impl<S: ExternalServices> GroupConvo<S> for GroupV1Convo {
|
||||
.invite_user(&mut cx.ds, account_id, &welcome)?;
|
||||
}
|
||||
|
||||
let encrypted_payload = EncryptedPayload {
|
||||
encryption: Some(encrypted_payload::Encryption::Plaintext(Plaintext {
|
||||
payload: commit.to_bytes()?.into(),
|
||||
})),
|
||||
};
|
||||
|
||||
let addr_enc_payload = AddressedEncryptedPayload {
|
||||
delivery_address: self.ctrl_delivery_address(),
|
||||
data: encrypted_payload,
|
||||
};
|
||||
// Prepare commit message
|
||||
// TODO: (P1) Make GroupConvos agnostic to framing so its less error prone and more
|
||||
let env = addr_enc_payload.into_envelope(self.convo_id.clone());
|
||||
|
||||
cx.ds
|
||||
.publish(env)
|
||||
.map_err(|e| ChatError::Generic(format!("Publish: {e}")))
|
||||
self.send_payload(cx, commit.to_bytes()?)
|
||||
}
|
||||
}
|
||||
|
||||
@ -29,6 +29,6 @@ pub use outcomes::{
|
||||
pub use service_context::ExternalServices;
|
||||
pub use service_traits::{DeliveryService, RegistrationService, WakeupService};
|
||||
pub use shared_traits::{IdentId, IdentIdRef, IdentityProvider};
|
||||
pub use storage::ConversationKind;
|
||||
pub use storage::{ChatStore, ConversationKind};
|
||||
pub use types::AddressedEnvelope;
|
||||
pub use utils::{hex_trunc, trunc};
|
||||
|
||||
@ -13,6 +13,7 @@ components = { workspace = true }
|
||||
crossbeam-channel = { workspace = true }
|
||||
crypto = { workspace = true }
|
||||
libchat = { workspace = true }
|
||||
storage = { workspace = true }
|
||||
logos-account = { workspace = true, features = ["dev"]}
|
||||
shared-traits = { workspace = true }
|
||||
|
||||
|
||||
@ -1,19 +1,28 @@
|
||||
use logos_chat::{ChatClient, Event, InProcessDelivery, MessageBus};
|
||||
use components::EphemeralRegistry;
|
||||
use logos_chat::{ChatClientBuilder, Event, InProcessDelivery, MessageBus};
|
||||
use std::time::Duration;
|
||||
|
||||
fn main() {
|
||||
let bus = MessageBus::default();
|
||||
let saro_delivery = InProcessDelivery::new(bus.clone());
|
||||
let raya_delivery = InProcessDelivery::new(bus);
|
||||
let reg = EphemeralRegistry::new();
|
||||
|
||||
let (mut saro, saro_events) = ChatClient::new("saro", saro_delivery);
|
||||
let (mut raya, raya_events) = ChatClient::new("raya", raya_delivery);
|
||||
let (mut saro, saro_events) = ChatClientBuilder::new()
|
||||
.transport(InProcessDelivery::new(bus.clone()))
|
||||
.registration(reg.clone())
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let (mut raya, raya_events) = ChatClientBuilder::new()
|
||||
.transport(InProcessDelivery::new(bus))
|
||||
.registration(reg)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let raya_bundle = raya.create_intro_bundle().unwrap();
|
||||
#[allow(deprecated)]
|
||||
saro.create_conversation(&raya_bundle, b"hello raya")
|
||||
.unwrap();
|
||||
|
||||
// Raya's worker delivers the new conversation, then its initial message.
|
||||
let raya_convo_id = match raya_events.recv_timeout(Duration::from_secs(5)).unwrap() {
|
||||
Event::ConversationStarted { convo_id, .. } => convo_id,
|
||||
other => panic!("expected ConversationStarted, got {other:?}"),
|
||||
|
||||
215
crates/client/src/builder.rs
Normal file
215
crates/client/src/builder.rs
Normal file
@ -0,0 +1,215 @@
|
||||
use components::EphemeralRegistry;
|
||||
use crossbeam_channel::Receiver;
|
||||
use libchat::{ChatError, ChatStorage, IdentityProvider, RegistrationService, StorageConfig};
|
||||
use storage::ChatStore;
|
||||
|
||||
use crate::Transport;
|
||||
use crate::client::ChatClient;
|
||||
use crate::delegate::DelegateSigner;
|
||||
use crate::errors::ClientError;
|
||||
use crate::event::Event;
|
||||
|
||||
/// Marker for a builder field that has not been configured; the corresponding
|
||||
/// component will be filled in with a sensible default when `build()` is called.
|
||||
pub struct Unset;
|
||||
|
||||
pub struct ChatClientBuilder<I = Unset, T = Unset, R = Unset, S = Unset> {
|
||||
ident: I,
|
||||
transport: T,
|
||||
registration: R,
|
||||
storage: S,
|
||||
}
|
||||
|
||||
impl Default for ChatClientBuilder {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
ident: Unset,
|
||||
transport: Unset,
|
||||
registration: Unset,
|
||||
storage: Unset,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ChatClientBuilder {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
}
|
||||
|
||||
impl<I, T, R, S> ChatClientBuilder<I, T, R, S> {
|
||||
pub fn ident<NI>(self, ident: NI) -> ChatClientBuilder<NI, T, R, S> {
|
||||
ChatClientBuilder {
|
||||
ident,
|
||||
transport: self.transport,
|
||||
registration: self.registration,
|
||||
storage: self.storage,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn transport<NT>(self, transport: NT) -> ChatClientBuilder<I, NT, R, S> {
|
||||
ChatClientBuilder {
|
||||
ident: self.ident,
|
||||
transport,
|
||||
registration: self.registration,
|
||||
storage: self.storage,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn registration<NR>(self, registration: NR) -> ChatClientBuilder<I, T, NR, S> {
|
||||
ChatClientBuilder {
|
||||
ident: self.ident,
|
||||
transport: self.transport,
|
||||
registration,
|
||||
storage: self.storage,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn storage<NS>(self, storage: NS) -> ChatClientBuilder<I, T, R, NS> {
|
||||
ChatClientBuilder {
|
||||
ident: self.ident,
|
||||
transport: self.transport,
|
||||
registration: self.registration,
|
||||
storage,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn storage_config(self, config: StorageConfig) -> ChatClientBuilder<I, T, R, ChatStorage> {
|
||||
let storage = ChatStorage::new(config)
|
||||
.map_err(ChatError::from)
|
||||
.expect("Storage config file should be valid");
|
||||
|
||||
ChatClientBuilder {
|
||||
ident: self.ident,
|
||||
transport: self.transport,
|
||||
registration: self.registration,
|
||||
storage,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Built<I, T, R, S> = Result<(ChatClient<I, T, R, S>, Receiver<Event>), ClientError>;
|
||||
|
||||
// All four explicitly provided.
|
||||
impl<I, T, R, S> ChatClientBuilder<I, T, R, S>
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
T: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
pub fn build(self) -> Built<I, T, R, S> {
|
||||
ChatClient::new(self.ident, self.transport, self.registration, self.storage)
|
||||
}
|
||||
}
|
||||
|
||||
// Transport only; I, R, S all default.
|
||||
impl<T: Transport + Send + 'static> ChatClientBuilder<Unset, T, Unset, Unset> {
|
||||
pub fn build(self) -> Built<DelegateSigner, T, EphemeralRegistry, ChatStorage> {
|
||||
ChatClient::new(
|
||||
DelegateSigner::random(),
|
||||
self.transport,
|
||||
EphemeralRegistry::new(),
|
||||
ChatStorage::in_memory(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// I and T; R and S default.
|
||||
impl<I, T> ChatClientBuilder<I, T, Unset, Unset>
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
T: Transport + Send + 'static,
|
||||
{
|
||||
pub fn build(self) -> Built<I, T, EphemeralRegistry, ChatStorage> {
|
||||
ChatClient::new(
|
||||
self.ident,
|
||||
self.transport,
|
||||
EphemeralRegistry::new(),
|
||||
ChatStorage::in_memory(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// T and R; I and S default.
|
||||
impl<T, R> ChatClientBuilder<Unset, T, R, Unset>
|
||||
where
|
||||
T: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
{
|
||||
pub fn build(self) -> Built<DelegateSigner, T, R, ChatStorage> {
|
||||
ChatClient::new(
|
||||
DelegateSigner::random(),
|
||||
self.transport,
|
||||
self.registration,
|
||||
ChatStorage::in_memory(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// T and S; I and R default.
|
||||
impl<T, S> ChatClientBuilder<Unset, T, Unset, S>
|
||||
where
|
||||
T: Transport + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
pub fn build(self) -> Built<DelegateSigner, T, EphemeralRegistry, S> {
|
||||
ChatClient::new(
|
||||
DelegateSigner::random(),
|
||||
self.transport,
|
||||
EphemeralRegistry::new(),
|
||||
self.storage,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// I, T, and R; S defaults.
|
||||
impl<I, T, R> ChatClientBuilder<I, T, R, Unset>
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
T: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
{
|
||||
pub fn build(self) -> Built<I, T, R, ChatStorage> {
|
||||
ChatClient::new(
|
||||
self.ident,
|
||||
self.transport,
|
||||
self.registration,
|
||||
ChatStorage::in_memory(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// T, R, and S; I defaults.
|
||||
impl<T, R, S> ChatClientBuilder<Unset, T, R, S>
|
||||
where
|
||||
T: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
pub fn build(self) -> Built<DelegateSigner, T, R, S> {
|
||||
ChatClient::new(
|
||||
DelegateSigner::random(),
|
||||
self.transport,
|
||||
self.registration,
|
||||
self.storage,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// I, T, and S; R defaults.
|
||||
impl<I, T, S> ChatClientBuilder<I, T, Unset, S>
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
T: Transport + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
pub fn build(self) -> Built<I, T, EphemeralRegistry, S> {
|
||||
ChatClient::new(
|
||||
self.ident,
|
||||
self.transport,
|
||||
EphemeralRegistry::new(),
|
||||
self.storage,
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -1,21 +1,21 @@
|
||||
use std::sync::Arc;
|
||||
use std::thread::{self, JoinHandle};
|
||||
|
||||
use components::{EphemeralRegistry, ThreadedWakeupService, WakeupEvent};
|
||||
use components::{ThreadedWakeupService, WakeupEvent};
|
||||
use crossbeam_channel::{Receiver, Sender, select};
|
||||
use crypto::Ed25519VerifyingKey;
|
||||
use libchat::{
|
||||
AccountDirectory, ChatError, ChatStorage, ConversationId, ConvoOutcome, Core, DeliveryService,
|
||||
IdentId, IdentIdRef, InboxOutcome, Introduction, PayloadOutcome, RegistrationService,
|
||||
StorageConfig,
|
||||
AccountDirectory, ConversationId, ConvoOutcome, Core, DeliveryService, IdentId, IdentIdRef,
|
||||
IdentityProvider, InboxOutcome, Introduction, PayloadOutcome, RegistrationService,
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
use storage::ChatStore;
|
||||
|
||||
use crate::delegate::{DelegateCredential, DelegateSigner};
|
||||
use crate::delegate::DelegateCredential;
|
||||
use crate::errors::ClientError;
|
||||
use crate::event::Event;
|
||||
|
||||
type ClientCore<T, R> = Core<(DelegateSigner, T, R, ThreadedWakeupService, ChatStorage)>;
|
||||
type ClientCore<I, T, R, S> = Core<(I, T, R, ThreadedWakeupService, S)>;
|
||||
type AccountAddressRef<'a> = &'a str;
|
||||
type LocalSignerId = IdentId;
|
||||
|
||||
@ -40,120 +40,50 @@ pub trait Transport: DeliveryService + Send + 'static {
|
||||
/// caller's thread: they briefly lock the core, invoke it, and return — no
|
||||
/// message-passing round-trip. The `Arc`/`Mutex`/threads live entirely here;
|
||||
/// the core never mentions threads.
|
||||
pub struct ChatClient<T: DeliveryService, R: RegistrationService = EphemeralRegistry> {
|
||||
pub struct ChatClient<I, T, R, S>
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
T: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
/// `parking_lot::Mutex` for its eventual fairness: an inbound burst can't
|
||||
/// starve caller operations of the lock.
|
||||
core: Arc<Mutex<ClientCore<T, R>>>,
|
||||
core: Arc<Mutex<ClientCore<I, T, R, S>>>,
|
||||
/// Dropped on `Drop` to wake the worker's `select!` and shut it down.
|
||||
shutdown: Option<Sender<()>>,
|
||||
worker: Option<JoinHandle<()>>,
|
||||
address: String,
|
||||
}
|
||||
|
||||
// ── Default-registry constructors ────────────────────────────────────────────
|
||||
|
||||
impl<T: Transport> ChatClient<T, EphemeralRegistry> {
|
||||
/// Create an in-memory, ephemeral client. Identity is lost on drop.
|
||||
pub fn new(_: impl Into<String>, mut transport: T) -> (Self, Receiver<Event>) {
|
||||
let inbound = transport.inbound();
|
||||
let delegate = DelegateSigner::random();
|
||||
|
||||
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
|
||||
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
|
||||
let core = Core::new_with_name(
|
||||
delegate,
|
||||
transport,
|
||||
EphemeralRegistry::new(),
|
||||
wakeup_service,
|
||||
ChatStorage::in_memory(),
|
||||
)
|
||||
.unwrap();
|
||||
Self::spawn(core, inbound, wakeup_rx)
|
||||
}
|
||||
|
||||
/// Open or create a persistent client backed by `StorageConfig`.
|
||||
///
|
||||
/// If an identity already exists in storage it is loaded; otherwise a new
|
||||
/// one is created and saved.
|
||||
pub fn open(
|
||||
_: impl Into<String>,
|
||||
config: StorageConfig,
|
||||
mut transport: T,
|
||||
) -> Result<(Self, Receiver<Event>), ClientError> {
|
||||
let store = ChatStorage::new(config).map_err(ChatError::from)?;
|
||||
let inbound = transport.inbound();
|
||||
let delegate = DelegateSigner::random();
|
||||
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
|
||||
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
|
||||
let core = Core::new_from_store(
|
||||
delegate,
|
||||
transport,
|
||||
EphemeralRegistry::new(),
|
||||
wakeup_service,
|
||||
store,
|
||||
)?;
|
||||
Ok(Self::spawn(core, inbound, wakeup_rx))
|
||||
}
|
||||
}
|
||||
|
||||
// ── Caller-supplied registry + shared methods ────────────────────────────────
|
||||
|
||||
impl<T, R> ChatClient<T, R>
|
||||
// -- GenericChatClient
|
||||
impl<I, T, R, S> ChatClient<I, T, R, S>
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
T: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
/// Open or create a persistent client with a caller-supplied registration
|
||||
/// service. Use this to swap in a network-backed registry (e.g. the
|
||||
/// testnet KeyPackage Registry) in place of the default in-memory store.
|
||||
///
|
||||
/// Submits this account's KeyPackage to the registry as the last step of
|
||||
/// construction. The default in-memory `open` path skips this call, but
|
||||
/// when a real registry is wired in we want each session to publish so
|
||||
/// other clients can fetch it.
|
||||
pub fn open_with_registry(
|
||||
_: impl Into<String>,
|
||||
config: StorageConfig,
|
||||
mut transport: T,
|
||||
registry: R,
|
||||
) -> Result<(Self, Receiver<Event>), ClientError>
|
||||
where
|
||||
T: Transport,
|
||||
{
|
||||
let store = ChatStorage::new(config).map_err(ChatError::from)?;
|
||||
let inbound = transport.inbound();
|
||||
let delegate = DelegateSigner::random();
|
||||
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
|
||||
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
|
||||
let mut core = Core::new_from_store(delegate, transport, registry, wakeup_service, store)?;
|
||||
core.register_keypackage()?;
|
||||
Ok(Self::spawn(core, inbound, wakeup_rx))
|
||||
}
|
||||
|
||||
/// Create a client with ephemeral storage with the provided Transport and RegistrationService.
|
||||
pub fn new_ephemeral(
|
||||
delegate: DelegateSigner,
|
||||
pub fn new(
|
||||
ident: I,
|
||||
mut transport: T,
|
||||
reg: R,
|
||||
storage: S,
|
||||
) -> Result<(Self, Receiver<Event>), ClientError> {
|
||||
let inbound = transport.inbound();
|
||||
|
||||
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
|
||||
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
|
||||
let core = Core::new_with_name(
|
||||
delegate,
|
||||
transport,
|
||||
reg,
|
||||
wakeup_service,
|
||||
ChatStorage::in_memory(),
|
||||
)?;
|
||||
let core = Core::new_with_name(ident, transport, reg, wakeup_service, storage)?;
|
||||
Ok(Self::spawn(core, inbound, wakeup_rx))
|
||||
}
|
||||
|
||||
fn spawn(
|
||||
core: ClientCore<T, R>,
|
||||
core: ClientCore<I, T, R, S>,
|
||||
inbound: Receiver<Vec<u8>>,
|
||||
wakeup_events: Receiver<WakeupEvent>,
|
||||
) -> (Self, Receiver<Event>) {
|
||||
let address = core.ident_id().to_string();
|
||||
let core = Arc::new(Mutex::new(core));
|
||||
let (event_tx, event_rx) = crossbeam_channel::unbounded();
|
||||
let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded::<()>(0);
|
||||
@ -168,11 +98,16 @@ where
|
||||
core,
|
||||
shutdown: Some(shutdown_tx),
|
||||
worker: Some(worker),
|
||||
address,
|
||||
},
|
||||
event_rx,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn addr(&self) -> AccountAddressRef<'_> {
|
||||
&self.address
|
||||
}
|
||||
|
||||
/// Returns the installation name (identity label) of this client.
|
||||
pub fn installation_name(&self) -> String {
|
||||
self.core.lock().installation_name().to_string()
|
||||
@ -237,7 +172,13 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: DeliveryService, R: RegistrationService> Drop for ChatClient<T, R> {
|
||||
impl<I, T, R, S> Drop for ChatClient<I, T, R, S>
|
||||
where
|
||||
I: IdentityProvider + Send + 'static,
|
||||
T: Transport + Send + 'static,
|
||||
R: RegistrationService + Send + 'static,
|
||||
S: ChatStore + Send + 'static,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
// Dropping the sender disconnects the worker's shutdown channel, waking
|
||||
// its `select!` so it can exit; then we join it.
|
||||
@ -251,8 +192,8 @@ impl<T: DeliveryService, R: RegistrationService> Drop for ChatClient<T, R> {
|
||||
/// Background loop: block until an inbound payload or shutdown arrives, drive
|
||||
/// the core on each payload, and forward events. No polling — `select!` parks
|
||||
/// the thread until one of the channels is ready.
|
||||
fn worker_loop<T, R>(
|
||||
core: Arc<Mutex<ClientCore<T, R>>>,
|
||||
fn worker_loop<I: IdentityProvider + 'static, T, R, S: ChatStore + 'static>(
|
||||
core: Arc<Mutex<ClientCore<I, T, R, S>>>,
|
||||
inbound: Receiver<Vec<u8>>,
|
||||
wakeup_events: Receiver<WakeupEvent>,
|
||||
shutdown: Receiver<()>,
|
||||
|
||||
@ -1,9 +1,11 @@
|
||||
mod builder;
|
||||
mod client;
|
||||
mod delegate;
|
||||
mod delivery_in_process;
|
||||
mod errors;
|
||||
mod event;
|
||||
|
||||
pub use builder::{ChatClientBuilder, Unset};
|
||||
pub use client::{ChatClient, Transport};
|
||||
pub use delegate::DelegateSigner;
|
||||
pub use delivery_in_process::{InProcessDelivery, MessageBus};
|
||||
@ -12,8 +14,8 @@ pub use event::Event;
|
||||
|
||||
// Re-export types callers need to interact with ChatClient.
|
||||
pub use libchat::{
|
||||
AddressedEnvelope, ConversationClass, ConversationId, DeliveryService, RegistrationService,
|
||||
StorageConfig,
|
||||
AddressedEnvelope, ChatStore, ConversationClass, ConversationId, DeliveryService,
|
||||
IdentityProvider, RegistrationService, StorageConfig,
|
||||
};
|
||||
|
||||
// Re-export bundled registry implementations so callers can pick one without
|
||||
|
||||
@ -6,8 +6,8 @@ use crypto::Ed25519VerifyingKey;
|
||||
use libchat::{AccountDirectory, IdentityProvider, SignedDeviceBundle, encode_bundle_payload};
|
||||
use logos_account::TestLogosAccount;
|
||||
use logos_chat::{
|
||||
AddressedEnvelope, ChatClient, DelegateSigner, DeliveryService, Event, InProcessDelivery,
|
||||
MessageBus, StorageConfig, Transport,
|
||||
AddressedEnvelope, ChatClient, ChatClientBuilder, DelegateSigner, DeliveryService, Event,
|
||||
InProcessDelivery, MessageBus, Transport,
|
||||
};
|
||||
|
||||
/// Publish a signed device bundle endorsing `device` as a device of `account`,
|
||||
@ -27,6 +27,24 @@ fn publish_device_bundle(
|
||||
reg.publish(&bundle).unwrap();
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn create_test_client(
|
||||
message_bus: MessageBus,
|
||||
reg: EphemeralRegistry,
|
||||
) -> Result<
|
||||
(
|
||||
ChatClient<DelegateSigner, InProcessDelivery, EphemeralRegistry, libchat::ChatStorage>,
|
||||
Receiver<Event>,
|
||||
),
|
||||
logos_chat::ClientError,
|
||||
> {
|
||||
let d = InProcessDelivery::new(message_bus);
|
||||
ChatClientBuilder::new()
|
||||
.transport(d)
|
||||
.registration(reg)
|
||||
.build()
|
||||
}
|
||||
|
||||
/// Block until the next event arrives and matches; panic on timeout/mismatch.
|
||||
fn expect_event<F, T>(events: &Receiver<Event>, label: &str, mut f: F) -> T
|
||||
where
|
||||
@ -41,8 +59,35 @@ where
|
||||
#[test]
|
||||
fn direct_v1_integration() {
|
||||
let bus = MessageBus::default();
|
||||
let saro_delivery = InProcessDelivery::new(bus.clone());
|
||||
let raya_delivery = InProcessDelivery::new(bus);
|
||||
let reg_service = EphemeralRegistry::new();
|
||||
|
||||
let (mut saro, _saro_events) =
|
||||
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
|
||||
let (raya, raya_events) =
|
||||
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
|
||||
|
||||
let convo_id = saro.create_direct_conversation(raya.addr()).unwrap();
|
||||
|
||||
// The invite payload yields ConversationStarted then MessageReceived.
|
||||
expect_event(&raya_events, "ConversationStarted", |e| match e {
|
||||
Event::ConversationStarted { convo_id, .. } => Ok(convo_id),
|
||||
other => Err(other),
|
||||
});
|
||||
|
||||
saro.send_message(&convo_id, b"Hey from saro")
|
||||
.expect("payload mismatch");
|
||||
expect_event(&raya_events, "MessageReceived", |e| match e {
|
||||
Event::MessageReceived { content, .. } => {
|
||||
assert_eq!(content.as_slice(), b"Hey from saro");
|
||||
Ok(())
|
||||
}
|
||||
other => Err(other),
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn direct_v1_standalone_integration() {
|
||||
let bus = MessageBus::default();
|
||||
|
||||
let mut reg_service = EphemeralRegistry::new();
|
||||
|
||||
@ -58,16 +103,14 @@ fn direct_v1_integration() {
|
||||
let mut raya_delegate = DelegateSigner::random();
|
||||
raya_delegate.associate(hex::encode(raya_account.public_key().as_ref()));
|
||||
publish_device_bundle(&mut reg_service, &raya_account, raya_delegate.public_key());
|
||||
let raya_delegate_id = raya_delegate.id().clone();
|
||||
|
||||
let (mut saro, _saro_events) =
|
||||
ChatClient::new_ephemeral(saro_delegate, saro_delivery, reg_service.clone()).unwrap();
|
||||
let (_raya, raya_events) =
|
||||
ChatClient::new_ephemeral(raya_delegate, raya_delivery, reg_service.clone()).unwrap();
|
||||
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
|
||||
let (raya, raya_events) =
|
||||
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
|
||||
|
||||
let convo_id = saro
|
||||
.create_direct_conversation(raya_delegate_id.as_str())
|
||||
.unwrap();
|
||||
let raya_addr = raya.addr();
|
||||
let convo_id = saro.create_direct_conversation(raya_addr).unwrap();
|
||||
|
||||
// The invite payload yields ConversationStarted then MessageReceived.
|
||||
expect_event(&raya_events, "ConversationStarted", |e| match e {
|
||||
@ -89,22 +132,26 @@ fn direct_v1_integration() {
|
||||
#[test]
|
||||
fn saro_raya_message_exchange() {
|
||||
let bus = MessageBus::default();
|
||||
let saro_delivery = InProcessDelivery::new(bus.clone());
|
||||
let raya_delivery = InProcessDelivery::new(bus);
|
||||
let reg_service = EphemeralRegistry::new();
|
||||
|
||||
let (mut saro, saro_events) = ChatClient::new("saro", saro_delivery);
|
||||
let (mut raya, raya_events) = ChatClient::new("raya", raya_delivery);
|
||||
let (mut saro, saro_events) =
|
||||
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
|
||||
let (mut raya, raya_events) =
|
||||
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
|
||||
|
||||
let raya_bundle = raya.create_intro_bundle().unwrap();
|
||||
let saro_convo_id = saro
|
||||
.create_conversation(&raya_bundle, b"hello raya")
|
||||
.unwrap();
|
||||
.create_direct_conversation(raya.addr())
|
||||
.expect("convo create");
|
||||
|
||||
// The invite payload yields ConversationStarted then MessageReceived.
|
||||
// Wait for raya to process the Welcome and subscribe to the convo delivery
|
||||
// address before saro sends — MessageBus only fans out to current subscribers,
|
||||
// so a message sent before raya subscribes would be silently dropped.
|
||||
let raya_convo_id = expect_event(&raya_events, "ConversationStarted", |e| match e {
|
||||
Event::ConversationStarted { convo_id, .. } => Ok(convo_id),
|
||||
other => Err(other),
|
||||
});
|
||||
|
||||
saro.send_message(&saro_convo_id, b"hello raya").unwrap();
|
||||
expect_event(&raya_events, "MessageReceived", |e| match e {
|
||||
Event::MessageReceived { convo_id, content } => {
|
||||
assert_eq!(convo_id, raya_convo_id);
|
||||
@ -201,8 +248,9 @@ impl Transport for FailingDelivery {
|
||||
|
||||
#[test]
|
||||
fn dropping_client_shuts_down_worker() {
|
||||
let delivery = InProcessDelivery::new(MessageBus::default());
|
||||
let (client, events) = ChatClient::new("saro", delivery);
|
||||
let (client, events) =
|
||||
create_test_client(MessageBus::default(), EphemeralRegistry::new()).expect("client create");
|
||||
|
||||
drop(client);
|
||||
// Drop joins the worker; once joined its Sender<Event> is gone, so recv
|
||||
// reports the channel as disconnected.
|
||||
@ -213,32 +261,17 @@ fn dropping_client_shuts_down_worker() {
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn publish_failure_surfaces_as_error() {
|
||||
// A real raya just to mint a valid intro bundle.
|
||||
let raya_delivery = InProcessDelivery::new(MessageBus::default());
|
||||
let (mut raya, _raya_events) = ChatClient::new("raya", raya_delivery);
|
||||
let bundle = raya.create_intro_bundle().unwrap();
|
||||
|
||||
// FailingDelivery never receives; keep the inbound sender alive so the
|
||||
// worker doesn't exit early on a disconnected channel.
|
||||
let delivery = FailingDelivery::new();
|
||||
let _keep_inbound = delivery.inbound_sender();
|
||||
let (mut saro, _saro_events) = ChatClient::new("saro", delivery);
|
||||
let result = saro.create_conversation(&bundle, b"hello");
|
||||
assert!(
|
||||
result.is_err(),
|
||||
"publish failure should surface as an error on the synchronous call"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn malformed_inbound_surfaces_as_error_event() {
|
||||
// Feed the worker's inbound channel bytes that can't be decoded and assert
|
||||
// it emits an InboundError instead of silently dropping the failure.
|
||||
let delivery = FailingDelivery::new();
|
||||
let inbound_tx = delivery.inbound_sender();
|
||||
let (_saro, events) = ChatClient::new("saro", delivery);
|
||||
|
||||
let (_client, events) = ChatClientBuilder::new()
|
||||
.transport(delivery)
|
||||
.build()
|
||||
.expect("client create");
|
||||
|
||||
inbound_tx.send(b"not a valid payload".to_vec()).unwrap();
|
||||
|
||||
@ -250,24 +283,3 @@ fn malformed_inbound_surfaces_as_error_event() {
|
||||
other => Err(other),
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn open_persistent_client() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let db_path = dir.path().join("test.db").to_string_lossy().to_string();
|
||||
let config = StorageConfig::File(db_path);
|
||||
|
||||
let delivery1 = InProcessDelivery::new(MessageBus::default());
|
||||
let (client1, _events1) = ChatClient::open("saro", config.clone(), delivery1).unwrap();
|
||||
let name1 = client1.installation_name();
|
||||
drop(client1);
|
||||
|
||||
let delivery2 = InProcessDelivery::new(MessageBus::default());
|
||||
let (client2, _events2) = ChatClient::open("saro", config, delivery2).unwrap();
|
||||
let name2 = client2.installation_name();
|
||||
|
||||
assert_eq!(
|
||||
name1, name2,
|
||||
"installation name should persist across restarts"
|
||||
);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user