feat(client): add threaded transport polling (#125)

The client, not the app, now drives the transport; events are delivered
asynchronously, per ADR 0001.

- ChatClient owns Arc<Mutex<Core>> + a worker thread.
- The worker select!s over the inbound and shutdown channels; Drop joins it.
  Outbound runs on the caller's thread.
- A single Transport (DeliveryService + inbound()) owns both directions of the
  boundary, so the client takes one transport rather than a (delivery, inbound)
  pair. InProcessDelivery::new, CDelivery, and chat-cli's transports implement it.
- FFI replaces client_receive with client_push_inbound + client_poll_events.
- chat-cli drains Receiver<Event>; inbound and event channels are both crossbeam.
- Corrects ADR 0001's inbound sequence to push — the worker parks on select!,
  it never polls.
This commit is contained in:
osmaczko 2026-06-11 10:08:07 +02:00 committed by GitHub
parent a610117e81
commit 7838d43b30
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 601 additions and 338 deletions

14
Cargo.lock generated
View File

@ -356,6 +356,7 @@ dependencies = [
"arboard",
"base64",
"clap",
"crossbeam-channel",
"crossterm 0.29.0",
"logos-chat",
"ratatui",
@ -441,6 +442,7 @@ checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9"
name = "client-ffi"
version = "0.1.0"
dependencies = [
"crossbeam-channel",
"libchat",
"logos-chat",
"safer-ffi",
@ -534,6 +536,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.6"
@ -2124,10 +2135,13 @@ version = "0.1.0"
dependencies = [
"chat-sqlite",
"components",
"crossbeam-channel",
"libchat",
"logos-account",
"parking_lot",
"tempfile",
"thiserror",
"tracing",
]
[[package]]

View File

@ -44,6 +44,7 @@ storage = { path = "core/storage" }
# External Workspace dependency declarations (sorted)
blake2 = "0.10"
crossbeam-channel = "0.5"
# Panicking across FFI boundaries is UB; abort is the correct strategy for a
# C FFI library.

View File

@ -9,6 +9,7 @@ path = "src/main.rs"
[dependencies]
# Workspace dependencies (sorted)
crossbeam-channel = { workspace = true }
logos-chat = { workspace = true }
# External dependencies (sorted)
@ -16,7 +17,6 @@ anyhow = "1.0"
arboard = "3"
base64 = "0.22"
clap = { version = "4", features = ["derive"] }
crossterm = "0.29"
ratatui = "0.29"
serde = { version = "1.0", features = ["derive"] }

View File

@ -1,10 +1,10 @@
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use anyhow::Result;
use arboard::Clipboard;
use crossbeam_channel::Receiver;
use logos_chat::{ChatClient, DeliveryService, EphemeralRegistry, Event, RegistrationService};
use serde::{Deserialize, Serialize};
@ -41,9 +41,9 @@ pub struct AppState {
pub active_chat: Option<String>,
}
pub struct ChatApp<D: DeliveryService, R: RegistrationService = EphemeralRegistry> {
pub client: ChatClient<D, R>,
inbound: mpsc::Receiver<Vec<u8>>,
pub struct ChatApp<T: DeliveryService, R: RegistrationService = EphemeralRegistry> {
pub client: ChatClient<T, R>,
events: Receiver<Event>,
pub state: AppState,
/// Ephemeral command output — not persisted, cleared on chat switch.
command_output: Vec<DisplayMessage>,
@ -53,14 +53,14 @@ pub struct ChatApp<D: DeliveryService, R: RegistrationService = EphemeralRegistr
state_path: PathBuf,
}
impl<D, R> ChatApp<D, R>
impl<T, R> ChatApp<T, R>
where
D: DeliveryService + 'static,
R: RegistrationService + 'static,
T: DeliveryService + Send + 'static,
R: RegistrationService + Send + 'static,
{
pub fn new(
client: ChatClient<D, R>,
inbound: mpsc::Receiver<Vec<u8>>,
client: ChatClient<T, R>,
events: Receiver<Event>,
user_name: &str,
data_dir: &Path,
) -> Result<Self> {
@ -80,7 +80,7 @@ where
Ok(Self {
client,
inbound,
events,
state,
command_output: Vec::new(),
input: String::new(),
@ -146,19 +146,13 @@ where
}
pub fn process_incoming(&mut self) -> Result<()> {
while let Ok(payload) = self.inbound.try_recv() {
match self.client.receive(&payload) {
Ok(events) => {
for event in events {
self.handle_event(event);
}
self.save_state()?;
}
Err(e) => {
tracing::warn!("receive error: {e:?}");
self.status = format!("Could not decrypt incoming message: {e}");
}
}
let mut received = false;
while let Ok(event) = self.events.try_recv() {
self.handle_event(event);
received = true;
}
if received {
self.save_state()?;
}
Ok(())
}
@ -195,6 +189,9 @@ where
timestamp: now(),
});
}
Event::InboundError { message } => {
self.status = format!("Could not process incoming message: {message}");
}
_ => {}
}
}

View File

@ -4,11 +4,13 @@ mod ui;
mod utils;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use anyhow::{Context, Result};
use clap::{Parser, ValueEnum};
use logos_chat::{ChatClient, DeliveryService, HttpRegistry, RegistrationService, StorageConfig};
use crossbeam_channel::Receiver;
use logos_chat::{
ChatClient, DeliveryService, Event, HttpRegistry, RegistrationService, StorageConfig, Transport,
};
use app::ChatApp;
@ -72,9 +74,9 @@ fn main() -> Result<()> {
match cli.transport {
TransportKind::File => {
let transport_dir = cli.data.join("transport");
let (transport, inbound) = transport::file::FileTransport::new(&transport_dir)
let transport = transport::file::FileTransport::new(&transport_dir)
.context("failed to create file transport")?;
run(transport, inbound, &cli)
run(transport, &cli)
}
#[cfg(logos_delivery)]
TransportKind::LogosDelivery => {
@ -88,20 +90,15 @@ fn main() -> Result<()> {
tcp_port: cli.port,
..Default::default()
};
let (transport, inbound) =
Service::start(cfg).context("failed to start logos-delivery")?;
let transport = Service::start(cfg).context("failed to start logos-delivery")?;
println!("Node connected. Initializing chat client...");
run(transport, inbound, &cli)
run(transport, &cli)
}
}
}
fn run<D: DeliveryService + 'static>(
transport: D,
inbound: mpsc::Receiver<Vec<u8>>,
cli: &Cli,
) -> Result<()> {
fn run<T: Transport>(transport: T, cli: &Cli) -> Result<()> {
let db_path = cli
.db
.clone()
@ -118,31 +115,27 @@ fn run<D: DeliveryService + 'static>(
match cli.registry_url.as_deref() {
Some(url) => {
let registry = HttpRegistry::new(url);
let client =
let (client, events) =
ChatClient::open_with_registry(cli.name.clone(), storage, transport, registry)
.map_err(|e| anyhow::anyhow!("{e:?}"))
.context("failed to open chat client with HTTP registry")?;
launch_tui(client, inbound, cli)
launch_tui(client, events, cli)
}
None => {
let client = ChatClient::open(cli.name.clone(), storage, transport)
let (client, events) = ChatClient::open(cli.name.clone(), storage, transport)
.map_err(|e| anyhow::anyhow!("{e:?}"))
.context("failed to open chat client")?;
launch_tui(client, inbound, cli)
launch_tui(client, events, cli)
}
}
}
fn launch_tui<D, R>(
client: ChatClient<D, R>,
inbound: mpsc::Receiver<Vec<u8>>,
cli: &Cli,
) -> Result<()>
fn launch_tui<T, R>(client: ChatClient<T, R>, events: Receiver<Event>, cli: &Cli) -> Result<()>
where
D: DeliveryService + 'static,
R: RegistrationService + 'static,
T: DeliveryService + Send + 'static,
R: RegistrationService + Send + 'static,
{
let mut app = ChatApp::new(client, inbound, &cli.name, &cli.data)?;
let mut app = ChatApp::new(client, events, &cli.name, &cli.data)?;
if cli.smoketest {
return Ok(());
@ -168,8 +161,7 @@ fn run_logos_delivery(cli: Cli) -> Result<()> {
tcp_port: cli.port,
..Default::default()
};
let (delivery, inbound) =
Service::start(logos_cfg).context("failed to start logos-delivery")?;
let delivery = Service::start(logos_cfg).context("failed to start logos-delivery")?;
eprintln!("Node connected. Initializing chat client...");
@ -180,7 +172,7 @@ fn run_logos_delivery(cli: Cli) -> Result<()> {
.map(|p| p.to_path_buf())
.unwrap_or_else(|| cli.data.clone());
let client = match cli.db {
let (client, events) = match cli.db {
Some(ref path) => {
let db_str = path
.to_str()
@ -200,7 +192,7 @@ fn run_logos_delivery(cli: Cli) -> Result<()> {
None => logos_chat::ChatClient::new(cli.name.clone(), delivery),
};
let mut app = ChatApp::new(client, inbound, &cli.name, &data_dir)?;
let mut app = ChatApp::new(client, events, &cli.name, &data_dir)?;
if cli.smoketest {
return Ok(());
@ -219,10 +211,10 @@ fn run_logos_delivery(cli: Cli) -> Result<()> {
)
}
fn run_app<D, R>(terminal: &mut ui::Tui, app: &mut ChatApp<D, R>) -> Result<()>
fn run_app<T, R>(terminal: &mut ui::Tui, app: &mut ChatApp<T, R>) -> Result<()>
where
D: DeliveryService + 'static,
R: RegistrationService + 'static,
T: DeliveryService + Send + 'static,
R: RegistrationService + Send + 'static,
{
loop {
app.process_incoming()?;

View File

@ -2,11 +2,11 @@ use std::collections::BTreeMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use logos_chat::{AddressedEnvelope, DeliveryService};
use crossbeam_channel::{Receiver, Sender, bounded};
use logos_chat::{AddressedEnvelope, DeliveryService, Transport};
#[derive(Debug, thiserror::Error)]
pub enum FileTransportError {
@ -17,31 +17,31 @@ pub enum FileTransportError {
#[derive(Debug)]
pub struct FileTransport {
transport_dir: PathBuf,
inbound_rx: Option<Receiver<Vec<u8>>>,
}
impl FileTransport {
/// All instances pointing at the same `transport_dir` share one broadcast bus.
///
/// Messages are written to `{transport_dir}/{delivery_address}/{hours_since_epoch}.bin`
/// as length-prefixed frames (`[u32 BE length][payload bytes]`). The background
/// thread reads all files under `transport_dir` and forwards every frame to
/// the returned channel; `client.receive()` discards frames it cannot decrypt.
pub fn new(transport_dir: &Path) -> io::Result<(Self, mpsc::Receiver<Vec<u8>>)> {
/// as length-prefixed frames (`[u32 BE length][payload bytes]`). A background
/// thread reads all files under `transport_dir` and forwards every frame to the
/// inbound stream the client drains via [`Transport::inbound`] (discarding frames
/// it cannot decrypt).
pub fn new(transport_dir: &Path) -> io::Result<Self> {
fs::create_dir_all(transport_dir)?;
let (tx, rx) = mpsc::sync_channel(1024);
let (tx, rx) = bounded(1024);
let dir = transport_dir.to_path_buf();
thread::Builder::new()
.name("file-transport".into())
.spawn(move || poll_reader(dir, tx))?;
Ok((
Self {
transport_dir: transport_dir.to_path_buf(),
},
rx,
))
Ok(Self {
transport_dir: transport_dir.to_path_buf(),
inbound_rx: Some(rx),
})
}
}
@ -68,6 +68,14 @@ impl DeliveryService for FileTransport {
}
}
impl Transport for FileTransport {
fn inbound(&mut self) -> Receiver<Vec<u8>> {
self.inbound_rx
.take()
.expect("FileTransport::inbound called more than once")
}
}
/// Hours since Unix epoch — used as the rolling filename.
fn current_hour() -> u64 {
SystemTime::now()
@ -77,7 +85,7 @@ fn current_hour() -> u64 {
/ 3600
}
fn poll_reader(transport_dir: PathBuf, tx: mpsc::SyncSender<Vec<u8>>) {
fn poll_reader(transport_dir: PathBuf, tx: Sender<Vec<u8>>) {
// Maps absolute file path → number of bytes already consumed.
let mut offsets: BTreeMap<PathBuf, u64> = BTreeMap::new();

View File

@ -18,7 +18,8 @@ use std::time::Duration;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use logos_chat::{AddressedEnvelope, DeliveryService};
use crossbeam_channel::{Receiver, Sender};
use logos_chat::{AddressedEnvelope, DeliveryService, Transport};
use tracing::{error, info, warn};
use wrapper::LogosNodeCtx;
@ -46,7 +47,7 @@ struct OutboundCmd {
reply: mpsc::SyncSender<Result<(), DeliveryError>>,
}
type SubscriberList = Arc<Mutex<Vec<mpsc::SyncSender<Vec<u8>>>>>;
type SubscriberList = Arc<Mutex<Vec<Sender<Vec<u8>>>>>;
// ── Config ───────────────────────────────────────────────────────────────────
@ -123,18 +124,19 @@ pub struct Service {
outbound: mpsc::SyncSender<OutboundCmd>,
#[allow(dead_code)]
subscribers: SubscriberList,
inbound_rx: Option<Receiver<Vec<u8>>>,
}
impl Service {
/// Start the embedded logos-delivery node. Returns the service and a
/// receiver for inbound raw payloads.
pub fn start(cfg: Config) -> Result<(Self, mpsc::Receiver<Vec<u8>>), DeliveryError> {
/// Start the embedded logos-delivery node. The client drains inbound
/// payloads via [`Transport::inbound`].
pub fn start(cfg: Config) -> Result<Self, DeliveryError> {
let (out_tx, out_rx) = mpsc::sync_channel::<OutboundCmd>(256);
let subscribers: SubscriberList = Arc::new(Mutex::new(Vec::new()));
let (ready_tx, ready_rx) = mpsc::channel::<Result<(), DeliveryError>>();
// Create the inbound channel before spawning so the receiver is
// registered inside the thread, before any event callback fires.
let (inbound_tx, inbound_rx) = mpsc::sync_channel::<Vec<u8>>(1024);
let (inbound_tx, inbound_rx) = crossbeam_channel::bounded::<Vec<u8>>(1024);
let subs_for_thread = subscribers.clone();
@ -167,20 +169,18 @@ impl Service {
return Err(e);
}
Ok((
Self {
outbound: out_tx,
subscribers,
},
inbound_rx,
))
Ok(Self {
outbound: out_tx,
subscribers,
inbound_rx: Some(inbound_rx),
})
}
fn node_thread(
cfg: Config,
out_rx: mpsc::Receiver<OutboundCmd>,
subscribers: SubscriberList,
inbound_tx: mpsc::SyncSender<Vec<u8>>,
inbound_tx: Sender<Vec<u8>>,
ready_tx: mpsc::Sender<Result<(), DeliveryError>>,
) {
// discv5UdpPort defaults to 9000 in libwaku, so a second instance with
@ -220,8 +220,8 @@ impl Service {
};
guard.retain(|tx| match tx.try_send(payload.clone()) {
Ok(()) => true,
Err(mpsc::TrySendError::Full(_)) => true,
Err(mpsc::TrySendError::Disconnected(_)) => false,
Err(crossbeam_channel::TrySendError::Full(_)) => true,
Err(crossbeam_channel::TrySendError::Disconnected(_)) => false,
});
}
};
@ -306,3 +306,11 @@ impl DeliveryService for Service {
Ok(())
}
}
impl Transport for Service {
fn inbound(&mut self) -> Receiver<Vec<u8>> {
self.inbound_rx
.take()
.expect("Service::inbound called more than once")
}
}

View File

@ -38,7 +38,7 @@ pub fn restore() -> io::Result<()> {
}
/// Draw the UI.
pub fn draw<D: DeliveryService + 'static, R: RegistrationService + 'static>(
pub fn draw<D: DeliveryService + Send + 'static, R: RegistrationService + Send + 'static>(
frame: &mut Frame,
app: &ChatApp<D, R>,
) {
@ -58,7 +58,7 @@ pub fn draw<D: DeliveryService + 'static, R: RegistrationService + 'static>(
draw_status(frame, app, chunks[3]);
}
fn draw_header<D: DeliveryService + 'static, R: RegistrationService + 'static>(
fn draw_header<D: DeliveryService + Send + 'static, R: RegistrationService + Send + 'static>(
frame: &mut Frame,
app: &ChatApp<D, R>,
area: Rect,
@ -85,7 +85,7 @@ fn draw_header<D: DeliveryService + 'static, R: RegistrationService + 'static>(
frame.render_widget(header, area);
}
fn draw_messages<D: DeliveryService + 'static, R: RegistrationService + 'static>(
fn draw_messages<D: DeliveryService + Send + 'static, R: RegistrationService + Send + 'static>(
frame: &mut Frame,
app: &ChatApp<D, R>,
area: Rect,
@ -175,7 +175,7 @@ fn draw_messages<D: DeliveryService + 'static, R: RegistrationService + 'static>
frame.render_stateful_widget(messages_widget, area, &mut list_state);
}
fn draw_input<D: DeliveryService + 'static, R: RegistrationService + 'static>(
fn draw_input<D: DeliveryService + Send + 'static, R: RegistrationService + Send + 'static>(
frame: &mut Frame,
app: &ChatApp<D, R>,
area: Rect,
@ -206,7 +206,7 @@ fn draw_input<D: DeliveryService + 'static, R: RegistrationService + 'static>(
frame.set_cursor_position((cursor_x, area.y + 1));
}
fn draw_status<D: DeliveryService + 'static, R: RegistrationService + 'static>(
fn draw_status<D: DeliveryService + Send + 'static, R: RegistrationService + Send + 'static>(
frame: &mut Frame,
app: &ChatApp<D, R>,
area: Rect,
@ -220,7 +220,10 @@ fn draw_status<D: DeliveryService + 'static, R: RegistrationService + 'static>(
}
/// Handle keyboard events.
pub fn handle_events<D: DeliveryService + 'static, R: RegistrationService + 'static>(
pub fn handle_events<
D: DeliveryService + Send + 'static,
R: RegistrationService + Send + 'static,
>(
app: &mut ChatApp<D, R>,
) -> io::Result<bool> {
// Poll for events with a short timeout to allow checking incoming messages

View File

@ -12,6 +12,7 @@ required-features = ["headers"]
[dependencies]
# Workspace dependencies (sorted)
crossbeam-channel = { workspace = true }
libchat = { workspace = true }
logos-chat = { workspace = true }

View File

@ -83,9 +83,9 @@ static int32_t deliver_cb(
}
/* ------------------------------------------------------------------
* Helper: pop one envelope from the bus and push it into receiver.
* Returns a heap-allocated event list; caller frees with
* event_list_free().
* Helper: pop one envelope from the bus, hand it to receiver's worker,
* then wait for the worker to produce events. Returns a heap-allocated
* event list; caller frees with event_list_free().
* ------------------------------------------------------------------ */
static EventList_t *route(ClientHandle_t *receiver)
@ -94,8 +94,11 @@ static EventList_t *route(ClientHandle_t *receiver)
size_t len;
int ok = queue_pop(&bus, &data, &len);
assert(ok && "expected an envelope in the bus");
EventList_t *evs = client_receive(receiver, SLICE(data, len));
assert(event_list_error_code(evs) == 0 && "client_receive failed");
client_push_inbound(receiver, SLICE(data, len));
/* Block until the worker decrypts the payload and produces events. */
EventList_t *evs = client_wait_events(receiver, 5000);
assert(event_list_len(evs) > 0 && "timed out waiting for events");
return evs;
}

View File

@ -1,5 +1,7 @@
use safer_ffi::prelude::*;
use crossbeam_channel::{Receiver, Sender};
use crate::delivery::{CDelivery, DeliverFn};
use libchat::ChatError;
use logos_chat::{ChatClient, ClientError, ConversationClass, Event};
@ -10,7 +12,11 @@ use logos_chat::{ChatClient, ClientError, ConversationClass, Event};
#[derive_ReprC]
#[repr(opaque)]
pub struct ClientHandle(pub(crate) ChatClient<CDelivery>);
pub struct ClientHandle {
client: ChatClient<CDelivery>,
events: Receiver<Event>,
inbound: Sender<Vec<u8>>,
}
// ---------------------------------------------------------------------------
// Error codes
@ -155,8 +161,17 @@ fn client_create(
Err(_) => return None,
};
callback?;
let delivery = CDelivery { callback };
Some(Box::new(ClientHandle(ChatClient::new(name_str, delivery))).into())
let (inbound_tx, inbound_rx) = crossbeam_channel::unbounded();
let delivery = CDelivery::new(callback, inbound_rx);
let (client, events) = ChatClient::new(name_str, delivery);
Some(
Box::new(ClientHandle {
client,
events,
inbound: inbound_tx,
})
.into(),
)
}
/// Free a client handle. Must not be used after this call.
@ -174,7 +189,7 @@ fn client_destroy(handle: repr_c::Box<ClientHandle>) {
#[ffi_export]
fn client_installation_name(handle: &ClientHandle) -> c_slice::Box<u8> {
handle
.0
.client
.installation_name()
.as_bytes()
.to_vec()
@ -195,7 +210,7 @@ fn client_installation_name_free(name: c_slice::Box<u8>) {
/// Free with `create_intro_result_free`.
#[ffi_export]
fn client_create_intro_bundle(handle: &mut ClientHandle) -> repr_c::Box<CreateIntroResult> {
let result = match handle.0.create_intro_bundle() {
let result = match handle.client.create_intro_bundle() {
Ok(bytes) => CreateIntroResult {
error_code: ErrorCode::None as i32,
data: Some(bytes),
@ -239,7 +254,7 @@ fn client_create_conversation(
content: c_slice::Ref<'_, u8>,
) -> repr_c::Box<CreateConvoResult> {
let result = match handle
.0
.client
.create_conversation(bundle.as_slice(), content.as_slice())
{
Ok(convo_id) => CreateConvoResult {
@ -290,7 +305,7 @@ fn client_send_message(
Ok(s) => s,
Err(_) => return ErrorCode::BadUtf8,
};
match handle.0.send_message(id_str, content.as_slice()) {
match handle.client.send_message(id_str, content.as_slice()) {
Ok(()) => ErrorCode::None,
Err(ClientError::Chat(ChatError::Delivery(_))) => ErrorCode::DeliveryFail,
Err(_) => ErrorCode::UnknownError,
@ -298,31 +313,49 @@ fn client_send_message(
}
// ---------------------------------------------------------------------------
// Receive (process inbound, get event list back)
// Inbound (push wire payloads in, drain events out)
// ---------------------------------------------------------------------------
/// Decrypt an inbound payload. Returns the events the payload produced;
/// the list may be empty for protocol-only frames. Free with
/// `event_list_free`.
/// Feed an inbound payload (read off the wire by the host) to the client's
/// worker, which decrypts it and produces events for `client_poll_events`.
#[ffi_export]
fn client_receive(
handle: &mut ClientHandle,
payload: c_slice::Ref<'_, u8>,
) -> repr_c::Box<EventList> {
let result = match handle.0.receive(payload.as_slice()) {
Ok(events) => EventList {
error_code: ErrorCode::None as i32,
events: events
.into_iter()
.filter_map(EventRow::from_event)
.collect(),
},
Err(ClientError::Chat(_)) => EventList {
error_code: ErrorCode::BadPayload as i32,
events: Vec::new(),
},
};
Box::new(result).into()
fn client_push_inbound(handle: &ClientHandle, payload: c_slice::Ref<'_, u8>) {
// Disconnected only if the worker has stopped; nothing to do then.
let _ = handle.inbound.send(payload.as_slice().to_vec());
}
/// Drain every event the worker has produced since the last call. The list may
/// be empty. Free with `event_list_free`.
#[ffi_export]
fn client_poll_events(handle: &ClientHandle) -> repr_c::Box<EventList> {
let events = handle
.events
.try_iter()
.filter_map(EventRow::from_event)
.collect();
Box::new(EventList {
error_code: ErrorCode::None as i32,
events,
})
.into()
}
/// Block until the worker produces an event or `timeout_ms` elapses, then drain
/// everything available. Parks on the channel (no busy-wait); an empty list
/// means timeout or a stopped worker. Free with `event_list_free`.
#[ffi_export]
fn client_wait_events(handle: &ClientHandle, timeout_ms: u64) -> repr_c::Box<EventList> {
let timeout = std::time::Duration::from_millis(timeout_ms);
let mut events = Vec::new();
if let Ok(first) = handle.events.recv_timeout(timeout) {
events.extend(EventRow::from_event(first));
events.extend(handle.events.try_iter().filter_map(EventRow::from_event));
}
Box::new(EventList {
error_code: ErrorCode::None as i32,
events,
})
.into()
}
#[ffi_export]

View File

@ -1,5 +1,6 @@
use crossbeam_channel::Receiver;
use libchat::AddressedEnvelope;
use logos_chat::DeliveryService;
use logos_chat::{DeliveryService, Transport};
/// C callback invoked for each outbound envelope. Return 0 or positive on success, negative on
/// error. `addr_ptr/addr_len` is the delivery address; `data_ptr/data_len` is the encrypted
@ -15,9 +16,18 @@ pub type DeliverFn = Option<
>;
#[derive(Debug)]
pub struct CDelivery {
pub callback: DeliverFn,
inbound_rx: Option<Receiver<Vec<u8>>>,
}
impl CDelivery {
pub fn new(callback: DeliverFn, inbound: Receiver<Vec<u8>>) -> Self {
Self {
callback,
inbound_rx: Some(inbound),
}
}
}
impl DeliveryService for CDelivery {
@ -36,3 +46,11 @@ impl DeliveryService for CDelivery {
Ok(())
}
}
impl Transport for CDelivery {
fn inbound(&mut self) -> Receiver<Vec<u8>> {
self.inbound_rx
.take()
.expect("CDelivery::inbound called more than once")
}
}

View File

@ -9,12 +9,15 @@ crate-type = ["rlib"]
[dependencies]
# Workspace dependencies (sorted)
chat-sqlite = { workspace = true }
components = { workspace = true}
components = { workspace = true }
crossbeam-channel = { workspace = true }
libchat = { workspace = true }
logos-account = { workspace = true, features = ["dev"]}
# External dependencies (sorted)
parking_lot = "0.12"
thiserror = "2"
tracing = "0.1"
[dev-dependencies]
# External dependencies (sorted)

View File

@ -1,39 +1,41 @@
use logos_chat::{ChatClient, ConversationId, Event, InProcessDelivery};
use logos_chat::{ChatClient, Event, InProcessDelivery, MessageBus};
use std::time::Duration;
fn main() {
let delivery = InProcessDelivery::new(Default::default());
let mut cursor = delivery.cursor_at_tail("delivery_address");
let bus = MessageBus::default();
let saro_delivery = InProcessDelivery::new(bus.clone());
let raya_delivery = InProcessDelivery::new(bus);
let mut saro = ChatClient::new("saro", delivery.clone());
let mut raya = ChatClient::new("raya", delivery);
let (mut saro, saro_events) = ChatClient::new("saro", saro_delivery);
let (mut raya, raya_events) = ChatClient::new("raya", raya_delivery);
let raya_bundle = raya.create_intro_bundle().unwrap();
saro.create_conversation(&raya_bundle, b"hello raya")
.unwrap();
let raw = cursor.next().unwrap();
let events = raya.receive(&raw).unwrap();
let raya_convo_id: ConversationId = events
.iter()
.find_map(|e| match e {
Event::ConversationStarted { convo_id, .. } => Some(convo_id.to_string()),
_ => None,
})
.expect("expected ConversationStarted");
for event in &events {
if let Event::MessageReceived { content, .. } = event {
println!("Raya received: {:?}", std::str::from_utf8(content).unwrap());
}
// Raya's worker delivers the new conversation, then its initial message.
let raya_convo_id = match raya_events.recv_timeout(Duration::from_secs(5)).unwrap() {
Event::ConversationStarted { convo_id, .. } => convo_id,
other => panic!("expected ConversationStarted, got {other:?}"),
};
if let Event::MessageReceived { content, .. } =
raya_events.recv_timeout(Duration::from_secs(5)).unwrap()
{
println!(
"Raya received: {:?}",
std::str::from_utf8(&content).unwrap()
);
}
raya.send_message(&raya_convo_id, b"hi saro").unwrap();
let raw = cursor.next().unwrap();
let events = saro.receive(&raw).unwrap();
for event in &events {
if let Event::MessageReceived { content, .. } = event {
println!("Saro received: {:?}", std::str::from_utf8(content).unwrap());
}
if let Event::MessageReceived { content, .. } =
saro_events.recv_timeout(Duration::from_secs(5)).unwrap()
{
println!(
"Saro received: {:?}",
std::str::from_utf8(&content).unwrap()
);
}
println!("Message exchange complete.");

View File

@ -1,31 +1,65 @@
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use components::EphemeralRegistry;
use crossbeam_channel::{Receiver, Sender, select};
use libchat::{
ChatError, ChatStorage, ConversationId, ConvoOutcome, Core, DeliveryService, InboxOutcome,
Introduction, PayloadOutcome, RegistrationService, StorageConfig,
};
use components::EphemeralRegistry;
use logos_account::TestLogosAccount;
use parking_lot::Mutex;
use crate::errors::ClientError;
use crate::event::Event;
pub struct ChatClient<D: DeliveryService, R: RegistrationService = EphemeralRegistry> {
core: Core<(TestLogosAccount, D, R, ChatStorage)>,
type ClientCore<T, R> = Core<(TestLogosAccount, T, R, ChatStorage)>;
/// The transport as the client sees it: a [`DeliveryService`] for outbound
/// publishing plus the inbound payload stream the worker drains. One object owns
/// both directions of the boundary.
pub trait Transport: DeliveryService + Send + 'static {
/// Hand over the inbound payload stream. Called once, at client construction,
/// before the [`Core`] takes ownership of the service.
fn inbound(&mut self) -> Receiver<Vec<u8>>;
}
/// High-level chat client.
///
/// Owns the synchronous [`Core`] behind an `Arc<Mutex<…>>` and a background
/// worker that consumes inbound payloads off the transport's channel, drives
/// the core, and forwards observations as [`Event`]s. Construction returns the
/// handle together with the `Receiver<Event>` the application drains on its own
/// schedule.
///
/// Outbound calls (`send_message`, `create_conversation`, …) run on the
/// caller's thread: they briefly lock the core, invoke it, and return — no
/// message-passing round-trip. The `Arc`/`Mutex`/threads live entirely here;
/// the core never mentions threads.
pub struct ChatClient<T: DeliveryService, R: RegistrationService = EphemeralRegistry> {
/// `parking_lot::Mutex` for its eventual fairness: an inbound burst can't
/// starve caller operations of the lock.
core: Arc<Mutex<ClientCore<T, R>>>,
/// Dropped on `Drop` to wake the worker's `select!` and shut it down.
shutdown: Option<Sender<()>>,
worker: Option<JoinHandle<()>>,
}
// ── Default-registry constructors ────────────────────────────────────────────
impl<D: DeliveryService + 'static> ChatClient<D, EphemeralRegistry> {
impl<T: Transport> ChatClient<T, EphemeralRegistry> {
/// Create an in-memory, ephemeral client. Identity is lost on drop.
pub fn new(name: impl Into<String>, delivery: D) -> Self {
let registry = EphemeralRegistry::new();
let store = ChatStorage::in_memory();
pub fn new(name: impl Into<String>, mut transport: T) -> (Self, Receiver<Event>) {
let inbound = transport.inbound();
let ident = TestLogosAccount::new(name);
Self {
core: Core::new_with_name(ident, delivery, registry, store).unwrap(),
}
let core = Core::new_with_name(
ident,
transport,
EphemeralRegistry::new(),
ChatStorage::in_memory(),
)
.unwrap();
Self::spawn(core, inbound)
}
/// Open or create a persistent client backed by `StorageConfig`.
@ -35,22 +69,22 @@ impl<D: DeliveryService + 'static> ChatClient<D, EphemeralRegistry> {
pub fn open(
name: impl Into<String>,
config: StorageConfig,
delivery: D,
) -> Result<Self, ClientError> {
mut transport: T,
) -> Result<(Self, Receiver<Event>), ClientError> {
let store = ChatStorage::new(config).map_err(ChatError::from)?;
let registry = EphemeralRegistry::new();
let inbound = transport.inbound();
let ident = TestLogosAccount::new(name);
let core = Core::new_from_store(ident, delivery, registry, store)?;
Ok(Self { core })
let core = Core::new_from_store(ident, transport, EphemeralRegistry::new(), store)?;
Ok(Self::spawn(core, inbound))
}
}
// ── Caller-supplied registry + shared methods ────────────────────────────────
impl<D, R> ChatClient<D, R>
impl<T, R> ChatClient<T, R>
where
D: DeliveryService + 'static,
R: RegistrationService + 'static,
T: DeliveryService + Send + 'static,
R: RegistrationService + Send + 'static,
{
/// Open or create a persistent client with a caller-supplied registration
/// service. Use this to swap in a network-backed registry (e.g. the
@ -63,29 +97,52 @@ where
pub fn open_with_registry(
name: impl Into<String>,
config: StorageConfig,
delivery: D,
mut transport: T,
registry: R,
) -> Result<Self, ClientError> {
) -> Result<(Self, Receiver<Event>), ClientError>
where
T: Transport,
{
let store = ChatStorage::new(config).map_err(ChatError::from)?;
let inbound = transport.inbound();
let ident = TestLogosAccount::new(name);
let mut core = Core::new_from_store(ident, delivery, registry, store)?;
let mut core = Core::new_from_store(ident, transport, registry, store)?;
core.register_keypackage()?;
Ok(Self { core })
Ok(Self::spawn(core, inbound))
}
fn spawn(core: ClientCore<T, R>, inbound: Receiver<Vec<u8>>) -> (Self, Receiver<Event>) {
let core = Arc::new(Mutex::new(core));
let (event_tx, event_rx) = crossbeam_channel::unbounded();
let (shutdown_tx, shutdown_rx) = crossbeam_channel::bounded::<()>(0);
let worker = thread::spawn({
let core = Arc::clone(&core);
move || worker_loop(core, inbound, shutdown_rx, event_tx)
});
(
Self {
core,
shutdown: Some(shutdown_tx),
worker: Some(worker),
},
event_rx,
)
}
/// Returns the installation name (identity label) of this client.
pub fn installation_name(&self) -> &str {
self.core.installation_name()
pub fn installation_name(&self) -> String {
self.core.lock().installation_name().to_string()
}
/// Produce a serialised introduction bundle for sharing out-of-band.
pub fn create_intro_bundle(&mut self) -> Result<Vec<u8>, ClientError> {
self.core.create_intro_bundle().map_err(Into::into)
self.core.lock().create_intro_bundle().map_err(Into::into)
}
/// Parse intro bundle bytes and initiate a private conversation. Returns
/// this side's conversation ID.
/// Parse intro bundle bytes and initiate a private conversation. Outbound
/// envelopes are published by the core. Returns this side's conversation ID.
pub fn create_conversation(
&mut self,
intro_bundle: &[u8],
@ -93,31 +150,79 @@ where
) -> Result<ConversationId, ClientError> {
let intro = Introduction::try_from(intro_bundle)?;
self.core
.lock()
.create_private_convo(&intro, initial_content)
.map_err(Into::into)
}
/// List all conversation IDs known to this client.
pub fn list_conversations(&self) -> Result<Vec<ConversationId>, ClientError> {
self.core.list_conversations().map_err(Into::into)
self.core.lock().list_conversations().map_err(Into::into)
}
/// Encrypt and send `content` to an existing conversation.
/// Encrypt and send `content` to an existing conversation. The core
/// publishes the outbound envelope.
pub fn send_message(&mut self, convo_id: &str, content: &[u8]) -> Result<(), ClientError> {
self.core
.lock()
.send_content(convo_id, content)
.map_err(Into::into)
}
}
/// Decrypt an inbound payload. Returns the events the payload produced,
/// in causal order. May be empty for protocol-only frames.
pub fn receive(&mut self, payload: &[u8]) -> Result<Vec<Event>, ClientError> {
let result = self.core.handle_payload(payload)?;
Ok(events_from_inbound(result))
impl<T: DeliveryService, R: RegistrationService> Drop for ChatClient<T, R> {
fn drop(&mut self) {
// Dropping the sender disconnects the worker's shutdown channel, waking
// its `select!` so it can exit; then we join it.
self.shutdown.take();
if let Some(handle) = self.worker.take() {
let _ = handle.join();
}
}
}
/// Walk an [`PayloadOutcome`] in causal order and emit one `Event` per
/// Background loop: block until an inbound payload or shutdown arrives, drive
/// the core on each payload, and forward events. No polling — `select!` parks
/// the thread until one of the channels is ready.
fn worker_loop<T, R>(
core: Arc<Mutex<ClientCore<T, R>>>,
inbound: Receiver<Vec<u8>>,
shutdown: Receiver<()>,
event_tx: Sender<Event>,
) where
T: DeliveryService + Send + 'static,
R: RegistrationService + Send + 'static,
{
loop {
select! {
recv(inbound) -> msg => {
let Ok(bytes) = msg else {
return; // transport's sender dropped
};
let events = {
let mut core = core.lock();
match core.handle_payload(&bytes) {
Ok(outcome) => events_from_inbound(outcome),
Err(e) => {
tracing::warn!("inbound handle_payload failed: {e:?}");
vec![Event::InboundError {
message: e.to_string(),
}]
}
}
};
for event in events {
if event_tx.send(event).is_err() {
return; // application dropped the receiver
}
}
}
recv(shutdown) -> _ => return,
}
}
}
/// Walk a [`PayloadOutcome`] in causal order and emit one `Event` per
/// observation. For an `Inbox` outcome, [`Event::ConversationStarted`]
/// precedes the message event. The convo id is wrapped into `Arc<str>` once
/// per outcome and shared across the events it produces.

View File

@ -1,103 +1,62 @@
use crate::{AddressedEnvelope, DeliveryService};
use crate::{AddressedEnvelope, DeliveryService, Transport};
use crossbeam_channel::{Receiver, Sender, unbounded};
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex};
type Message = Vec<u8>;
/// Shared in-process message bus. Cheap to clone — all clones share the same log.
///
/// Messages are stored in an append-only log per delivery address. Readers hold
/// independent [`Cursor`]s and advance their position without consuming messages,
/// so multiple consumers on the same address each see every message.
/// Shared in-process message bus. Cheap to clone — all clones share one routing
/// table. On `publish`, a message is fanned out to every endpoint subscribed to
/// its delivery address.
#[derive(Clone, Default, Debug)]
pub struct MessageBus {
log: Arc<RwLock<HashMap<String, Vec<Message>>>>,
routes: Arc<Mutex<HashMap<String, Vec<Sender<Message>>>>>,
}
impl MessageBus {
/// Returns a cursor positioned at the beginning of `address`.
/// The cursor will see all messages — past and future.
pub fn cursor(&self, address: &str) -> Cursor {
Cursor {
bus: self.clone(),
address: address.to_string(),
pos: 0,
fn register(&self, address: &str, sender: Sender<Message>) {
let mut routes = self.routes.lock().unwrap();
let senders = routes.entry(address.to_string()).or_default();
// Idempotent per endpoint: the core re-subscribes an address whenever it
// rebuilds a conversation, so skip senders already registered for it —
// otherwise each payload reaches that endpoint more than once.
if senders.iter().any(|s| s.same_channel(&sender)) {
return;
}
senders.push(sender);
}
/// Returns a cursor positioned at the current tail of `address`.
/// The cursor will only see messages delivered after this call.
pub fn cursor_at_tail(&self, address: &str) -> Cursor {
let pos = self.log.read().unwrap().get(address).map_or(0, |v| v.len());
Cursor {
bus: self.clone(),
address: address.to_string(),
pos,
fn publish(&self, address: &str, data: Message) {
if let Some(senders) = self.routes.lock().unwrap().get_mut(address) {
// Prune endpoints whose receiver was dropped: a disconnected endpoint
// is harmless, but keeping its sender would leak it in `routes`.
senders.retain(|tx| tx.send(data.clone()).is_ok());
}
}
fn get(&self, address: &str, pos: usize) -> Option<Message> {
// Unwrap produces a panic when the lock is poisoned.
// It would most likely indicate log corruption (e.g. incomplete write from another thread),
// so panic propagation seems appropriate.
self.log.read().unwrap().get(address)?.get(pos).cloned()
}
fn push(&self, address: String, data: Message) {
self.log
.write()
.unwrap()
.entry(address)
.or_default()
.push(data);
}
}
/// Per-consumer read cursor into a [`MessageBus`] address slot.
/// One client's endpoint onto a shared [`MessageBus`].
///
/// Reads are non-destructive: the underlying log is never modified.
/// Multiple cursors on the same address each advance independently.
pub struct Cursor {
/// `publish` fans the message out through the bus; `subscribe` registers this
/// endpoint's inbound sender for an address, so subsequent publishes to it are
/// delivered. The client obtains the inbound stream via [`Transport::inbound`].
#[derive(Debug)]
pub struct InProcessDelivery {
bus: MessageBus,
address: String,
pos: usize,
inbound_tx: Sender<Message>,
inbound_rx: Option<Receiver<Message>>,
}
impl Iterator for Cursor {
type Item = Message;
fn next(&mut self) -> Option<Message> {
let msg = self.bus.get(&self.address, self.pos)?;
self.pos += 1;
Some(msg)
}
}
/// In-process delivery service backed by a [`MessageBus`].
///
/// Cheap to clone — all clones share the same underlying bus, so multiple
/// clients can share one logical delivery service. Construct with a
/// [`MessageBus`] and use [`cursor`](InProcessDelivery::cursor) /
/// [`cursor_at_tail`](InProcessDelivery::cursor_at_tail) to read messages.
#[derive(Clone, Default, Debug)]
pub struct InProcessDelivery(MessageBus);
impl InProcessDelivery {
/// Create a delivery service backed by `bus`.
/// Create an endpoint on `bus`.
pub fn new(bus: MessageBus) -> Self {
Self(bus)
}
/// Returns a cursor positioned at the beginning of `address`.
pub fn cursor(&self, address: &str) -> Cursor {
self.0.cursor(address)
}
/// Returns a cursor positioned at the current tail of `address`.
/// The cursor will only see messages delivered after this call.
pub fn cursor_at_tail(&self, address: &str) -> Cursor {
self.0.cursor_at_tail(address)
let (tx, rx) = unbounded();
Self {
bus,
inbound_tx: tx,
inbound_rx: Some(rx),
}
}
}
@ -105,12 +64,20 @@ impl DeliveryService for InProcessDelivery {
type Error = Infallible;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Infallible> {
self.0.push(envelope.delivery_address, envelope.data);
self.bus.publish(&envelope.delivery_address, envelope.data);
Ok(())
}
fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> {
// TODO: (P1) implement subscribe
fn subscribe(&mut self, delivery_address: &str) -> Result<(), Self::Error> {
self.bus.register(delivery_address, self.inbound_tx.clone());
Ok(())
}
}
impl Transport for InProcessDelivery {
fn inbound(&mut self) -> Receiver<Vec<u8>> {
self.inbound_rx
.take()
.expect("InProcessDelivery::inbound called more than once")
}
}

View File

@ -24,4 +24,7 @@ pub enum Event {
convo_id: Arc<str>,
content: Vec<u8>,
},
InboundError {
message: String,
},
}

View File

@ -3,8 +3,8 @@ mod delivery_in_process;
mod errors;
mod event;
pub use client::ChatClient;
pub use delivery_in_process::{Cursor, InProcessDelivery, MessageBus};
pub use client::{ChatClient, Transport};
pub use delivery_in_process::{InProcessDelivery, MessageBus};
pub use errors::ClientError;
pub use event::Event;

View File

@ -1,95 +1,201 @@
use std::time::Duration;
use crossbeam_channel::{Receiver, Sender};
use logos_chat::{
ChatClient, ConversationClass, ConversationId, Cursor, Event, InProcessDelivery, StorageConfig,
AddressedEnvelope, ChatClient, DeliveryService, Event, InProcessDelivery, MessageBus,
StorageConfig, Transport,
};
/// Pulls one envelope, decrypts, and returns the events emitted.
fn receive(receiver: &mut ChatClient<InProcessDelivery>, cursor: &mut Cursor) -> Vec<Event> {
let raw = cursor.next().expect("expected envelope");
receiver.receive(&raw).expect("receive failed")
}
fn expect_message(event: &Event) -> (&str, &[u8]) {
match event {
Event::MessageReceived {
convo_id, content, ..
} => (convo_id.as_ref(), content.as_slice()),
other => panic!("expected MessageReceived, got {other:?}"),
}
}
fn expect_conversation_started(event: &Event) -> (&str, ConversationClass) {
match event {
Event::ConversationStarted {
convo_id, class, ..
} => (convo_id.as_ref(), *class),
other => panic!("expected ConversationStarted, got {other:?}"),
}
/// Block until the next event arrives and matches; panic on timeout/mismatch.
fn expect_event<F, T>(events: &Receiver<Event>, label: &str, mut f: F) -> T
where
F: FnMut(Event) -> Result<T, Event>,
{
let event = events
.recv_timeout(Duration::from_secs(5))
.unwrap_or_else(|_| panic!("timed out waiting for {label}"));
f(event).unwrap_or_else(|other| panic!("expected {label}, got {other:?}"))
}
#[test]
fn saro_raya_message_exchange() {
let delivery = InProcessDelivery::new(Default::default());
let mut cursor = delivery.cursor_at_tail("delivery_address");
let bus = MessageBus::default();
let saro_delivery = InProcessDelivery::new(bus.clone());
let raya_delivery = InProcessDelivery::new(bus);
let mut saro = ChatClient::new("saro", delivery.clone());
let mut raya = ChatClient::new("raya", delivery);
let (mut saro, saro_events) = ChatClient::new("saro", saro_delivery);
let (mut raya, raya_events) = ChatClient::new("raya", raya_delivery);
let raya_bundle = raya.create_intro_bundle().unwrap();
let saro_convo_id = saro
.create_conversation(&raya_bundle, b"hello raya")
.unwrap();
let events = receive(&mut raya, &mut cursor);
assert_eq!(
events.len(),
2,
"expected ConversationStarted + MessageReceived"
);
let (started_id, class) = expect_conversation_started(&events[0]);
assert_eq!(class, ConversationClass::Private);
let (msg_id, content) = expect_message(&events[1]);
assert_eq!(content, b"hello raya");
assert_eq!(started_id, msg_id);
let raya_convo_id: ConversationId = started_id.to_owned();
// The invite payload yields ConversationStarted then MessageReceived.
let raya_convo_id = expect_event(&raya_events, "ConversationStarted", |e| match e {
Event::ConversationStarted { convo_id, .. } => Ok(convo_id),
other => Err(other),
});
expect_event(&raya_events, "MessageReceived", |e| match e {
Event::MessageReceived { convo_id, content } => {
assert_eq!(convo_id, raya_convo_id);
assert_eq!(content.as_slice(), b"hello raya");
Ok(())
}
other => Err(other),
});
raya.send_message(&raya_convo_id, b"hi saro").unwrap();
let events = receive(&mut saro, &mut cursor);
assert_eq!(events.len(), 1);
let (_, content) = expect_message(&events[0]);
assert_eq!(content, b"hi saro");
expect_event(&saro_events, "MessageReceived", |e| match e {
Event::MessageReceived { content, .. } => {
assert_eq!(content.as_slice(), b"hi saro");
Ok(())
}
other => Err(other),
});
for i in 0u8..5 {
let msg = format!("msg {i}");
saro.send_message(&saro_convo_id, msg.as_bytes()).unwrap();
let events = receive(&mut raya, &mut cursor);
assert_eq!(events.len(), 1);
let (_, content) = expect_message(&events[0]);
assert_eq!(content, msg.as_bytes());
expect_event(
&raya_events,
&format!("MessageReceived(msg {i})"),
|e| match e {
Event::MessageReceived { content, .. } => {
assert_eq!(content.as_slice(), msg.as_bytes());
Ok(())
}
other => Err(other),
},
);
let reply = format!("reply {i}");
raya.send_message(&raya_convo_id, reply.as_bytes()).unwrap();
let events = receive(&mut saro, &mut cursor);
assert_eq!(events.len(), 1);
let (_, content) = expect_message(&events[0]);
assert_eq!(content, reply.as_bytes());
expect_event(
&saro_events,
&format!("MessageReceived(reply {i})"),
|e| match e {
Event::MessageReceived { content, .. } => {
assert_eq!(content.as_slice(), reply.as_bytes());
Ok(())
}
other => Err(other),
},
);
}
assert_eq!(saro.list_conversations().unwrap().len(), 1);
assert_eq!(raya.list_conversations().unwrap().len(), 1);
}
#[derive(Debug)]
struct FailingDelivery {
inbound_tx: Sender<Vec<u8>>,
inbound_rx: Option<Receiver<Vec<u8>>>,
}
impl FailingDelivery {
fn new() -> Self {
let (inbound_tx, inbound_rx) = crossbeam_channel::unbounded();
Self {
inbound_tx,
inbound_rx: Some(inbound_rx),
}
}
/// A sender into this transport's inbound stream — for tests to feed the
/// worker, or to hold open so it doesn't see a disconnect.
fn inbound_sender(&self) -> Sender<Vec<u8>> {
self.inbound_tx.clone()
}
}
impl DeliveryService for FailingDelivery {
type Error = &'static str;
fn publish(&mut self, _: AddressedEnvelope) -> Result<(), Self::Error> {
Err("simulated transport failure")
}
fn subscribe(&mut self, _: &str) -> Result<(), Self::Error> {
Ok(())
}
}
impl Transport for FailingDelivery {
fn inbound(&mut self) -> Receiver<Vec<u8>> {
self.inbound_rx
.take()
.expect("FailingDelivery::inbound called more than once")
}
}
#[test]
fn dropping_client_shuts_down_worker() {
let delivery = InProcessDelivery::new(MessageBus::default());
let (client, events) = ChatClient::new("saro", delivery);
drop(client);
// Drop joins the worker; once joined its Sender<Event> is gone, so recv
// reports the channel as disconnected.
let res = events.recv_timeout(Duration::from_secs(5));
assert!(matches!(
res,
Err(crossbeam_channel::RecvTimeoutError::Disconnected)
));
}
#[test]
fn publish_failure_surfaces_as_error() {
// A real raya just to mint a valid intro bundle.
let raya_delivery = InProcessDelivery::new(MessageBus::default());
let (mut raya, _raya_events) = ChatClient::new("raya", raya_delivery);
let bundle = raya.create_intro_bundle().unwrap();
// FailingDelivery never receives; keep the inbound sender alive so the
// worker doesn't exit early on a disconnected channel.
let delivery = FailingDelivery::new();
let _keep_inbound = delivery.inbound_sender();
let (mut saro, _saro_events) = ChatClient::new("saro", delivery);
let result = saro.create_conversation(&bundle, b"hello");
assert!(
result.is_err(),
"publish failure should surface as an error on the synchronous call"
);
}
#[test]
fn malformed_inbound_surfaces_as_error_event() {
// Feed the worker's inbound channel bytes that can't be decoded and assert
// it emits an InboundError instead of silently dropping the failure.
let delivery = FailingDelivery::new();
let inbound_tx = delivery.inbound_sender();
let (_saro, events) = ChatClient::new("saro", delivery);
inbound_tx.send(b"not a valid payload".to_vec()).unwrap();
expect_event(&events, "InboundError", |e| match e {
Event::InboundError { message } => {
assert!(!message.is_empty(), "error event should carry a message");
Ok(())
}
other => Err(other),
});
}
#[test]
fn open_persistent_client() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("test.db").to_string_lossy().to_string();
let config = StorageConfig::File(db_path);
let client1 = ChatClient::open("saro", config.clone(), InProcessDelivery::default()).unwrap();
let name1 = client1.installation_name().to_string();
let delivery1 = InProcessDelivery::new(MessageBus::default());
let (client1, _events1) = ChatClient::open("saro", config.clone(), delivery1).unwrap();
let name1 = client1.installation_name();
drop(client1);
let client2 = ChatClient::open("saro", config, InProcessDelivery::default()).unwrap();
let name2 = client2.installation_name().to_string();
let delivery2 = InProcessDelivery::new(MessageBus::default());
let (client2, _events2) = ChatClient::open("saro", config, delivery2).unwrap();
let name2 = client2.installation_name();
assert_eq!(
name1, name2,

View File

@ -5,7 +5,7 @@
| Status | Accepted |
| Issue | https://github.com/logos-messaging/libchat/issues/97 |
| Date | 2026-05-19 |
| Last revised | 2026-05-28 |
| Last revised | 2026-06-09 |
## Context and Problem
@ -26,7 +26,7 @@ Three layers. Calls flow downward. Sync results return through method returns; e
```mermaid
flowchart TB
A["<b>app</b><br/>drains Receiver&lt;Event&gt;"]
B["<b>client</b><br/>owns transport poller + services<br/>translates PayloadOutcome → Event values<br/>pushes onto channel"]
B["<b>client</b><br/>owns worker thread + services<br/>translates PayloadOutcome → Event values<br/>pushes onto channel"]
C["<b>core</b><br/>strict sync, caller-driven<br/>returns PayloadOutcome"]
A -- "method calls" --> B
@ -41,7 +41,7 @@ Crates: **app** — `bin/chat-cli`, future `logos-chat-module`; **client** — `
1. **Core returns `PayloadOutcome`, a dispatcher-level enum.** Each inbound path inside the core yields its own concrete outcome type: `ConvoOutcome` (`Convo::handle_frame`) carries decrypted contents on an existing conversation; `InboxOutcome` (inbox / inbox_v2 handlers) carries a newly observed conversation plus an optional initial `ConvoOutcome`. `PayloadOutcome` is the dispatcher-level union (`Empty`, `Convo(ConvoOutcome)`, `Inbox(InboxOutcome)`) and is the single type `Context::handle_payload` returns; `From<ConvoOutcome>` / `From<InboxOutcome>` impls keep the per-path handlers free of `PayloadOutcome` in their signatures. The split encodes at the type level what each producer can populate — a `Convo` cannot manufacture a new conversation, so its signature precludes the possibility.
2. **`Event` is an asynchronous notification.** The client's constructor returns a `Receiver<Event>` alongside the client handle. A background poller drives the transport, calls into the core for each inbound payload, translates the resulting `PayloadOutcome` into one event per observation, and pushes them onto the channel. Background work that has no synchronous trigger at all (delivery retry timeouts, future protocol timers) pushes onto the same channel.
2. **`Event` is an asynchronous notification.** The client's constructor returns a `Receiver<Event>` alongside the client handle. A background worker receives inbound payloads pushed from the transport (the Delivery Service's inbound side) — it is never polled — calls into the core for each, translates the resulting `PayloadOutcome` into one event per observation, and pushes them onto the channel. Background work that has no synchronous trigger at all (delivery retry timeouts, future protocol timers) pushes onto the same channel.
3. **Two enums, mapping at the client boundary.** `PayloadOutcome` is the dispatcher-level sum of observations from one payload; `Event` is a discrete app-facing notification. The two enums are allowed to diverge: a protocol-internal observation the app does not need lives only on a core outcome type; a client-only event like `DeliveryFailed { Timeout }` lives only on `Event`. Translation is an explicit per-variant `match` inside the client — not a blanket `From` impl — to preserve that divergence as both sides grow.
@ -55,13 +55,13 @@ Synchronous failures — publish, parse, store, MLS — stay on `Result<_, ChatE
## Sequence
Two flows cover everything the application observes: a synchronous send initiated by the app, and inbound bytes carried by the client's transport poller.
Two flows cover everything the application observes: a synchronous send initiated by the app, and inbound bytes the transport pushes to the client's worker.
```mermaid
sequenceDiagram
participant App
participant Client
participant Poller as Client poller (background)
participant Worker as Client worker (background)
participant Core
participant Delivery as DeliveryService
@ -73,13 +73,12 @@ sequenceDiagram
Core-->>Client: Ok(()) / Err
Client-->>App: Ok(()) / Err
Note over Poller,Delivery: Inbound — background poller pushes events
Poller->>Delivery: poll
Delivery-->>Poller: payload bytes
Poller->>Core: handle_payload(payload)
Core-->>Poller: Ok(PayloadOutcome)
Poller->>Poller: translate fields → Event values
Poller-)App: events via Receiver<Event>
Note over Worker,Delivery: Inbound — transport pushes, worker drives the core
Delivery-)Worker: inbound payload (subscribed address)
Worker->>Core: handle_payload(payload)
Core-->>Worker: Ok(PayloadOutcome)
Worker->>Worker: translate fields → Event values
Worker-)App: events via Receiver<Event>
Note over App: App drains on its own schedule
App->>App: for event in receiver.try_iter() { handle(event) }