Simplify client

This commit is contained in:
Jazz Turner-Baggs 2026-06-22 00:41:13 -07:00
parent e1921b944d
commit e055b16761
No known key found for this signature in database
7 changed files with 283 additions and 155 deletions

1
Cargo.lock generated
View File

@ -3605,6 +3605,7 @@ dependencies = [
"logos-account",
"parking_lot",
"shared-traits",
"storage",
"tempfile",
"thiserror",
"tracing",

View File

@ -13,6 +13,7 @@ components = { workspace = true }
crossbeam-channel = { workspace = true }
crypto = { workspace = true }
libchat = { workspace = true }
storage = { workspace = true }
logos-account = { workspace = true, features = ["dev"]}
shared-traits = { workspace = true }

View File

@ -1,19 +1,28 @@
use logos_chat::{ChatClient, Event, InProcessDelivery, MessageBus};
use components::EphemeralRegistry;
use logos_chat::{ChatClientBuilder, Event, InProcessDelivery, MessageBus};
use std::time::Duration;
fn main() {
let bus = MessageBus::default();
let saro_delivery = InProcessDelivery::new(bus.clone());
let raya_delivery = InProcessDelivery::new(bus);
let reg = EphemeralRegistry::new();
let (mut saro, saro_events) = ChatClient::new("saro", saro_delivery);
let (mut raya, raya_events) = ChatClient::new("raya", raya_delivery);
let (mut saro, saro_events) = ChatClientBuilder::new()
.transport(InProcessDelivery::new(bus.clone()))
.registration(reg.clone())
.build()
.unwrap();
let (mut raya, raya_events) = ChatClientBuilder::new()
.transport(InProcessDelivery::new(bus))
.registration(reg)
.build()
.unwrap();
let raya_bundle = raya.create_intro_bundle().unwrap();
#[allow(deprecated)]
saro.create_conversation(&raya_bundle, b"hello raya")
.unwrap();
// Raya's worker delivers the new conversation, then its initial message.
let raya_convo_id = match raya_events.recv_timeout(Duration::from_secs(5)).unwrap() {
Event::ConversationStarted { convo_id, .. } => convo_id,
other => panic!("expected ConversationStarted, got {other:?}"),
@ -21,10 +30,7 @@ fn main() {
if let Event::MessageReceived { content, .. } =
raya_events.recv_timeout(Duration::from_secs(5)).unwrap()
{
println!(
"Raya received: {:?}",
std::str::from_utf8(&content).unwrap()
);
println!("Raya received: {:?}", std::str::from_utf8(&content).unwrap());
}
raya.send_message(&raya_convo_id, b"hi saro").unwrap();
@ -32,10 +38,7 @@ fn main() {
if let Event::MessageReceived { content, .. } =
saro_events.recv_timeout(Duration::from_secs(5)).unwrap()
{
println!(
"Saro received: {:?}",
std::str::from_utf8(&content).unwrap()
);
println!("Saro received: {:?}", std::str::from_utf8(&content).unwrap());
}
println!("Message exchange complete.");

View File

@ -0,0 +1,147 @@
use crossbeam_channel::Receiver;
use components::EphemeralRegistry;
use libchat::{ChatStorage, IdentityProvider, RegistrationService};
use storage::ChatStore;
use crate::client::ChatClient;
use crate::delegate::DelegateSigner;
use crate::errors::ClientError;
use crate::event::Event;
use crate::Transport;
/// Marker for a builder field that has not been configured; the corresponding
/// component will be filled in with a sensible default when `build()` is called.
pub struct Unset;
pub struct ChatClientBuilder<I = Unset, T = Unset, R = Unset, S = Unset> {
ident: I,
transport: T,
registration: R,
storage: S,
}
impl Default for ChatClientBuilder {
fn default() -> Self {
Self {
ident: Unset,
transport: Unset,
registration: Unset,
storage: Unset,
}
}
}
impl ChatClientBuilder {
pub fn new() -> Self {
Self::default()
}
}
impl<I, T, R, S> ChatClientBuilder<I, T, R, S> {
pub fn ident<NI>(self, ident: NI) -> ChatClientBuilder<NI, T, R, S> {
ChatClientBuilder { ident, transport: self.transport, registration: self.registration, storage: self.storage }
}
pub fn transport<NT>(self, transport: NT) -> ChatClientBuilder<I, NT, R, S> {
ChatClientBuilder { ident: self.ident, transport, registration: self.registration, storage: self.storage }
}
pub fn registration<NR>(self, registration: NR) -> ChatClientBuilder<I, T, NR, S> {
ChatClientBuilder { ident: self.ident, transport: self.transport, registration, storage: self.storage }
}
pub fn storage<NS>(self, storage: NS) -> ChatClientBuilder<I, T, R, NS> {
ChatClientBuilder { ident: self.ident, transport: self.transport, registration: self.registration, storage }
}
}
type Built<I, T, R, S> = Result<(ChatClient<I, T, R, S>, Receiver<Event>), ClientError>;
// All four explicitly provided.
impl<I, T, R, S> ChatClientBuilder<I, T, R, S>
where
I: IdentityProvider + Send + 'static,
T: Transport + Send + 'static,
R: RegistrationService + Send + 'static,
S: ChatStore + Send + 'static,
{
pub fn build(self) -> Built<I, T, R, S> {
ChatClient::new(self.ident, self.transport, self.registration, self.storage)
}
}
// Transport only; I, R, S all default.
impl<T: Transport + Send + 'static> ChatClientBuilder<Unset, T, Unset, Unset> {
pub fn build(self) -> Built<DelegateSigner, T, EphemeralRegistry, ChatStorage> {
ChatClient::new(DelegateSigner::random(), self.transport, EphemeralRegistry::new(), ChatStorage::in_memory())
}
}
// I and T; R and S default.
impl<I, T> ChatClientBuilder<I, T, Unset, Unset>
where
I: IdentityProvider + Send + 'static,
T: Transport + Send + 'static,
{
pub fn build(self) -> Built<I, T, EphemeralRegistry, ChatStorage> {
ChatClient::new(self.ident, self.transport, EphemeralRegistry::new(), ChatStorage::in_memory())
}
}
// T and R; I and S default.
impl<T, R> ChatClientBuilder<Unset, T, R, Unset>
where
T: Transport + Send + 'static,
R: RegistrationService + Send + 'static,
{
pub fn build(self) -> Built<DelegateSigner, T, R, ChatStorage> {
ChatClient::new(DelegateSigner::random(), self.transport, self.registration, ChatStorage::in_memory())
}
}
// T and S; I and R default.
impl<T, S> ChatClientBuilder<Unset, T, Unset, S>
where
T: Transport + Send + 'static,
S: ChatStore + Send + 'static,
{
pub fn build(self) -> Built<DelegateSigner, T, EphemeralRegistry, S> {
ChatClient::new(DelegateSigner::random(), self.transport, EphemeralRegistry::new(), self.storage)
}
}
// I, T, and R; S defaults.
impl<I, T, R> ChatClientBuilder<I, T, R, Unset>
where
I: IdentityProvider + Send + 'static,
T: Transport + Send + 'static,
R: RegistrationService + Send + 'static,
{
pub fn build(self) -> Built<I, T, R, ChatStorage> {
ChatClient::new(self.ident, self.transport, self.registration, ChatStorage::in_memory())
}
}
// T, R, and S; I defaults.
impl<T, R, S> ChatClientBuilder<Unset, T, R, S>
where
T: Transport + Send + 'static,
R: RegistrationService + Send + 'static,
S: ChatStore + Send + 'static,
{
pub fn build(self) -> Built<DelegateSigner, T, R, S> {
ChatClient::new(DelegateSigner::random(), self.transport, self.registration, self.storage)
}
}
// I, T, and S; R defaults.
impl<I, T, S> ChatClientBuilder<I, T, Unset, S>
where
I: IdentityProvider + Send + 'static,
T: Transport + Send + 'static,
S: ChatStore + Send + 'static,
{
pub fn build(self) -> Built<I, T, EphemeralRegistry, S> {
ChatClient::new(self.ident, self.transport, EphemeralRegistry::new(), self.storage)
}
}

View File

@ -1,21 +1,21 @@
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use components::{EphemeralRegistry, ThreadedWakeupService, WakeupEvent};
use components::{ThreadedWakeupService, WakeupEvent};
use crossbeam_channel::{Receiver, Sender, select};
use crypto::Ed25519VerifyingKey;
use libchat::{
AccountDirectory, ChatError, ChatStorage, ConversationId, ConvoOutcome, Core, DeliveryService,
IdentId, IdentIdRef, InboxOutcome, Introduction, PayloadOutcome, RegistrationService,
StorageConfig,
AccountDirectory, ConversationId, ConvoOutcome, Core, DeliveryService, IdentId, IdentIdRef,
IdentityProvider, InboxOutcome, Introduction, PayloadOutcome, RegistrationService,
};
use parking_lot::Mutex;
use storage::ChatStore;
use crate::delegate::{DelegateCredential, DelegateSigner};
use crate::delegate::DelegateCredential;
use crate::errors::ClientError;
use crate::event::Event;
type ClientCore<T, R> = Core<(DelegateSigner, T, R, ThreadedWakeupService, ChatStorage)>;
type ClientCore<I, T, R, S> = Core<(I, T, R, ThreadedWakeupService, S)>;
type AccountAddressRef<'a> = &'a str;
type LocalSignerId = IdentId;
@ -40,120 +40,50 @@ pub trait Transport: DeliveryService + Send + 'static {
/// caller's thread: they briefly lock the core, invoke it, and return — no
/// message-passing round-trip. The `Arc`/`Mutex`/threads live entirely here;
/// the core never mentions threads.
pub struct ChatClient<T: DeliveryService, R: RegistrationService = EphemeralRegistry> {
pub struct ChatClient<I, T, R, S>
where
I: IdentityProvider + Send + 'static,
T: Transport + Send + 'static,
R: RegistrationService + Send + 'static,
S: ChatStore + Send + 'static,
{
/// `parking_lot::Mutex` for its eventual fairness: an inbound burst can't
/// starve caller operations of the lock.
core: Arc<Mutex<ClientCore<T, R>>>,
core: Arc<Mutex<ClientCore<I, T, R, S>>>,
/// Dropped on `Drop` to wake the worker's `select!` and shut it down.
shutdown: Option<Sender<()>>,
worker: Option<JoinHandle<()>>,
address: String,
}
// ── Default-registry constructors ────────────────────────────────────────────
impl<T: Transport> ChatClient<T, EphemeralRegistry> {
/// Create an in-memory, ephemeral client. Identity is lost on drop.
pub fn new(_: impl Into<String>, mut transport: T) -> (Self, Receiver<Event>) {
let inbound = transport.inbound();
let delegate = DelegateSigner::random();
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
let core = Core::new_with_name(
delegate,
transport,
EphemeralRegistry::new(),
wakeup_service,
ChatStorage::in_memory(),
)
.unwrap();
Self::spawn(core, inbound, wakeup_rx)
}
/// Open or create a persistent client backed by `StorageConfig`.
///
/// If an identity already exists in storage it is loaded; otherwise a new
/// one is created and saved.
pub fn open(
_: impl Into<String>,
config: StorageConfig,
mut transport: T,
) -> Result<(Self, Receiver<Event>), ClientError> {
let store = ChatStorage::new(config).map_err(ChatError::from)?;
let inbound = transport.inbound();
let delegate = DelegateSigner::random();
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
let core = Core::new_from_store(
delegate,
transport,
EphemeralRegistry::new(),
wakeup_service,
store,
)?;
Ok(Self::spawn(core, inbound, wakeup_rx))
}
}
// ── Caller-supplied registry + shared methods ────────────────────────────────
impl<T, R> ChatClient<T, R>
// -- GenericChatClient
impl<I, T, R, S> ChatClient<I, T, R, S>
where
I: IdentityProvider + Send + 'static,
T: Transport + Send + 'static,
R: RegistrationService + Send + 'static,
S: ChatStore + Send + 'static,
{
/// Open or create a persistent client with a caller-supplied registration
/// service. Use this to swap in a network-backed registry (e.g. the
/// testnet KeyPackage Registry) in place of the default in-memory store.
///
/// Submits this account's KeyPackage to the registry as the last step of
/// construction. The default in-memory `open` path skips this call, but
/// when a real registry is wired in we want each session to publish so
/// other clients can fetch it.
pub fn open_with_registry(
_: impl Into<String>,
config: StorageConfig,
mut transport: T,
registry: R,
) -> Result<(Self, Receiver<Event>), ClientError>
where
T: Transport,
{
let store = ChatStorage::new(config).map_err(ChatError::from)?;
let inbound = transport.inbound();
let delegate = DelegateSigner::random();
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
let mut core = Core::new_from_store(delegate, transport, registry, wakeup_service, store)?;
core.register_keypackage()?;
Ok(Self::spawn(core, inbound, wakeup_rx))
}
/// Create a client with ephemeral storage with the provided Transport and RegistrationService.
pub fn new_ephemeral(
delegate: DelegateSigner,
pub fn new(
ident: I,
mut transport: T,
reg: R,
storage: S,
) -> Result<(Self, Receiver<Event>), ClientError> {
let inbound = transport.inbound();
let (wakeup_tx, wakeup_rx) = crossbeam_channel::unbounded();
let wakeup_service = ThreadedWakeupService::new(wakeup_tx);
let core = Core::new_with_name(
delegate,
transport,
reg,
wakeup_service,
ChatStorage::in_memory(),
)?;
let core = Core::new_with_name(ident, transport, reg, wakeup_service, storage)?;
Ok(Self::spawn(core, inbound, wakeup_rx))
}
fn spawn(
core: ClientCore<T, R>,
core: ClientCore<I, T, R, S>,
inbound: Receiver<Vec<u8>>,
wakeup_events: Receiver<WakeupEvent>,
) -> (Self, Receiver<Event>) {
let address = core.ident_id().to_string();
let core = Arc::new(Mutex::new(core));
let (event_tx, event_rx) = crossbeam_channel::unbounded();
let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded::<()>(0);
@ -168,11 +98,16 @@ where
core,
shutdown: Some(shutdown_tx),
worker: Some(worker),
address,
},
event_rx,
)
}
pub fn addr(&self) -> AccountAddressRef<'_> {
&self.address
}
/// Returns the installation name (identity label) of this client.
pub fn installation_name(&self) -> String {
self.core.lock().installation_name().to_string()
@ -201,6 +136,7 @@ where
/// envelopes are published by the core. Returns this side's conversation ID.
///
/// This function will be deprecated in the future. Use `create_direct_conversation`
#[deprecated(note = "use create_direct_conversation")]
pub fn create_conversation(
&mut self,
intro_bundle: &[u8],
@ -237,7 +173,13 @@ where
}
}
impl<T: DeliveryService, R: RegistrationService> Drop for ChatClient<T, R> {
impl<I, T, R, S> Drop for ChatClient<I, T, R, S>
where
I: IdentityProvider + Send + 'static,
T: Transport + Send + 'static,
R: RegistrationService + Send + 'static,
S: ChatStore + Send + 'static,
{
fn drop(&mut self) {
// Dropping the sender disconnects the worker's shutdown channel, waking
// its `select!` so it can exit; then we join it.
@ -251,8 +193,8 @@ impl<T: DeliveryService, R: RegistrationService> Drop for ChatClient<T, R> {
/// Background loop: block until an inbound payload or shutdown arrives, drive
/// the core on each payload, and forward events. No polling — `select!` parks
/// the thread until one of the channels is ready.
fn worker_loop<T, R>(
core: Arc<Mutex<ClientCore<T, R>>>,
fn worker_loop<I: IdentityProvider + 'static, T, R, S: ChatStore + 'static>(
core: Arc<Mutex<ClientCore<I, T, R, S>>>,
inbound: Receiver<Vec<u8>>,
wakeup_events: Receiver<WakeupEvent>,
shutdown: Receiver<()>,

View File

@ -1,9 +1,11 @@
mod builder;
mod client;
mod delegate;
mod delivery_in_process;
mod errors;
mod event;
pub use builder::{ChatClientBuilder, Unset};
pub use client::{ChatClient, Transport};
pub use delegate::DelegateSigner;
pub use delivery_in_process::{InProcessDelivery, MessageBus};

View File

@ -6,8 +6,8 @@ use crypto::Ed25519VerifyingKey;
use libchat::{AccountDirectory, IdentityProvider, SignedDeviceBundle, encode_bundle_payload};
use logos_account::TestLogosAccount;
use logos_chat::{
AddressedEnvelope, ChatClient, DelegateSigner, DeliveryService, Event, InProcessDelivery,
MessageBus, StorageConfig, Transport,
AddressedEnvelope, ChatClient, ChatClientBuilder, DelegateSigner, DeliveryService, Event,
InProcessDelivery, MessageBus, Transport,
};
/// Publish a signed device bundle endorsing `device` as a device of `account`,
@ -27,6 +27,23 @@ fn publish_device_bundle(
reg.publish(&bundle).unwrap();
}
fn create_test_client(
message_bus: MessageBus,
reg: EphemeralRegistry,
) -> Result<
(
ChatClient<DelegateSigner, InProcessDelivery, EphemeralRegistry, libchat::ChatStorage>,
Receiver<Event>,
),
logos_chat::ClientError,
> {
let d = InProcessDelivery::new(message_bus);
ChatClientBuilder::new()
.transport(d)
.registration(reg)
.build()
}
/// Block until the next event arrives and matches; panic on timeout/mismatch.
fn expect_event<F, T>(events: &Receiver<Event>, label: &str, mut f: F) -> T
where
@ -41,8 +58,35 @@ where
#[test]
fn direct_v1_integration() {
let bus = MessageBus::default();
let saro_delivery = InProcessDelivery::new(bus.clone());
let raya_delivery = InProcessDelivery::new(bus);
let reg_service = EphemeralRegistry::new();
let (mut saro, _saro_events) =
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
let (raya, raya_events) =
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
let convo_id = saro.create_direct_conversation(raya.addr()).unwrap();
// The invite payload yields ConversationStarted then MessageReceived.
expect_event(&raya_events, "ConversationStarted", |e| match e {
Event::ConversationStarted { convo_id, .. } => Ok(convo_id),
other => Err(other),
});
saro.send_message(&convo_id, b"Hey from saro")
.expect("payload mismatch");
expect_event(&raya_events, "MessageReceived", |e| match e {
Event::MessageReceived { content, .. } => {
assert_eq!(content.as_slice(), b"Hey from saro");
Ok(())
}
other => Err(other),
});
}
#[test]
fn direct_v1_standalone_integration() {
let bus = MessageBus::default();
let mut reg_service = EphemeralRegistry::new();
@ -61,13 +105,12 @@ fn direct_v1_integration() {
let raya_delegate_id = raya_delegate.id().clone();
let (mut saro, _saro_events) =
ChatClient::new_ephemeral(saro_delegate, saro_delivery, reg_service.clone()).unwrap();
let (_raya, raya_events) =
ChatClient::new_ephemeral(raya_delegate, raya_delivery, reg_service.clone()).unwrap();
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
let (raya, raya_events) =
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
let convo_id = saro
.create_direct_conversation(raya_delegate_id.as_str())
.unwrap();
let raya_addr = raya.addr();
let convo_id = saro.create_direct_conversation(raya_addr).unwrap();
// The invite payload yields ConversationStarted then MessageReceived.
expect_event(&raya_events, "ConversationStarted", |e| match e {
@ -89,11 +132,12 @@ fn direct_v1_integration() {
#[test]
fn saro_raya_message_exchange() {
let bus = MessageBus::default();
let saro_delivery = InProcessDelivery::new(bus.clone());
let raya_delivery = InProcessDelivery::new(bus);
let reg_service = EphemeralRegistry::new();
let (mut saro, saro_events) = ChatClient::new("saro", saro_delivery);
let (mut raya, raya_events) = ChatClient::new("raya", raya_delivery);
let (mut saro, saro_events) =
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
let (mut raya, raya_events) =
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
let raya_bundle = raya.create_intro_bundle().unwrap();
let saro_convo_id = saro
@ -201,8 +245,9 @@ impl Transport for FailingDelivery {
#[test]
fn dropping_client_shuts_down_worker() {
let delivery = InProcessDelivery::new(MessageBus::default());
let (client, events) = ChatClient::new("saro", delivery);
let (client, events) =
create_test_client(MessageBus::default(), EphemeralRegistry::new()).expect("client create");
drop(client);
// Drop joins the worker; once joined its Sender<Event> is gone, so recv
// reports the channel as disconnected.
@ -216,16 +261,20 @@ fn dropping_client_shuts_down_worker() {
#[test]
fn publish_failure_surfaces_as_error() {
// A real raya just to mint a valid intro bundle.
let raya_delivery = InProcessDelivery::new(MessageBus::default());
let (mut raya, _raya_events) = ChatClient::new("raya", raya_delivery);
let bundle = raya.create_intro_bundle().unwrap();
let (mut saro, _events) =
create_test_client(MessageBus::default(), EphemeralRegistry::new()).expect("client create");
// FailingDelivery never receives; keep the inbound sender alive so the
// worker doesn't exit early on a disconnected channel.
let delivery = FailingDelivery::new();
let _keep_inbound = delivery.inbound_sender();
let (mut saro, _saro_events) = ChatClient::new("saro", delivery);
let result = saro.create_conversation(&bundle, b"hello");
let (raya, _raya_events) = ChatClientBuilder::new()
.transport(delivery)
.build()
.expect("create client");
let result = saro.create_direct_conversation(raya.addr());
assert!(
result.is_err(),
"publish failure should surface as an error on the synchronous call"
@ -238,7 +287,11 @@ fn malformed_inbound_surfaces_as_error_event() {
// it emits an InboundError instead of silently dropping the failure.
let delivery = FailingDelivery::new();
let inbound_tx = delivery.inbound_sender();
let (_saro, events) = ChatClient::new("saro", delivery);
let (_client, events) = ChatClientBuilder::new()
.transport(delivery)
.build()
.expect("client create");
inbound_tx.send(b"not a valid payload".to_vec()).unwrap();
@ -250,24 +303,3 @@ fn malformed_inbound_surfaces_as_error_event() {
other => Err(other),
});
}
#[test]
fn open_persistent_client() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("test.db").to_string_lossy().to_string();
let config = StorageConfig::File(db_path);
let delivery1 = InProcessDelivery::new(MessageBus::default());
let (client1, _events1) = ChatClient::open("saro", config.clone(), delivery1).unwrap();
let name1 = client1.installation_name();
drop(client1);
let delivery2 = InProcessDelivery::new(MessageBus::default());
let (client2, _events2) = ChatClient::open("saro", config, delivery2).unwrap();
let name2 = client2.installation_name();
assert_eq!(
name1, name2,
"installation name should persist across restarts"
);
}