feat: introduce client event system

This commit is contained in:
osmaczko 2026-05-20 13:53:43 +02:00
parent 65e103ab1d
commit f21ffcad95
30 changed files with 1186 additions and 744 deletions

2
.gitignore vendored
View File

@ -39,4 +39,4 @@ result
crates/client-ffi/client_ffi.h
# Compiled C FFI example binary
examples/c-ffi/c-client
crates/client-ffi/examples/message-exchange/c-client

2
Cargo.lock generated
View File

@ -1422,6 +1422,7 @@ dependencies = [
"storage",
"tempfile",
"thiserror",
"tracing",
"x25519-dalek",
]
@ -1742,6 +1743,7 @@ dependencies = [
"libchat",
"tempfile",
"thiserror",
"tracing",
]
[[package]]

View File

@ -5,7 +5,7 @@ use std::sync::mpsc;
use anyhow::Result;
use arboard::Clipboard;
use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService};
use logos_chat::{ChatClient, ConversationIdOwned, DeliveryService, Event};
use serde::{Deserialize, Serialize};
use crate::utils::now;
@ -43,7 +43,7 @@ pub struct AppState {
pub struct ChatApp<D: DeliveryService> {
pub client: ChatClient<D>,
inbound: mpsc::Receiver<Vec<u8>>,
events: mpsc::Receiver<Event>,
pub state: AppState,
/// Ephemeral command output — not persisted, cleared on chat switch.
command_output: Vec<DisplayMessage>,
@ -56,7 +56,7 @@ pub struct ChatApp<D: DeliveryService> {
impl<D: DeliveryService + 'static> ChatApp<D> {
pub fn new(
client: ChatClient<D>,
inbound: mpsc::Receiver<Vec<u8>>,
events: mpsc::Receiver<Event>,
user_name: &str,
data_dir: &Path,
) -> Result<Self> {
@ -76,7 +76,7 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
Ok(Self {
client,
inbound,
events,
state,
command_output: Vec::new(),
input: String::new(),
@ -142,43 +142,61 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
}
pub fn process_incoming(&mut self) -> Result<()> {
while let Ok(payload) = self.inbound.try_recv() {
match self.client.receive(&payload) {
Ok(Some(content)) => {
let chat_id = &content.conversation_id;
if !self.state.chats.contains_key(chat_id) && content.is_new_convo {
let session = ChatSession {
chat_id: chat_id.clone(),
nickname: None,
messages: Vec::new(),
};
self.state.chats.insert(chat_id.clone(), session);
let label = chat_id[..8.min(chat_id.len())].to_string();
self.set_active_chat(Some(chat_id.clone()));
self.status = format!("New chat ({label})! Use /nickname to name it.");
}
if !content.data.is_empty() {
let text = String::from_utf8_lossy(&content.data).to_string();
if let Some(session) = self.state.chats.get_mut(chat_id) {
session.messages.push(DisplayMessage {
from_self: false,
content: text,
timestamp: now(),
});
}
}
self.save_state()?;
}
Ok(None) => {}
Err(e) => tracing::warn!("receive error: {e:?}"),
}
let mut had_event = false;
while let Ok(event) = self.events.try_recv() {
self.handle_event(event);
had_event = true;
}
if had_event {
self.save_state()?;
}
Ok(())
}
fn handle_event(&mut self, event: Event) {
match event {
Event::ConversationStarted {
conversation_id, ..
} => {
let chat_id = conversation_id.to_string();
if !self.state.chats.contains_key(&chat_id) {
let session = ChatSession {
chat_id: chat_id.clone(),
nickname: None,
messages: Vec::new(),
};
self.state.chats.insert(chat_id.clone(), session);
let label = chat_id[..8.min(chat_id.len())].to_string();
self.set_active_chat(Some(chat_id));
self.status = format!("New chat ({label})! Use /nickname to name it.");
}
}
Event::MessageReceived {
conversation_id,
data,
..
} => {
if data.is_empty() {
return;
}
let chat_id = conversation_id.as_ref();
let text = String::from_utf8_lossy(&data).to_string();
if let Some(session) = self.state.chats.get_mut(chat_id) {
session.messages.push(DisplayMessage {
from_self: false,
content: text,
timestamp: now(),
});
}
}
Event::DeliveryFailed { reason, .. } => {
tracing::warn!("delivery failed: {reason:?}");
self.status = format!("Delivery failed ({reason:?}); peer may not receive.");
}
_ => {}
}
}
pub fn send_message(&mut self, content: &str) -> Result<()> {
let chat_id = self
.state
@ -188,9 +206,13 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
let convo_id: ConversationIdOwned = chat_id.as_str().into();
self.client
let events = self
.client
.send_message(&convo_id, content.as_bytes())
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
for event in events {
self.handle_event(event);
}
if let Some(session) = self.state.chats.get_mut(&chat_id) {
session.messages.push(DisplayMessage {
@ -253,10 +275,13 @@ impl<D: DeliveryService + 'static> ChatApp<D> {
return Ok(Some("Usage: /connect <bundle>".to_string()));
}
let initial = format!("Hello from {}!", self.user_name);
let convo_id = self
let (convo_id, events) = self
.client
.create_conversation(args.as_bytes(), initial.as_bytes())
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
for event in events {
self.handle_event(event);
}
let chat_id = convo_id.to_string();
let label = chat_id[..8.min(chat_id.len())].to_string();

View File

@ -4,7 +4,6 @@ mod ui;
mod utils;
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use anyhow::{Context, Result};
use clap::{Parser, ValueEnum};
@ -66,9 +65,9 @@ fn main() -> Result<()> {
match cli.transport {
TransportKind::File => {
let transport_dir = cli.data.join("transport");
let (transport, inbound) = transport::file::FileTransport::new(&transport_dir)
let transport = transport::file::FileTransport::new(&transport_dir)
.context("failed to create file transport")?;
run(transport, inbound, &cli)
run(transport, &cli)
}
#[cfg(logos_delivery)]
TransportKind::LogosDelivery => {
@ -82,20 +81,15 @@ fn main() -> Result<()> {
tcp_port: cli.port,
..Default::default()
};
let (transport, inbound) =
Service::start(cfg).context("failed to start logos-delivery")?;
let transport = Service::start(cfg).context("failed to start logos-delivery")?;
println!("Node connected. Initializing chat client...");
run(transport, inbound, &cli)
run(transport, &cli)
}
}
}
fn run<D: DeliveryService + 'static>(
transport: D,
inbound: mpsc::Receiver<Vec<u8>>,
cli: &Cli,
) -> Result<()> {
fn run<D: DeliveryService + 'static>(transport: D, cli: &Cli) -> Result<()> {
let db_path = cli
.db
.clone()
@ -105,7 +99,7 @@ fn run<D: DeliveryService + 'static>(
.context("db path contains non-UTF-8 characters")?
.to_string();
let client = logos_chat::ChatClient::open(
let (client, events) = logos_chat::ChatClient::open(
cli.name.clone(),
logos_chat::StorageConfig::Encrypted {
path: db_str,
@ -116,7 +110,7 @@ fn run<D: DeliveryService + 'static>(
.map_err(|e| anyhow::anyhow!("{e:?}"))
.context("failed to open chat client")?;
let mut app = ChatApp::new(client, inbound, &cli.name, &cli.data)?;
let mut app = ChatApp::new(client, events, &cli.name, &cli.data)?;
if cli.smoketest {
return Ok(());
@ -128,71 +122,6 @@ fn run<D: DeliveryService + 'static>(
result
}
#[cfg_attr(not(logos_delivery), allow(dead_code, unused_variables))]
fn run_logos_delivery(cli: Cli) -> Result<()> {
#[cfg(logos_delivery)]
{
use transport::logos_delivery::{Config, Service};
eprintln!("Starting logos-delivery node (preset={})...", cli.preset);
eprintln!("This may take a few seconds while connecting to the network.");
let logos_cfg = Config {
preset: cli.preset.clone(),
tcp_port: cli.port,
..Default::default()
};
let (delivery, inbound) =
Service::start(logos_cfg).context("failed to start logos-delivery")?;
eprintln!("Node connected. Initializing chat client...");
let data_dir = cli
.db
.as_ref()
.and_then(|p| p.parent())
.map(|p| p.to_path_buf())
.unwrap_or_else(|| cli.data.clone());
let client = match cli.db {
Some(ref path) => {
let db_str = path
.to_str()
.context("db path contains non-UTF-8 characters")?
.to_string();
logos_chat::ChatClient::open(
cli.name.clone(),
logos_chat::StorageConfig::Encrypted {
path: db_str,
key: "chat-cli".to_string(),
},
delivery,
)
.map_err(|e| anyhow::anyhow!("{e:?}"))
.context("failed to open persistent client")?
}
None => logos_chat::ChatClient::new(cli.name.clone(), delivery),
};
let mut app = ChatApp::new(client, inbound, &cli.name, &data_dir)?;
if cli.smoketest {
return Ok(());
}
let mut terminal = ui::init().context("failed to initialize terminal")?;
let result = run_app(&mut terminal, &mut app);
ui::restore().context("failed to restore terminal")?;
return result;
}
#[cfg(not(logos_delivery))]
anyhow::bail!(
"logos-delivery transport is not available in this build.\n\
Build with LOGOS_DELIVERY_LIB_DIR set to enable it."
)
}
fn run_app<D: DeliveryService + 'static>(
terminal: &mut ui::Tui,
app: &mut ChatApp<D>,

View File

@ -2,11 +2,11 @@ use std::collections::BTreeMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::sync::{Mutex, mpsc};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use logos_chat::{AddressedEnvelope, DeliveryService};
use logos_chat::{AddressedEnvelope, DeliveryService, drain_inbound};
#[derive(Debug, thiserror::Error)]
pub enum FileTransportError {
@ -17,16 +17,17 @@ pub enum FileTransportError {
#[derive(Debug)]
pub struct FileTransport {
transport_dir: PathBuf,
inbound: Mutex<mpsc::Receiver<Vec<u8>>>,
}
impl FileTransport {
/// All instances pointing at the same `transport_dir` share one broadcast bus.
///
/// Messages are written to `{transport_dir}/{delivery_address}/{hours_since_epoch}.bin`
/// as length-prefixed frames (`[u32 BE length][payload bytes]`). The background
/// as length-prefixed frames (`[u32 BE length][payload bytes]`). A background
/// thread reads all files under `transport_dir` and forwards every frame to
/// the returned channel; `client.receive()` discards frames it cannot decrypt.
pub fn new(transport_dir: &Path) -> io::Result<(Self, mpsc::Receiver<Vec<u8>>)> {
/// the internal inbound channel; the client discards frames it cannot decrypt.
pub fn new(transport_dir: &Path) -> io::Result<Self> {
fs::create_dir_all(transport_dir)?;
let (tx, rx) = mpsc::sync_channel(1024);
@ -36,19 +37,17 @@ impl FileTransport {
.name("file-transport".into())
.spawn(move || poll_reader(dir, tx))?;
Ok((
Self {
transport_dir: transport_dir.to_path_buf(),
},
rx,
))
Ok(Self {
transport_dir: transport_dir.to_path_buf(),
inbound: Mutex::new(rx),
})
}
}
impl DeliveryService for FileTransport {
type Error = FileTransportError;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), FileTransportError> {
fn publish(&self, envelope: AddressedEnvelope) -> Result<(), FileTransportError> {
let addr_dir = self.transport_dir.join(&envelope.delivery_address);
fs::create_dir_all(&addr_dir)?;
@ -62,10 +61,14 @@ impl DeliveryService for FileTransport {
Ok(())
}
fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> {
fn subscribe(&self, _delivery_address: &str) -> Result<(), Self::Error> {
// FileTransport does not support filtering
Ok(())
}
fn pull(&self) -> Vec<Vec<u8>> {
drain_inbound(&self.inbound)
}
}
/// Hours since Unix epoch — used as the rolling filename.

View File

@ -18,7 +18,7 @@ use std::time::Duration;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use logos_chat::{AddressedEnvelope, DeliveryService};
use logos_chat::{AddressedEnvelope, DeliveryService, drain_inbound};
use tracing::{error, info, warn};
use wrapper::LogosNodeCtx;
@ -123,12 +123,14 @@ pub struct Service {
outbound: mpsc::SyncSender<OutboundCmd>,
#[allow(dead_code)]
subscribers: SubscriberList,
inbound: Arc<Mutex<mpsc::Receiver<Vec<u8>>>>,
}
impl Service {
/// Start the embedded logos-delivery node. Returns the service and a
/// receiver for inbound raw payloads.
pub fn start(cfg: Config) -> Result<(Self, mpsc::Receiver<Vec<u8>>), DeliveryError> {
/// Start the embedded logos-delivery node. Returns the service handle —
/// inbound payloads are drained via the `DeliveryService::pull` trait
/// method.
pub fn start(cfg: Config) -> Result<Self, DeliveryError> {
let (out_tx, out_rx) = mpsc::sync_channel::<OutboundCmd>(256);
let subscribers: SubscriberList = Arc::new(Mutex::new(Vec::new()));
let (ready_tx, ready_rx) = mpsc::channel::<Result<(), DeliveryError>>();
@ -167,13 +169,11 @@ impl Service {
return Err(e);
}
Ok((
Self {
outbound: out_tx,
subscribers,
},
inbound_rx,
))
Ok(Self {
outbound: out_tx,
subscribers,
inbound: Arc::new(Mutex::new(inbound_rx)),
})
}
fn node_thread(
@ -281,7 +281,7 @@ impl Service {
impl DeliveryService for Service {
type Error = DeliveryError;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), DeliveryError> {
fn publish(&self, envelope: AddressedEnvelope) -> Result<(), DeliveryError> {
let msg = WakuMessage {
content_topic: content_topic_for(&envelope.delivery_address),
payload: BASE64.encode(&envelope.data),
@ -301,8 +301,12 @@ impl DeliveryService for Service {
reply_rx.recv().map_err(|_| DeliveryError::ChannelClosed)?
}
fn subscribe(&mut self, _: &str) -> Result<(), <Self as DeliveryService>::Error> {
fn subscribe(&self, _: &str) -> Result<(), <Self as DeliveryService>::Error> {
// This Service does not support filtering
Ok(())
}
fn pull(&self) -> Vec<Vec<u8>> {
drain_inbound(&self.inbound)
}
}

View File

@ -25,6 +25,7 @@ prost = "0.14.1"
rand_core = { version = "0.6" }
safer-ffi = "0.1.13"
thiserror = "2.0.17"
tracing = "0.1"
x25519-dalek = { version = "2.0.1", features = ["static_secrets", "reusable_secrets", "getrandom"] }
[dev-dependencies]

View File

@ -1,6 +1,4 @@
use std::cell::{Ref, RefMut};
use std::sync::Arc;
use std::{cell::RefCell, rc::Rc};
use std::sync::{Arc, Mutex, MutexGuard};
use crate::account::LogosAccount;
use crate::conversation::{Convo, GroupConvo};
@ -9,10 +7,11 @@ use crate::{DeliveryService, RegistrationService};
use crate::{
conversation::{Conversation, Id, PrivateV1Convo},
errors::ChatError,
event::Event,
inbox::Inbox,
inbox_v2::InboxV2,
proto::{EncryptedPayload, EnvelopeV1, Message},
types::{AccountId, AddressedEnvelope, ContentData},
types::AccountId,
};
use crypto::{Identity, PublicKey};
use storage::{ChatStore, ConversationKind};
@ -20,12 +19,17 @@ use storage::{ChatStore, ConversationKind};
pub use crate::conversation::{ConversationId, ConversationIdOwned};
pub use crate::inbox::Introduction;
/// Delivery address used by the legacy PrivateV1 inbox path. Consumers must
/// subscribe to this address to receive private-conversation invitations and
/// follow-up frames.
pub(crate) const PRIVATE_V1_INBOX_ADDRESS: &str = "delivery_address";
// This is the main entry point to the conversations api.
// Ctx manages lifetimes of objects to process and generate payloads.
pub struct Context<DS: DeliveryService, RS: RegistrationService, CS: ChatStore> {
identity: Rc<Identity>,
ds: Rc<RefCell<DS>>,
store: Rc<RefCell<CS>>,
identity: Arc<Identity>,
ds: Arc<DS>,
store: Arc<Mutex<CS>>,
inbox: Inbox<CS>,
pq_inbox: InboxV2<DS, RS, CS>,
}
@ -48,33 +52,34 @@ where
) -> Result<Self, ChatError> {
let name = name.into();
// Services for sharing with Converastions/Inboxes
let ds = Rc::new(RefCell::new(delivery));
let contact_registry = Rc::new(RefCell::new(registration));
let store = Rc::new(RefCell::new(store));
// Services for sharing with Conversations/Inboxes
let ds = Arc::new(delivery);
let contact_registry = Arc::new(Mutex::new(registration));
let store = Arc::new(Mutex::new(store));
// Load or create identity
let identity = if let Some(identity) = store.borrow().load_identity()? {
let identity = if let Some(identity) = store.lock().unwrap().load_identity()? {
identity
} else {
let identity = Identity::new(&name);
store.borrow_mut().save_identity(&identity)?;
store.lock().unwrap().save_identity(&identity)?;
identity
};
let identity = Rc::new(identity);
let inbox = Inbox::new(Rc::clone(&store), Rc::clone(&identity));
let identity = Arc::new(identity);
let inbox = Inbox::new(Arc::clone(&store), Arc::clone(&identity));
let pq_inbox = InboxV2::new(
LogosAccount::new_test(name),
ds.clone(),
Arc::clone(&ds),
contact_registry.clone(),
store.clone(),
);
// Subscribe
ds.borrow_mut()
.subscribe(&pq_inbox.delivery_address())
// Subscribe to both inbox addresses so DS::pull yields their traffic.
ds.subscribe(&pq_inbox.delivery_address())
.map_err(ChatError::generic)?;
ds.subscribe(PRIVATE_V1_INBOX_ADDRESS)
.map_err(ChatError::generic)?;
Ok(Self {
@ -98,21 +103,22 @@ where
let name = name.into();
let identity = Identity::new(&name);
// Services for sharing with Converastions/Inboxes
let ds = Rc::new(RefCell::new(delivery));
let contact_registry = Rc::new(RefCell::new(registration));
let store = Rc::new(RefCell::new(chat_store));
// Services for sharing with Conversations/Inboxes
let ds = Arc::new(delivery);
let contact_registry = Arc::new(Mutex::new(registration));
let store = Arc::new(Mutex::new(chat_store));
store
.borrow_mut()
.lock()
.unwrap()
.save_identity(&identity)
.expect("in-memory storage should not fail");
let identity = Rc::new(identity);
let inbox = Inbox::new(store.clone(), Rc::clone(&identity));
let identity = Arc::new(identity);
let inbox = Inbox::new(store.clone(), Arc::clone(&identity));
let mut pq_inbox = InboxV2::new(
LogosAccount::new_test(name),
ds.clone(),
Arc::clone(&ds),
contact_registry.clone(),
store.clone(),
);
@ -120,8 +126,9 @@ where
// TODO: (P2) Initialize Account in Context or upper client.
pq_inbox.register()?;
ds.borrow_mut()
.subscribe(&pq_inbox.delivery_address())
ds.subscribe(&pq_inbox.delivery_address())
.map_err(ChatError::generic)?;
ds.subscribe(PRIVATE_V1_INBOX_ADDRESS)
.map_err(ChatError::generic)?;
Ok(Self {
@ -133,19 +140,22 @@ where
})
}
pub fn ds(&self) -> RefMut<'_, DS> {
self.ds.borrow_mut()
pub fn ds(&self) -> &DS {
&self.ds
}
pub fn store(&self) -> Ref<'_, CS> {
self.store.borrow()
pub fn delivery_arc(&self) -> Arc<DS> {
Arc::clone(&self.ds)
}
pub fn store(&self) -> MutexGuard<'_, CS> {
self.store.lock().unwrap()
}
pub fn identity(&self) -> &Identity {
&self.identity
}
/// Returns the unique identifier associated with the account
pub fn account_id(&self) -> &AccountId {
self.pq_inbox.account_id()
}
@ -162,43 +172,48 @@ where
&mut self,
remote_bundle: &Introduction,
content: &[u8],
) -> Result<(ConversationIdOwned, Vec<AddressedEnvelope>), ChatError> {
) -> Result<(ConversationIdOwned, Vec<Event>), ChatError> {
let (mut convo, payloads) = self
.inbox
.invite_to_private_convo(remote_bundle, content, Rc::clone(&self.store))
.invite_to_private_convo(remote_bundle, content, Arc::clone(&self.store))
.unwrap_or_else(|_| todo!("Log/Surface Error"));
let remote_id = Inbox::<CS>::inbox_identifier_for_key(*remote_bundle.installation_key());
let payload_bytes = payloads
.into_iter()
.map(|p| p.into_envelope(remote_id.clone()))
.collect();
let convo_id = convo.persist()?;
Ok((convo_id, payload_bytes))
let mut events = Vec::new();
for payload in payloads {
let envelope = payload.into_envelope(remote_id.clone());
if let Err(e) = self.ds.publish(envelope) {
tracing::warn!("publish failed for convo {convo_id}: {e}");
events.push(Event::transport_failure(convo_id.clone()));
}
}
Ok((convo_id, events))
}
#[allow(clippy::type_complexity)]
pub fn create_group_convo(
&mut self,
participants: &[&AccountId],
) -> Result<Box<dyn GroupConvo<DS, RS>>, ChatError> {
) -> Result<(Box<dyn GroupConvo<DS, RS>>, Vec<Event>), ChatError> {
// TODO: (P1) Ensure errors are handled propertly. This is a high chance for desynchronized state.
// MlsGroup persistence, conversation persistence, and invite delivery all happen seperately
let mut convo = self.pq_inbox.create_group_v1()?;
self.store
.borrow_mut()
.lock()
.unwrap()
.save_conversation(&storage::ConversationMeta {
local_convo_id: convo.id().to_string(),
remote_convo_id: "0".into(),
kind: ConversationKind::GroupV1,
})?;
convo.add_member(participants)?;
Ok(Box::new(convo))
let events = convo.add_member(participants)?;
Ok((Box::new(convo), events))
}
pub fn list_conversations(&self) -> Result<Vec<ConversationIdOwned>, ChatError> {
let records = self.store.borrow().load_conversations()?;
let records = self.store.lock().unwrap().load_conversations()?;
Ok(records
.into_iter()
.map(|r| Arc::from(r.local_convo_id.as_str()))
@ -209,18 +224,25 @@ where
&mut self,
convo_id: ConversationId,
content: &[u8],
) -> Result<Vec<AddressedEnvelope>, ChatError> {
) -> Result<Vec<Event>, ChatError> {
let mut convo = self.load_convo(convo_id)?;
let payloads = convo.send_message(content)?;
let remote_id = convo.remote_id();
Ok(payloads
.into_iter()
.map(|p| p.into_envelope(remote_id.clone()))
.collect())
let convo_id_owned: ConversationIdOwned = Arc::from(convo_id);
let mut events = Vec::new();
for payload in payloads {
let envelope = payload.into_envelope(remote_id.clone());
if let Err(e) = self.ds.publish(envelope) {
tracing::warn!("publish failed for convo {convo_id}: {e}");
events.push(Event::transport_failure(convo_id_owned.clone()));
}
}
Ok(events)
}
// Decode bytes and send to protocol for processing.
pub fn handle_payload(&mut self, payload: &[u8]) -> Result<Option<ContentData>, ChatError> {
pub fn handle_payload(&mut self, payload: &[u8]) -> Result<Vec<Event>, ChatError> {
let env = EnvelopeV1::decode(payload)?;
// TODO: Impl Conversation hinting
@ -229,45 +251,39 @@ where
match convo_id {
c if c == self.inbox.id() => self.dispatch_to_inbox(&env.payload),
c if c == self.pq_inbox.id() => self.dispatch_to_inbox2(&env.payload),
c if self.store.borrow().has_conversation(&c)? => {
c if self.store.lock().unwrap().has_conversation(&c)? => {
self.dispatch_to_convo(&c, &env.payload)
}
_ => Ok(Some(ContentData {
conversation_id: "".into(),
data: vec![],
is_new_convo: false,
})),
c => {
tracing::warn!("dropping payload for unknown conversation hint {c}");
Ok(Vec::new())
}
}
}
// Dispatch encrypted payload to Inbox, and register the created Conversation
fn dispatch_to_inbox(
&mut self,
enc_payload_bytes: &[u8],
) -> Result<Option<ContentData>, ChatError> {
fn dispatch_to_inbox(&mut self, enc_payload_bytes: &[u8]) -> Result<Vec<Event>, ChatError> {
// EncryptedPayloads are not used by GroupConvos at this time, else this can be performed in `handle_payload`
// TODO: (P1) reconcile envelope parsing between Covno and GroupConvo
let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?;
let public_key_hex = Inbox::<CS>::extract_ephemeral_key_hex(&enc_payload)?;
let (convo, content) =
let (convo, events) =
self.inbox
.handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store))?;
.handle_frame(enc_payload, &public_key_hex, Arc::clone(&self.store))?;
match convo {
Conversation::Private(mut convo) => convo.persist()?,
};
self.store
.borrow_mut()
.lock()
.unwrap()
.remove_ephemeral_key(&public_key_hex)?;
Ok(content)
Ok(events)
}
// Dispatch encrypted payload to Inbox, and register the created Conversation
fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result<Option<ContentData>, ChatError> {
self.pq_inbox.handle_frame(payload)?;
Ok(None)
fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result<Vec<Event>, ChatError> {
self.pq_inbox.handle_frame(payload)
}
// Dispatch encrypted payload to its corresponding conversation
@ -275,7 +291,7 @@ where
&mut self,
convo_id: ConversationId,
enc_payload_bytes: &[u8],
) -> Result<Option<ContentData>, ChatError> {
) -> Result<Vec<Event>, ChatError> {
let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?;
let mut convo = self.load_convo(convo_id)?;
convo.handle_frame(enc_payload)
@ -297,7 +313,8 @@ where
fn load_convo(&mut self, convo_id: ConversationId) -> Result<Box<dyn Convo>, ChatError> {
let record = self
.store
.borrow()
.lock()
.unwrap()
.load_conversation(convo_id)?
.ok_or_else(|| ChatError::NoConvo(convo_id.into()))?;
@ -326,7 +343,8 @@ where
) -> Result<Box<dyn GroupConvo<DS, RS>>, ChatError> {
let record = self
.store
.borrow()
.lock()
.unwrap()
.load_conversation(convo_id)?
.ok_or_else(|| ChatError::NoConvo(convo_id.into()))?;

View File

@ -3,8 +3,9 @@ mod privatev1;
use crate::{
DeliveryService,
event::Event,
service_traits::KeyPackageProvider,
types::{AccountId, AddressedEncryptedPayload, ContentData},
types::{AccountId, AddressedEncryptedPayload},
};
use chat_proto::logoschat::encryption::EncryptedPayload;
use std::fmt::Debug;
@ -28,13 +29,11 @@ pub trait Convo: Id + Debug {
/// Decrypts and processes an incoming encrypted frame.
///
/// Returns `Ok(Some(ContentData))` if the frame contains user content,
/// `Ok(None)` for protocol frames (e.g., placeholders), or an error if
/// decryption or frame parsing fails.
fn handle_frame(
&mut self,
enc_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError>;
/// Returns the events generated by processing the frame: typically a
/// single `Event::MessageReceived` for user content, an empty vector for
/// protocol frames (e.g. placeholders), or an error if decryption or
/// frame parsing fails.
fn handle_frame(&mut self, enc_payload: EncryptedPayload) -> Result<Vec<Event>, ChatError>;
fn remote_id(&self) -> String;
@ -43,11 +42,15 @@ pub trait Convo: Id + Debug {
}
pub trait GroupConvo<DS: DeliveryService, RS: KeyPackageProvider>: Convo {
fn add_member(&mut self, members: &[&AccountId]) -> Result<(), ChatError>;
/// Invite new members and publish the resulting MLS commit + welcome
/// messages. Returns the observation events generated while publishing —
/// typically empty on success, `DeliveryFailed` if a publish errored.
fn add_member(&mut self, members: &[&AccountId]) -> Result<Vec<Event>, ChatError>;
// This is intended to replace `send_message`. The trait change is that it automatically
// sends the payload directly.
fn send_content(&mut self, content: &[u8]) -> Result<(), ChatError>;
/// Encrypt and publish `content`. This is the publish-side counterpart of
/// `Convo::send_message`. Returns the observation events generated while
/// publishing — typically empty on success.
fn send_content(&mut self, content: &[u8]) -> Result<Vec<Event>, ChatError>;
}
pub enum Conversation<S: ConversationStore + RatchetStore> {

View File

@ -2,8 +2,7 @@
/// Properties:
/// - Harvest Now Decrypt Later (HNDL) protection provided by XWING
/// - Multiple
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use blake2::{Blake2b, Digest, digest::consts::U6};
use chat_proto::logoschat::encryption::{EncryptedPayload, Plaintext, encrypted_payload};
@ -18,8 +17,9 @@ use crate::types::AccountId;
use crate::{
DeliveryService,
conversation::{ChatError, ConversationId, Convo, GroupConvo, Id},
event::Event,
service_traits::KeyPackageProvider,
types::{AddressedEncryptedPayload, ContentData},
types::AddressedEncryptedPayload,
};
/// Provides the identity information needed to participate in an MLS group.
@ -58,16 +58,16 @@ pub trait MlsContext {
fn invite_user<DS: DeliveryService>(
&self,
ds: &mut DS,
ds: &DS,
account_id: &AccountId,
welcome: &MlsMessageOut,
) -> Result<(), ChatError>;
}
pub struct GroupV1Convo<MlsCtx, DS, KP> {
ctx: Rc<RefCell<MlsCtx>>,
ds: Rc<RefCell<DS>>,
keypkg_provider: Rc<RefCell<KP>>,
ctx: Arc<Mutex<MlsCtx>>,
ds: Arc<DS>,
keypkg_provider: Arc<Mutex<KP>>,
mls_group: MlsGroup,
convo_id: String,
}
@ -80,7 +80,7 @@ where
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GroupV1Convo")
.field("name", &self.ctx.borrow().ident().friendly_name())
.field("name", &self.ctx.lock().unwrap().ident().friendly_name())
.field("convo_id", &self.convo_id)
.field("mls_epoch", &self.mls_group.epoch())
.finish_non_exhaustive()
@ -95,13 +95,13 @@ where
{
// Create a new conversation with the creator as the only participant.
pub fn new(
ctx: Rc<RefCell<MlsCtx>>,
ds: Rc<RefCell<DS>>,
keypkg_provider: Rc<RefCell<KP>>,
ctx: Arc<Mutex<MlsCtx>>,
ds: Arc<DS>,
keypkg_provider: Arc<Mutex<KP>>,
) -> Result<Self, ChatError> {
let config = Self::mls_create_config();
let mls_group = {
let ctx_ref = ctx.borrow();
let ctx_ref = ctx.lock().unwrap();
MlsGroup::new(
ctx_ref.provider(),
ctx_ref.ident(),
@ -111,7 +111,7 @@ where
.unwrap()
};
let convo_id = hex::encode(mls_group.group_id().as_slice());
Self::subscribe(&mut ds.borrow_mut(), &convo_id)?;
Self::subscribe(&*ds, &convo_id)?;
Ok(Self {
ctx,
@ -124,13 +124,13 @@ where
// Constructs a new conversation upon receiving a MlsWelcome message.
pub fn new_from_welcome(
ctx: Rc<RefCell<MlsCtx>>,
ds: Rc<RefCell<DS>>,
keypkg_provider: Rc<RefCell<KP>>,
ctx: Arc<Mutex<MlsCtx>>,
ds: Arc<DS>,
keypkg_provider: Arc<Mutex<KP>>,
welcome: Welcome,
) -> Result<Self, ChatError> {
let mls_group = {
let ctx_borrow = ctx.borrow();
let ctx_borrow = ctx.lock().unwrap();
let provider = ctx_borrow.provider();
StagedWelcome::build_from_welcome(provider, &Self::mls_join_config(), welcome)
@ -142,7 +142,7 @@ where
};
let convo_id = hex::encode(mls_group.group_id().as_slice());
Self::subscribe(&mut *ds.borrow_mut(), &convo_id)?;
Self::subscribe(&*ds, &convo_id)?;
Ok(Self {
ctx,
@ -154,17 +154,17 @@ where
}
pub fn load(
ctx: Rc<RefCell<MlsCtx>>,
ds: Rc<RefCell<DS>>,
keypkg_provider: Rc<RefCell<KP>>,
ctx: Arc<Mutex<MlsCtx>>,
ds: Arc<DS>,
keypkg_provider: Arc<Mutex<KP>>,
convo_id: String,
group_id: GroupId,
) -> Result<Self, ChatError> {
let mls_group = MlsGroup::load(ctx.borrow().provider().storage(), &group_id)
let mls_group = MlsGroup::load(ctx.lock().unwrap().provider().storage(), &group_id)
.map_err(ChatError::generic)?
.ok_or_else(|| ChatError::NoConvo("mls group not found".into()))?;
Self::subscribe(&mut *ds.borrow_mut(), &convo_id)?;
Self::subscribe(&*ds, &convo_id)?;
Ok(GroupV1Convo {
ctx,
@ -176,7 +176,7 @@ where
}
// Configure the delivery service to listen for the required delivery addresses.
fn subscribe(ds: &mut DS, convo_id: &str) -> Result<(), ChatError> {
fn subscribe(ds: &DS, convo_id: &str) -> Result<(), ChatError> {
ds.subscribe(&Self::delivery_address_from_id(convo_id))
.map_err(ChatError::generic)?;
ds.subscribe(&Self::ctrl_delivery_address_from_id(convo_id))
@ -220,23 +220,28 @@ where
Self::ctrl_delivery_address_from_id(&self.convo_id)
}
fn key_package_for_account(&self, ident: &AccountId) -> Result<KeyPackage, ChatError> {
// `provider` is passed in rather than acquired from `self.ctx` so callers
// that already hold the `ctx` lock can use this; `std::sync::Mutex` is not
// reentrant.
fn key_package_for_account(
&self,
ident: &AccountId,
provider: &LibcruxProvider,
) -> Result<KeyPackage, ChatError> {
let retrieved_bytes = self
.keypkg_provider
.borrow()
.lock()
.unwrap()
.retrieve(ident)
.map_err(|e: KP::Error| ChatError::Generic(e.to_string()))?;
// dbg!(ctx.contact_registry());
let Some(keypkg_bytes) = retrieved_bytes else {
return Err(ChatError::Protocol("Contact Not Found".into()));
};
let key_package_in = KeyPackageIn::tls_deserialize(&mut keypkg_bytes.as_slice())?;
let keypkg = key_package_in.validate(
self.ctx.borrow().provider().crypto(),
ProtocolVersion::Mls10,
)?; //TODO: P3 - Hardcoded Protocol Version
let keypkg = key_package_in.validate(provider.crypto(), ProtocolVersion::Mls10)?;
//TODO: P3 - Hardcoded Protocol Version
Ok(keypkg)
}
}
@ -262,7 +267,7 @@ where
&mut self,
content: &[u8],
) -> Result<Vec<AddressedEncryptedPayload>, ChatError> {
let ctx_ref = self.ctx.borrow();
let ctx_ref = self.ctx.lock().unwrap();
let provider = ctx_ref.provider();
let mls_message_out = self
.mls_group
@ -281,10 +286,7 @@ where
Ok(vec![a])
}
fn handle_frame(
&mut self,
encoded_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError> {
fn handle_frame(&mut self, encoded_payload: EncryptedPayload) -> Result<Vec<Event>, ChatError> {
let bytes = match encoded_payload.encryption {
Some(encrypted_payload::Encryption::Plaintext(pt)) => pt.payload,
_ => {
@ -302,12 +304,12 @@ where
.try_into_protocol_message()
.map_err(ChatError::generic)?;
let ctx_borrow = self.ctx.borrow();
let ctx_borrow = self.ctx.lock().unwrap();
let provider = ctx_borrow.provider();
if protocol_message.epoch() < self.mls_group.epoch() {
// TODO: (P1) Add logging for messages arriving from past epoch.
return Ok(None);
return Ok(Vec::new());
}
let processed = self
@ -316,20 +318,19 @@ where
.map_err(ChatError::generic)?;
match processed.into_content() {
ProcessedMessageContent::ApplicationMessage(msg) => Ok(Some(ContentData {
conversation_id: hex::encode(self.mls_group.group_id().as_slice()),
ProcessedMessageContent::ApplicationMessage(msg) => Ok(vec![Event::MessageReceived {
conversation_id: Arc::from(hex::encode(self.mls_group.group_id().as_slice())),
data: msg.into_bytes(),
is_new_convo: false,
})),
}]),
ProcessedMessageContent::StagedCommitMessage(commit) => {
self.mls_group
.merge_staged_commit(provider, *commit)
.map_err(ChatError::generic)?;
Ok(None)
Ok(Vec::new())
}
_ => {
// TODO: (P2) Log unknown message type
Ok(None)
Ok(Vec::new())
}
}
}
@ -354,8 +355,8 @@ where
// commit — the Commit message Alice broadcasts to all members
// welcome — the Welcome message sent privately to each new joiner
// _group_info — used for external joins; ignore for now
fn add_member(&mut self, members: &[&AccountId]) -> Result<(), ChatError> {
let ctx_ref = self.ctx.borrow();
fn add_member(&mut self, members: &[&AccountId]) -> Result<Vec<Event>, ChatError> {
let ctx_ref = self.ctx.lock().unwrap();
let provider = ctx_ref.provider();
if members.len() > 50 {
@ -369,7 +370,7 @@ where
// The account_id is kept so invites can be addressed properly
let keypkgs = members
.iter()
.map(|ident| self.key_package_for_account(ident))
.map(|ident| self.key_package_for_account(ident, provider))
.collect::<Result<Vec<_>, ChatError>>()?;
let (commit, welcome, _group_info) = self
@ -381,7 +382,7 @@ where
// TODO: (P3) Evaluate privacy/performance implications of an aggregated Welcome for multiple users
for account_id in members {
ctx_ref.invite_user(&mut *self.ds.borrow_mut(), account_id, &welcome)?;
ctx_ref.invite_user(&*self.ds, account_id, &welcome)?;
}
let encrypted_payload = EncryptedPayload {
@ -398,20 +399,24 @@ where
// TODO: (P1) Make GroupConvos agnostic to framing so its less error prone and more
let env = addr_enc_payload.into_envelope(self.convo_id.clone());
self.ds
.borrow_mut()
.publish(env)
.map_err(|e| ChatError::Generic(format!("Publish: {e}")))
let mut events = Vec::new();
if let Err(e) = self.ds.publish(env) {
tracing::warn!("commit publish failed for group {}: {e}", self.convo_id);
events.push(Event::transport_failure(Arc::from(self.id())));
}
Ok(events)
}
fn send_content(&mut self, content: &[u8]) -> Result<(), ChatError> {
fn send_content(&mut self, content: &[u8]) -> Result<Vec<Event>, ChatError> {
let payloads = self.send_message(content)?;
let mut events = Vec::new();
for payload in payloads {
self.ds
.borrow_mut()
.publish(payload.into_envelope(self.id().into()))
.map_err(|e| ChatError::Delivery(e.to_string()))?;
let envelope = payload.into_envelope(self.id().into());
if let Err(e) = self.ds.publish(envelope) {
tracing::warn!("publish failed for group {}: {e}", self.convo_id);
events.push(Event::transport_failure(Arc::from(self.id())));
}
}
Ok(())
Ok(events)
}
}

View File

@ -9,15 +9,17 @@ use chat_proto::logoschat::{
use crypto::{PrivateKey, PublicKey, SymmetricKey32};
use double_ratchets::{Header, InstallationKeyPair, RatchetState, restore_ratchet_state};
use prost::{Message, bytes::Bytes};
use std::{cell::RefCell, fmt::Debug, rc::Rc, sync::Arc};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use storage::{ConversationKind, ConversationMeta, ConversationStore};
use crate::{
context::ConversationIdOwned,
context::{ConversationIdOwned, PRIVATE_V1_INBOX_ADDRESS},
conversation::{ChatError, ConversationId, Convo, Id},
errors::EncryptionError,
event::Event,
proto,
types::{AddressedEncryptedPayload, ContentData},
types::AddressedEncryptedPayload,
utils::timestamp_millis,
};
use double_ratchets::{to_ratchet_record, to_skipped_key_records};
@ -60,18 +62,18 @@ pub struct PrivateV1Convo<S: ConversationStore + RatchetStore> {
local_convo_id: String,
remote_convo_id: String,
dr_state: RatchetState,
store: Rc<RefCell<S>>,
store: Arc<Mutex<S>>,
}
impl<S: ConversationStore + RatchetStore> PrivateV1Convo<S> {
/// Reconstructs a PrivateV1Convo from persisted metadata and ratchet state.
pub fn new(
store: Rc<RefCell<S>>,
store: Arc<Mutex<S>>,
local_convo_id: String,
remote_convo_id: String,
) -> Result<Self, ChatError> {
let dr_record = store.borrow().load_ratchet_state(&local_convo_id)?;
let skipped_keys = store.borrow().load_skipped_keys(&local_convo_id)?;
let dr_record = store.lock().unwrap().load_ratchet_state(&local_convo_id)?;
let skipped_keys = store.lock().unwrap().load_skipped_keys(&local_convo_id)?;
let dr_state: RatchetState = restore_ratchet_state(dr_record, skipped_keys);
Ok(Self {
@ -83,7 +85,7 @@ impl<S: ConversationStore + RatchetStore> PrivateV1Convo<S> {
}
pub fn new_initiator(
store: Rc<RefCell<S>>,
store: Arc<Mutex<S>>,
seed_key: SymmetricKey32,
remote: PublicKey,
) -> Self {
@ -106,7 +108,7 @@ impl<S: ConversationStore + RatchetStore> PrivateV1Convo<S> {
}
pub fn new_responder(
store: Rc<RefCell<S>>,
store: Arc<Mutex<S>>,
seed_key: SymmetricKey32,
dh_self: &PrivateKey,
) -> Self {
@ -182,12 +184,11 @@ impl<S: ConversationStore + RatchetStore> PrivateV1Convo<S> {
}
// Handler for application content
fn handle_content(&self, data: Vec<u8>) -> Option<ContentData> {
Some(ContentData {
conversation_id: self.id().into(),
fn handle_content(&self, data: Vec<u8>) -> Vec<Event> {
vec![Event::MessageReceived {
conversation_id: Arc::from(self.id()),
data,
is_new_convo: false,
})
}]
}
/// Persists a conversation's metadata and ratchet state to DB.
@ -197,8 +198,8 @@ impl<S: ConversationStore + RatchetStore> PrivateV1Convo<S> {
remote_convo_id: self.remote_id(),
kind: self.convo_type(),
};
self.store.borrow_mut().save_conversation(&convo_info)?;
self.save_ratchet_state(&mut *self.store.borrow_mut())?;
self.store.lock().unwrap().save_conversation(&convo_info)?;
self.save_ratchet_state(&mut *self.store.lock().unwrap())?;
Ok(Arc::from(self.id()))
}
@ -230,18 +231,15 @@ impl<S: ConversationStore + RatchetStore> Convo for PrivateV1Convo<S> {
let data = self.encrypt(frame);
self.save_ratchet_state::<S>(&mut *self.store.borrow_mut())?;
self.save_ratchet_state::<S>(&mut *self.store.lock().unwrap())?;
Ok(vec![AddressedEncryptedPayload {
delivery_address: "delivery_address".into(),
delivery_address: PRIVATE_V1_INBOX_ADDRESS.into(),
data,
}])
}
fn handle_frame(
&mut self,
encoded_payload: EncryptedPayload,
) -> Result<Option<ContentData>, ChatError> {
fn handle_frame(&mut self, encoded_payload: EncryptedPayload) -> Result<Vec<Event>, ChatError> {
// Extract expected frame
let frame = self
.decrypt(encoded_payload)
@ -251,12 +249,12 @@ impl<S: ConversationStore + RatchetStore> Convo for PrivateV1Convo<S> {
return Err(ChatError::ProtocolExpectation("None", "Some".into()));
};
self.save_ratchet_state(&mut *self.store.borrow_mut())?;
self.save_ratchet_state(&mut *self.store.lock().unwrap())?;
// Handle FrameTypes
let output = match frame_type {
FrameType::Content(bytes) => self.handle_content(bytes.into()),
FrameType::Placeholder(_) => None,
FrameType::Placeholder(_) => Vec::new(),
};
Ok(output)
@ -291,11 +289,11 @@ mod tests {
let saro = PrivateKey::random();
let raya = PrivateKey::random();
let saro_storage = Rc::new(RefCell::new(
let saro_storage = Arc::new(Mutex::new(
ChatStorage::new(StorageConfig::InMemory).unwrap(),
));
let raya_storage = Rc::new(RefCell::new(
let raya_storage = Arc::new(Mutex::new(
ChatStorage::new(StorageConfig::InMemory).unwrap(),
));

View File

@ -0,0 +1,56 @@
//! Observable events surfaced to the application layer. See
//! `docs/adr/0001-client-event-system.md`.
use crate::conversation::ConversationIdOwned;
/// Opaque correlation handle for outbound envelopes. Reserved for future
/// delivery-receipt support; no path produces a non-`None` value yet.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct EnvelopeId([u8; 16]);
impl EnvelopeId {
pub fn as_bytes(&self) -> &[u8; 16] {
&self.0
}
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum Event {
#[non_exhaustive]
ConversationStarted {
conversation_id: ConversationIdOwned,
},
#[non_exhaustive]
MessageReceived {
conversation_id: ConversationIdOwned,
data: Vec<u8>,
},
#[non_exhaustive]
DeliveryFailed {
conversation_id: ConversationIdOwned,
/// `None` when the failure isn't tied to a specific outbound envelope.
envelope_id: Option<EnvelopeId>,
reason: FailureReason,
},
}
impl Event {
pub fn transport_failure(conversation_id: ConversationIdOwned) -> Self {
Self::DeliveryFailed {
conversation_id,
envelope_id: None,
reason: FailureReason::Transport,
}
}
pub fn is_delivery_failure(&self) -> bool {
matches!(self, Self::DeliveryFailed { .. })
}
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum FailureReason {
Transport,
}

View File

@ -3,30 +3,30 @@ use chat_proto::logoschat::encryption::EncryptedPayload;
use prost::Message;
use prost::bytes::Bytes;
use rand_core::OsRng;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use storage::{ConversationStore, EphemeralKeyStore, RatchetStore};
use crypto::{PrekeyBundle, SymmetricKey32};
use crate::context::Introduction;
use crate::context::{Introduction, PRIVATE_V1_INBOX_ADDRESS};
use crate::conversation::{ChatError, Conversation, ConversationId, Convo, Id, PrivateV1Convo};
use crate::crypto::{CopyBytes, PrivateKey, PublicKey};
use crate::event::Event;
use crate::inbox::handshake::InboxHandshake;
use crate::proto;
use crate::types::{AddressedEncryptedPayload, ContentData};
use crate::types::AddressedEncryptedPayload;
use crypto::Identity;
/// Compute the deterministic Delivery_address for an installation
// TODO: Derive per-installation address; today every PrivateV1 client shares
// the same one.
fn delivery_address_for_installation(_: PublicKey) -> String {
// TODO: Implement Delivery Address
"delivery_address".into()
PRIVATE_V1_INBOX_ADDRESS.into()
}
pub struct Inbox<S: EphemeralKeyStore> {
ident: Rc<Identity>,
ident: Arc<Identity>,
local_convo_id: String,
store: Rc<RefCell<S>>,
store: Arc<Mutex<S>>,
}
impl<S: EphemeralKeyStore> std::fmt::Debug for Inbox<S> {
@ -39,7 +39,7 @@ impl<S: EphemeralKeyStore> std::fmt::Debug for Inbox<S> {
}
impl<S: EphemeralKeyStore> Inbox<S> {
pub fn new(store: Rc<RefCell<S>>, ident: Rc<Identity>) -> Self {
pub fn new(store: Arc<Mutex<S>>, ident: Arc<Identity>) -> Self {
let local_convo_id = Self::inbox_identifier_for_key(ident.public_key());
Self {
ident,
@ -57,7 +57,8 @@ impl<S: EphemeralKeyStore> Inbox<S> {
let public_key_hex = hex::encode(ephemeral_key.as_bytes());
self.store
.borrow_mut()
.lock()
.unwrap()
.save_ephemeral_key(&public_key_hex, &ephemeral)?;
let intro = Introduction::new(self.ident.secret(), ephemeral_key, OsRng);
@ -68,7 +69,7 @@ impl<S: EphemeralKeyStore> Inbox<S> {
&self,
remote_bundle: &Introduction,
initial_message: &[u8],
private_store: Rc<RefCell<PS>>,
private_store: Arc<Mutex<PS>>,
) -> Result<(PrivateV1Convo<PS>, Vec<AddressedEncryptedPayload>), ChatError> {
let mut rng = OsRng;
@ -120,16 +121,19 @@ impl<S: EphemeralKeyStore> Inbox<S> {
}
/// Handles an incoming inbox frame. The caller must provide the ephemeral private key
/// looked up from storage. Returns the created conversation and optional content data.
/// looked up from storage. Returns the created conversation and the events
/// observed while processing the invite (a `ConversationStarted` followed by
/// any events from the embedded initial frame).
pub fn handle_frame<PS: ConversationStore + RatchetStore>(
&self,
enc_payload: EncryptedPayload,
public_key_hex: &str,
private_store: Rc<RefCell<PS>>,
) -> Result<(Conversation<PS>, Option<ContentData>), ChatError> {
private_store: Arc<Mutex<PS>>,
) -> Result<(Conversation<PS>, Vec<Event>), ChatError> {
let ephemeral_key = self
.store
.borrow()
.lock()
.unwrap()
.load_ephemeral_key(public_key_hex)?
.ok_or(ChatError::UnknownEphemeralKey())?;
@ -152,16 +156,12 @@ impl<S: EphemeralKeyStore> Inbox<S> {
return Err(ChatError::Protocol("missing initial encpayload".into()));
};
// Set is_new_convo for content data
let content = match convo.handle_frame(enc_payload)? {
Some(v) => ContentData {
is_new_convo: true,
..v
},
None => return Err(ChatError::Protocol("expected contentData".into())),
};
let mut events = vec![Event::ConversationStarted {
conversation_id: Arc::from(convo.id()),
}];
events.extend(convo.handle_frame(enc_payload)?);
Ok((Conversation::Private(convo), Some(content)))
Ok((Conversation::Private(convo), events))
}
}
}
@ -257,25 +257,23 @@ impl<S: EphemeralKeyStore> Id for Inbox<S> {
#[cfg(test)]
mod tests {
use std::cell::RefCell;
use super::*;
use chat_sqlite::{ChatStorage, StorageConfig};
#[test]
fn test_invite_privatev1_roundtrip() {
let saro_storage = Rc::new(RefCell::new(
let saro_storage = Arc::new(Mutex::new(
ChatStorage::new(StorageConfig::InMemory).unwrap(),
));
let raya_storage = Rc::new(RefCell::new(
let raya_storage = Arc::new(Mutex::new(
ChatStorage::new(StorageConfig::InMemory).unwrap(),
));
let saro_ident = Identity::new("saro");
let saro_inbox = Inbox::new(Rc::clone(&saro_storage), saro_ident.into());
let saro_inbox = Inbox::new(Arc::clone(&saro_storage), saro_ident.into());
let raya_ident = Identity::new("raya");
let raya_inbox = Inbox::new(Rc::clone(&raya_storage), raya_ident.into());
let raya_inbox = Inbox::new(Arc::clone(&raya_storage), raya_ident.into());
let bundle = raya_inbox.create_intro_bundle().unwrap();

View File

@ -1,5 +1,4 @@
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use chat_proto::logoschat::envelope::EnvelopeV1;
use openmls::prelude::tls_codec::Serialize;
@ -16,7 +15,8 @@ use crate::RegistrationService;
use crate::account::LogosAccount;
use crate::conversation::GroupConvo;
use crate::conversation::group_v1::MlsContext;
use crate::conversation::{GroupV1Convo, IdentityProvider};
use crate::conversation::{GroupV1Convo, Id, IdentityProvider};
use crate::event::Event;
use crate::types::AccountId;
use crate::utils::{blake2b_hex, hash_size};
pub struct PqMlsContext {
@ -37,7 +37,7 @@ impl MlsContext for PqMlsContext {
fn invite_user<DS: DeliveryService>(
&self,
ds: &mut DS,
ds: &DS,
account_id: &AccountId,
welcome: &MlsMessageOut,
) -> Result<(), ChatError> {
@ -79,10 +79,10 @@ fn conversation_id_for(account_id: &AccountId) -> String {
/// such as MLS.
pub struct InboxV2<DS, RS, CS> {
account_id: AccountId,
ds: Rc<RefCell<DS>>,
reg_service: Rc<RefCell<RS>>,
store: Rc<RefCell<CS>>,
ctx: Rc<RefCell<PqMlsContext>>,
ds: Arc<DS>,
reg_service: Arc<Mutex<RS>>,
store: Arc<Mutex<CS>>,
ctx: Arc<Mutex<PqMlsContext>>,
}
impl<DS, CS, RS> InboxV2<DS, RS, CS>
@ -93,9 +93,9 @@ where
{
pub fn new(
account: LogosAccount,
ds: Rc<RefCell<DS>>,
reg_service: Rc<RefCell<RS>>,
store: Rc<RefCell<CS>>,
ds: Arc<DS>,
reg_service: Arc<Mutex<RS>>,
store: Arc<Mutex<CS>>,
) -> Self {
let account_id = account.account_id().clone();
let provider = LibcruxProvider::new().unwrap();
@ -104,7 +104,7 @@ where
ds,
reg_service,
store,
ctx: Rc::new(RefCell::new(PqMlsContext {
ctx: Arc::new(Mutex::new(PqMlsContext {
ident_provider: account,
provider,
})),
@ -122,9 +122,10 @@ where
// TODO: (P3) Each keypackage can only be used once either enable...
// "LastResort" package or publish multiple
self.reg_service
.borrow_mut()
.lock()
.unwrap()
.register(
&self.ctx.borrow().ident_provider.friendly_name(),
&self.ctx.lock().unwrap().ident_provider.friendly_name(),
keypackage_bytes,
)
.map_err(ChatError::generic)
@ -142,7 +143,7 @@ where
GroupV1Convo::new(self.ctx.clone(), self.ds.clone(), self.reg_service.clone())
}
pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<(), ChatError> {
pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<Vec<Event>, ChatError> {
let inbox_frame = InboxV2Frame::decode(payload_bytes)?;
let Some(payload) = inbox_frame.payload else {
@ -156,7 +157,7 @@ where
}
}
fn persist_convo(&self, convo: impl GroupConvo<DS, RS>) -> Result<(), ChatError> {
fn persist_convo(&self, convo: &impl GroupConvo<DS, RS>) -> Result<(), ChatError> {
// TODO: (P2) Remove remote_convo_id this is an implementation detail specific to PrivateV1
// TODO: (P3) Implement From<Convo> for ConversationMeta
let meta = ConversationMeta {
@ -164,12 +165,12 @@ where
remote_convo_id: "0".into(),
kind: storage::ConversationKind::GroupV1,
};
self.store.borrow_mut().save_conversation(&meta)?;
self.store.lock().unwrap().save_conversation(&meta)?;
// TODO: (P1) Persist state
Ok(())
}
fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result<(), ChatError> {
fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result<Vec<Event>, ChatError> {
let (msg_in, _rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?;
let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else {
@ -185,11 +186,13 @@ where
self.reg_service.clone(),
welcome,
)?;
self.persist_convo(convo)
let conversation_id = Arc::from(convo.id());
self.persist_convo(&convo)?;
Ok(vec![Event::ConversationStarted { conversation_id }])
}
fn create_keypackage(&self) -> Result<KeyPackage, ChatError> {
let ctx_borrow = self.ctx.borrow();
let ctx_borrow = self.ctx.lock().unwrap();
let capabilities = Capabilities::builder()
.ciphersuites(vec![
Ciphersuite::MLS_256_XWING_CHACHA20POLY1305_SHA256_Ed25519,

View File

@ -3,6 +3,7 @@ mod context;
mod conversation;
mod crypto;
mod errors;
mod event;
mod inbox;
mod inbox_v2;
mod proto;
@ -16,6 +17,7 @@ pub use chat_sqlite::StorageConfig;
pub use context::{Context, ConversationId, ConversationIdOwned, Introduction};
pub use conversation::GroupConvo;
pub use errors::ChatError;
pub use service_traits::{DeliveryService, RegistrationService};
pub use types::{AccountId, AddressedEnvelope, ContentData};
pub use event::{EnvelopeId, Event, FailureReason};
pub use service_traits::{DeliveryService, RegistrationService, drain_inbound};
pub use types::{AccountId, AddressedEnvelope};
pub use utils::hex_trunc;

View File

@ -1,18 +1,31 @@
/// Service traits define the functionality which must be externally supplied by
/// platform clients. Platforms can alter the behaviour of the chat core by supplying
/// different implementations.
use std::sync::{Mutex, mpsc};
use std::{fmt::Debug, fmt::Display};
use crate::types::{AccountId, AddressedEnvelope};
pub fn drain_inbound(rx: &Mutex<mpsc::Receiver<Vec<u8>>>) -> Vec<Vec<u8>> {
let rx = rx.lock().unwrap();
let mut out = Vec::new();
while let Ok(bytes) = rx.try_recv() {
out.push(bytes);
}
out
}
/// A Delivery service is responsible for payload transport.
/// This interface allows Conversations to send payloads on the wire as well as
/// register interest in delivery_addresses. Client implementations are responsible
/// for providing the inbound payloads to Context::handle_payload.
pub trait DeliveryService: Debug {
/// This interface allows Conversations to send payloads on the wire, register
/// interest in delivery_addresses, and pull inbound payloads.
pub trait DeliveryService: Debug + Send + Sync {
type Error: Display + Debug;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error>;
fn subscribe(&mut self, delivery_address: &str) -> Result<(), Self::Error>;
fn publish(&self, envelope: AddressedEnvelope) -> Result<(), Self::Error>;
fn subscribe(&self, delivery_address: &str) -> Result<(), Self::Error>;
/// Return every inbound payload that has arrived since the last call.
/// Non-blocking; returns an empty vec when nothing is available.
fn pull(&self) -> Vec<Vec<u8>>;
}
/// Manages key bundle storage for MLS group creation/addition while contacts are

View File

@ -48,15 +48,6 @@ impl Debug for AddressedEnvelope {
}
}
// This struct represents the result of processed inbound data.
// It wraps content payload with a conversation_id
#[derive(Debug)]
pub struct ContentData {
pub conversation_id: String,
pub data: Vec<u8>,
pub is_new_convo: bool,
}
// Internal type Definitions
// Used by Conversations to attach addresses to outbound encrypted payloads

View File

@ -1,37 +1,37 @@
use std::ops::{Deref, DerefMut};
use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
use libchat::{ContentData, Context, GroupConvo, hex_trunc};
use libchat::{Context, DeliveryService, Event, GroupConvo, hex_trunc};
// Simple client Functionality for testing
struct Client {
inner: Context<LocalBroadcaster, EphemeralRegistry, MemStore>,
on_content: Option<Box<dyn Fn(ContentData)>>,
on_event: Option<Box<dyn Fn(Event)>>,
}
impl Client {
fn init(
ctx: Context<LocalBroadcaster, EphemeralRegistry, MemStore>,
cb: Option<impl Fn(ContentData) + 'static>,
cb: Option<impl Fn(Event) + 'static>,
) -> Self {
Client {
inner: ctx,
on_content: cb.map(|f| Box::new(f) as Box<dyn Fn(ContentData)>),
on_event: cb.map(|f| Box::new(f) as Box<dyn Fn(Event)>),
}
}
fn process_messages(&mut self) {
let messages: Vec<_> = {
let mut ds = self.ds();
std::iter::from_fn(|| ds.poll()).collect()
let ds = self.ds();
ds.pull()
};
for data in messages {
let res = self.handle_payload(&data).unwrap();
if let Some(cb) = &self.on_content
&& let Some(content_data) = res
{
cb(content_data);
let events = self.handle_payload(&data).unwrap();
if let Some(cb) = &self.on_event {
for event in events {
cb(event);
}
}
}
}
@ -60,12 +60,25 @@ impl DerefMut for Client {
}
// Higher order function to handle printing
fn pretty_print(prefix: impl Into<String>) -> Box<dyn Fn(ContentData)> {
fn pretty_print(prefix: impl Into<String>) -> Box<dyn Fn(Event)> {
let prefix = prefix.into();
Box::new(move |c: ContentData| {
let cid = hex_trunc(c.conversation_id.as_bytes());
let content = String::from_utf8(c.data).unwrap();
println!("{} ({:?}) {}", prefix, cid, content)
Box::new(move |e: Event| match e {
Event::ConversationStarted {
conversation_id, ..
} => {
let cid = hex_trunc(conversation_id.as_bytes());
println!("{prefix} ({cid:?}) [conversation started]");
}
Event::MessageReceived {
conversation_id,
data,
..
} => {
let cid = hex_trunc(conversation_id.as_bytes());
let content = String::from_utf8(data).unwrap();
println!("{prefix} ({cid:?}) {content}");
}
_ => {}
})
}
@ -93,7 +106,7 @@ fn create_group() {
const RAYA: usize = 1;
let raya_id = clients[RAYA].account_id().clone();
let s_convo = clients[SARO].create_group_convo(&[&raya_id]).unwrap();
let (s_convo, _events) = clients[SARO].create_group_convo(&[&raya_id]).unwrap();
let convo_id = s_convo.id();
@ -141,3 +154,49 @@ fn create_group() {
process(&mut clients);
}
/// Regression for the silent-group-join bug fixed by the event system: when
/// Saro creates a group with Raya, Raya processes a Welcome message that
/// carries no application content. The application must still observe a
/// `ConversationStarted` event so the new group becomes visible.
#[test]
fn group_join_emits_conversation_started() {
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let mut saro =
Context::new_with_name("saro", ds.new_consumer(), rs.clone(), MemStore::new()).unwrap();
let mut raya = Context::new_with_name("raya", ds, rs, MemStore::new()).unwrap();
let raya_account_id = raya.account_id().clone();
let (group_convo, _events) = saro.create_group_convo(&[&raya_account_id]).unwrap();
let expected_group_id = group_convo.id().to_string();
// Drain everything Raya's transport produced and collect every event.
let payloads: Vec<_> = {
let ds = raya.ds();
ds.pull()
};
let mut events = Vec::new();
for data in payloads {
events.extend(raya.handle_payload(&data).unwrap());
}
// Welcome carries no content, so we expect exactly one ConversationStarted
// and nothing else. Prior to the bug fix Raya received Ok(None) and the
// new group was invisible to the application layer.
assert_eq!(
events.len(),
1,
"expected exactly one event, got {events:?}"
);
match &events[0] {
Event::ConversationStarted {
conversation_id, ..
} => {
assert_eq!(conversation_id.as_ref(), expected_group_id.as_str());
}
other => panic!("expected ConversationStarted, got {other:?}"),
}
}

View File

@ -1,24 +1,54 @@
use chat_sqlite::{ChatStorage, StorageConfig};
use libchat::{Context, Introduction};
use libchat::{Context, ConversationIdOwned, DeliveryService, Event, Introduction};
use storage::{ConversationStore, IdentityStore};
use tempfile::tempdir;
use components::{EphemeralRegistry, LocalBroadcaster};
fn poll_one(ctx: &Context<LocalBroadcaster, EphemeralRegistry, ChatStorage>) -> Vec<u8> {
ctx.ds()
.pull()
.into_iter()
.next()
.expect("expected payload in delivery queue")
}
fn send_and_verify(
sender: &mut Context<LocalBroadcaster, EphemeralRegistry, ChatStorage>,
receiver: &mut Context<LocalBroadcaster, EphemeralRegistry, ChatStorage>,
convo_id: &str,
content: &[u8],
) {
let payloads = sender.send_content(convo_id, content).unwrap();
let payload = payloads.first().unwrap();
let received = receiver
.handle_payload(&payload.data)
.unwrap()
.expect("expected content");
assert_eq!(content, received.data.as_slice());
assert!(!received.is_new_convo);
let events = sender.send_content(convo_id, content).unwrap();
assert!(events.is_empty(), "unexpected send events: {events:?}");
let payload = poll_one(receiver);
let events = receiver.handle_payload(&payload).unwrap();
match events.as_slice() {
[Event::MessageReceived { data, .. }] => assert_eq!(data.as_slice(), content),
other => panic!("expected [MessageReceived], got {other:?}"),
}
}
fn expect_invite(events: &[Event], expected_data: &[u8]) -> ConversationIdOwned {
match events {
[
Event::ConversationStarted {
conversation_id: started,
..
},
Event::MessageReceived {
conversation_id: received,
data,
..
},
] => {
assert_eq!(started, received);
assert_eq!(data.as_slice(), expected_data);
started.clone()
}
other => panic!("expected [ConversationStarted, MessageReceived], got {other:?}"),
}
}
#[test]
@ -36,18 +66,13 @@ fn ctx_integration() {
// Saro initiates conversation with Raya
let mut content = vec![10];
let (saro_convo_id, payloads) = saro.create_private_convo(&intro, &content).unwrap();
let (saro_convo_id, events) = saro.create_private_convo(&intro, &content).unwrap();
assert!(events.is_empty(), "unexpected create events: {events:?}");
// Raya receives initial message
let payload = payloads.first().unwrap();
let initial_content = raya
.handle_payload(&payload.data)
.unwrap()
.expect("expected initial content");
let raya_convo_id = initial_content.conversation_id;
assert_eq!(content, initial_content.data);
assert!(initial_content.is_new_convo);
let payload = poll_one(&raya);
let events = raya.handle_payload(&payload).unwrap();
let raya_convo_id = expect_invite(&events, &content);
// Exchange messages back and forth
for _ in 0..10 {
@ -68,8 +93,6 @@ fn identity_persistence() {
let pubkey1 = ctx1.identity().public_key();
let name1 = ctx1.installation_name().to_string();
// For persistence tests with file-based storage, we'd need a shared db.
// With in-memory, we just verify the identity was created.
assert_eq!(name1, "alice");
assert!(!pubkey1.as_bytes().iter().all(|&b| b == 0));
}
@ -104,11 +127,12 @@ fn conversation_metadata_persistence() {
let bundle = alice.create_intro_bundle().unwrap();
let intro = Introduction::try_from(bundle.as_slice()).unwrap();
let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap();
let (_, events) = bob.create_private_convo(&intro, b"hi").unwrap();
assert!(events.is_empty());
let payload = payloads.first().unwrap();
let content = alice.handle_payload(&payload.data).unwrap().unwrap();
assert!(content.is_new_convo);
let payload = poll_one(&alice);
let events = alice.handle_payload(&payload).unwrap();
expect_invite(&events, b"hi");
let convos = alice.store().load_conversations().unwrap();
assert_eq!(convos.len(), 1);
@ -125,39 +149,44 @@ fn conversation_full_flow() {
let bundle = alice.create_intro_bundle().unwrap();
let intro = Introduction::try_from(bundle.as_slice()).unwrap();
let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap();
let (bob_convo_id, events) = bob.create_private_convo(&intro, b"hello").unwrap();
assert!(events.is_empty());
let payload = payloads.first().unwrap();
let content = alice.handle_payload(&payload.data).unwrap().unwrap();
let alice_convo_id = content.conversation_id;
let payload = poll_one(&alice);
let events = alice.handle_payload(&payload).unwrap();
let alice_convo_id = expect_invite(&events, b"hello");
let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap();
let payload = payloads.first().unwrap();
bob.handle_payload(&payload.data).unwrap().unwrap();
let events = alice.send_content(&alice_convo_id, b"reply 1").unwrap();
assert!(events.is_empty());
let payload = poll_one(&bob);
bob.handle_payload(&payload).unwrap();
let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap();
let payload = payloads.first().unwrap();
alice.handle_payload(&payload.data).unwrap().unwrap();
let events = bob.send_content(&bob_convo_id, b"reply 2").unwrap();
assert!(events.is_empty());
let payload = poll_one(&alice);
alice.handle_payload(&payload).unwrap();
// Verify conversation list
let convo_ids = alice.list_conversations().unwrap();
assert_eq!(convo_ids.len(), 1);
// Continue exchanging messages
let payloads = bob.send_content(&bob_convo_id, b"more messages").unwrap();
let payload = payloads.first().unwrap();
let content = alice
.handle_payload(&payload.data)
.expect("should decrypt")
.expect("should have content");
assert_eq!(content.data, b"more messages");
let events = bob.send_content(&bob_convo_id, b"more messages").unwrap();
assert!(events.is_empty());
let payload = poll_one(&alice);
let events = alice.handle_payload(&payload).expect("should decrypt");
match events.as_slice() {
[Event::MessageReceived { data, .. }] => assert_eq!(data.as_slice(), b"more messages"),
other => panic!("expected [MessageReceived], got {other:?}"),
}
// Alice can also send back
let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap();
let payload = payloads.first().unwrap();
let content = bob
.handle_payload(&payload.data)
.unwrap()
.expect("bob should receive");
assert_eq!(content.data, b"alice reply");
let events = alice.send_content(&alice_convo_id, b"alice reply").unwrap();
assert!(events.is_empty());
let payload = poll_one(&bob);
let events = bob.handle_payload(&payload).unwrap();
match events.as_slice() {
[Event::MessageReceived { data, .. }] => assert_eq!(data.as_slice(), b"alice reply"),
other => panic!("expected [MessageReceived], got {other:?}"),
}
}

View File

@ -1,8 +1,9 @@
/*
* message-exchange: Saro-Raya message exchange written entirely in C.
*
* Demonstrates that the client-ffi C API is straightforward to consume
* directly no Rust glue required. Build with the provided Makefile.
* Demonstrates the push-inbound / drain-events flow of the client-ffi API:
* the C consumer pushes received bytes into the client and drains observed
* events back out. The translator thread inside the client does the work.
*/
#include "client_ffi.h"
@ -16,8 +17,6 @@
/* ------------------------------------------------------------------
* Convenience macros for building slice_ref_uint8_t values.
* SLICE(p, n) arbitrary pointer + length.
* STR(s) string literal (length computed at compile time).
* ------------------------------------------------------------------ */
#define SLICE(p, n) ((slice_ref_uint8_t){ .ptr = (const uint8_t *)(p), .len = (n) })
@ -83,19 +82,20 @@ static int32_t deliver_cb(
}
/* ------------------------------------------------------------------
* Helper: pop one envelope from the bus and push it into receiver.
* Returns a heap-allocated result; caller frees with
* push_inbound_result_free().
* Helper: pop one envelope from the bus, push it into `receiver`, then
* drain whatever events come out. Caller frees the returned EventList.
* ------------------------------------------------------------------ */
static PushInboundResult_t *route(ClientHandle_t *receiver)
static EventList_t *route(ClientHandle_t *receiver)
{
const uint8_t *data;
size_t len;
int ok = queue_pop(&bus, &data, &len);
assert(ok && "expected an envelope in the bus");
PushInboundResult_t *r = client_receive(receiver, SLICE(data, len));
assert(push_inbound_result_error_code(r) == 0 && "push_inbound failed");
int rc = client_push_inbound(receiver, SLICE(data, len));
assert(rc == 0 && "client_push_inbound failed");
EventList_t *r = client_drain_events(receiver, 5000);
assert(event_list_error_code(r) == 0 && "drain_events failed");
return r;
}
@ -125,19 +125,20 @@ int main(void)
assert(create_convo_result_error_code(saro_convo) == 0);
create_intro_result_free(raya_intro);
/* Route saro -> raya */
PushInboundResult_t *recv = route(raya);
/* Route saro -> raya. Welcome carries [ConversationStarted, MessageReceived]. */
EventList_t *recv = route(raya);
assert(push_inbound_result_has_content(recv) && "expected content from saro");
assert(push_inbound_result_is_new_convo(recv) && "expected new-conversation flag");
assert(event_list_len(recv) == 2 && "expected 2 events");
assert(event_list_tag(recv, 0) == EVENT_TAG_CONVERSATION_STARTED);
assert(event_list_tag(recv, 1) == EVENT_TAG_MESSAGE_RECEIVED);
slice_ref_uint8_t content = push_inbound_result_content(recv);
slice_ref_uint8_t cid_ref = event_list_conversation_id(recv, 0);
slice_ref_uint8_t content = event_list_message_data(recv, 1);
assert(content.len == 10);
assert(memcmp(content.ptr, "hello raya", 10) == 0);
printf("Raya received: \"%.*s\"\n", (int)content.len, content.ptr);
/* Copy Raya's convo_id before freeing recv */
slice_ref_uint8_t cid_ref = push_inbound_result_convo_id(recv);
uint8_t raya_cid[256];
size_t raya_cid_len = cid_ref.len;
if (raya_cid_len >= sizeof(raya_cid)) {
@ -145,7 +146,7 @@ int main(void)
return 1;
}
memcpy(raya_cid, cid_ref.ptr, raya_cid_len);
push_inbound_result_free(recv);
event_list_free(recv);
/* Raya replies */
ErrorCode_t rc = client_send_message(
@ -153,13 +154,13 @@ int main(void)
assert(rc == ERROR_CODE_NONE);
recv = route(saro);
assert(push_inbound_result_has_content(recv) && "expected content from raya");
assert(!push_inbound_result_is_new_convo(recv) && "unexpected new-convo flag");
content = push_inbound_result_content(recv);
assert(event_list_len(recv) == 1);
assert(event_list_tag(recv, 0) == EVENT_TAG_MESSAGE_RECEIVED);
content = event_list_message_data(recv, 0);
assert(content.len == 7);
assert(memcmp(content.ptr, "hi saro", 7) == 0);
printf("Saro received: \"%.*s\"\n", (int)content.len, content.ptr);
push_inbound_result_free(recv);
event_list_free(recv);
/* Multiple back-and-forth rounds */
slice_ref_uint8_t saro_cid = create_convo_result_id(saro_convo);
@ -171,11 +172,12 @@ int main(void)
assert(rc == ERROR_CODE_NONE);
recv = route(raya);
assert(push_inbound_result_has_content(recv));
content = push_inbound_result_content(recv);
assert(event_list_len(recv) == 1);
assert(event_list_tag(recv, 0) == EVENT_TAG_MESSAGE_RECEIVED);
content = event_list_message_data(recv, 0);
assert((int)content.len == mlen);
assert(memcmp(content.ptr, msg, (size_t)mlen) == 0);
push_inbound_result_free(recv);
event_list_free(recv);
char reply[32];
int rlen = snprintf(reply, sizeof(reply), "reply %d", i);
@ -185,11 +187,12 @@ int main(void)
assert(rc == ERROR_CODE_NONE);
recv = route(saro);
assert(push_inbound_result_has_content(recv));
content = push_inbound_result_content(recv);
assert(event_list_len(recv) == 1);
assert(event_list_tag(recv, 0) == EVENT_TAG_MESSAGE_RECEIVED);
content = event_list_message_data(recv, 0);
assert((int)content.len == rlen);
assert(memcmp(content.ptr, reply, (size_t)rlen) == 0);
push_inbound_result_free(recv);
event_list_free(recv);
}
/* Cleanup */

View File

@ -1,8 +1,9 @@
use safer_ffi::prelude::*;
use std::sync::Arc;
use std::sync::{Arc, Mutex, mpsc};
use std::time::Duration;
use crate::delivery::{CDelivery, DeliverFn};
use logos_chat::{ChatClient, ClientError};
use logos_chat::{ChatClient, ClientError, Event};
// ---------------------------------------------------------------------------
// Opaque client handle
@ -10,7 +11,11 @@ use logos_chat::{ChatClient, ClientError};
#[derive_ReprC]
#[repr(opaque)]
pub struct ClientHandle(pub(crate) ChatClient<CDelivery>);
pub struct ClientHandle {
client: ChatClient<CDelivery>,
push_tx: mpsc::Sender<Vec<u8>>,
event_rx: Mutex<mpsc::Receiver<Event>>,
}
// ---------------------------------------------------------------------------
// Error codes
@ -46,12 +51,23 @@ pub struct CreateConvoResult {
#[derive_ReprC]
#[repr(opaque)]
pub struct PushInboundResult {
pub struct EventList {
error_code: i32,
has_content: bool,
is_new_convo: bool,
convo_id: Option<String>,
content: Option<Vec<u8>>,
events: Vec<Event>,
}
#[derive_ReprC]
#[repr(i32)]
pub enum EventTag {
/// A new conversation was started (responder side).
ConversationStarted = 0,
/// User content was received on an existing conversation.
MessageReceived = 1,
/// Delivery of a previously-sent envelope failed.
DeliveryFailed = 2,
/// Returned when the index is out of bounds or the variant is unknown to
/// this binary (e.g. a new `Event` variant from a newer library version).
Unknown = -1,
}
// ---------------------------------------------------------------------------
@ -60,6 +76,9 @@ pub struct PushInboundResult {
/// Create an ephemeral in-memory client. Returns NULL if `callback` is None or
/// `name` is not valid UTF-8. Free with `client_destroy`.
///
/// Inbound bytes are fed via `client_push_inbound`; events are consumed via
/// `client_drain_events`.
#[ffi_export]
fn client_create(
name: c_slice::Ref<'_, u8>,
@ -70,8 +89,16 @@ fn client_create(
Err(_) => return None,
};
callback?;
let delivery = CDelivery { callback };
Some(Box::new(ClientHandle(ChatClient::new(name_str, delivery))).into())
let (delivery, push_tx) = CDelivery::new(callback);
let (client, event_rx) = ChatClient::new(name_str, delivery);
Some(
Box::new(ClientHandle {
client,
push_tx,
event_rx: Mutex::new(event_rx),
})
.into(),
)
}
/// Free a client handle. Must not be used after this call.
@ -89,7 +116,7 @@ fn client_destroy(handle: repr_c::Box<ClientHandle>) {
#[ffi_export]
fn client_installation_name(handle: &ClientHandle) -> c_slice::Box<u8> {
handle
.0
.client
.installation_name()
.as_bytes()
.to_vec()
@ -110,7 +137,7 @@ fn client_installation_name_free(name: c_slice::Box<u8>) {
/// Free with `create_intro_result_free`.
#[ffi_export]
fn client_create_intro_bundle(handle: &mut ClientHandle) -> repr_c::Box<CreateIntroResult> {
let result = match handle.0.create_intro_bundle() {
let result = match handle.client.create_intro_bundle() {
Ok(bytes) => CreateIntroResult {
error_code: ErrorCode::None as i32,
data: Some(bytes),
@ -154,13 +181,20 @@ fn client_create_conversation(
content: c_slice::Ref<'_, u8>,
) -> repr_c::Box<CreateConvoResult> {
let result = match handle
.0
.client
.create_conversation(bundle.as_slice(), content.as_slice())
{
Ok(convo_id) => CreateConvoResult {
error_code: ErrorCode::None as i32,
convo_id: Some(convo_id.to_string()),
},
Ok((convo_id, events)) => {
let error_code = if events.iter().any(Event::is_delivery_failure) {
ErrorCode::DeliveryFail as i32
} else {
ErrorCode::None as i32
};
CreateConvoResult {
error_code,
convo_id: Some(convo_id.to_string()),
}
}
Err(ClientError::Chat(_)) => CreateConvoResult {
error_code: ErrorCode::BadIntro as i32,
convo_id: None,
@ -206,80 +240,117 @@ fn client_send_message(
Err(_) => return ErrorCode::BadUtf8,
};
let convo_id_owned: logos_chat::ConversationIdOwned = Arc::from(id_str);
match handle.0.send_message(&convo_id_owned, content.as_slice()) {
Ok(()) => ErrorCode::None,
match handle
.client
.send_message(&convo_id_owned, content.as_slice())
{
Ok(events) if events.iter().any(Event::is_delivery_failure) => ErrorCode::DeliveryFail,
Ok(_) => ErrorCode::None,
Err(ClientError::Delivery(_)) => ErrorCode::DeliveryFail,
Err(_) => ErrorCode::UnknownError,
}
}
// ---------------------------------------------------------------------------
// Push inbound
// Inbound + event drain
// ---------------------------------------------------------------------------
/// Decrypt an inbound payload. `has_content` is false for protocol frames.
/// Free with `push_inbound_result_free`.
/// Queue an inbound payload for processing. Events surfaced from it are
/// observed via `client_drain_events`. Returns 0 on success, negative on
/// shutdown.
#[ffi_export]
fn client_receive(
handle: &mut ClientHandle,
payload: c_slice::Ref<'_, u8>,
) -> repr_c::Box<PushInboundResult> {
let result = match handle.0.receive(payload.as_slice()) {
Ok(Some(cd)) => PushInboundResult {
fn client_push_inbound(handle: &mut ClientHandle, payload: c_slice::Ref<'_, u8>) -> i32 {
match handle.push_tx.send(payload.as_slice().to_vec()) {
Ok(()) => 0,
Err(_) => -1,
}
}
/// Wait up to `timeout_ms` for the next event and then drain everything else
/// that's currently buffered. Returns an `EventList` (possibly empty on
/// timeout). Free with `event_list_free`.
#[ffi_export]
fn client_drain_events(handle: &mut ClientHandle, timeout_ms: u64) -> repr_c::Box<EventList> {
let rx = handle.event_rx.lock().unwrap();
let timeout = Duration::from_millis(timeout_ms);
let Ok(first) = rx.recv_timeout(timeout) else {
return Box::new(EventList {
error_code: ErrorCode::None as i32,
has_content: true,
is_new_convo: cd.is_new_convo,
convo_id: Some(cd.conversation_id),
content: Some(cd.data),
},
Ok(None) => PushInboundResult {
error_code: ErrorCode::None as i32,
has_content: false,
is_new_convo: false,
convo_id: None,
content: None,
},
Err(_) => PushInboundResult {
error_code: ErrorCode::UnknownError as i32,
has_content: false,
is_new_convo: false,
convo_id: None,
content: None,
},
events: Vec::new(),
})
.into();
};
Box::new(result).into()
let mut events = vec![first];
// Brief settle window so events from the same payload arrive together
// rather than across separate drain calls.
std::thread::sleep(Duration::from_micros(500));
while let Ok(e) = rx.try_recv() {
events.push(e);
}
Box::new(EventList {
error_code: ErrorCode::None as i32,
events,
})
.into()
}
#[ffi_export]
fn push_inbound_result_error_code(r: &PushInboundResult) -> i32 {
fn event_list_error_code(r: &EventList) -> i32 {
r.error_code
}
#[ffi_export]
fn push_inbound_result_has_content(r: &PushInboundResult) -> bool {
r.has_content
fn event_list_len(r: &EventList) -> usize {
r.events.len()
}
/// Returns the variant tag for the event at `idx`, or `EventTag::Unknown`
/// if `idx` is out of bounds.
#[ffi_export]
fn push_inbound_result_is_new_convo(r: &PushInboundResult) -> bool {
r.is_new_convo
fn event_list_tag(r: &EventList, idx: usize) -> EventTag {
match r.events.get(idx) {
Some(Event::ConversationStarted { .. }) => EventTag::ConversationStarted,
Some(Event::MessageReceived { .. }) => EventTag::MessageReceived,
Some(Event::DeliveryFailed { .. }) => EventTag::DeliveryFailed,
_ => EventTag::Unknown,
}
}
/// Returns an empty slice when has_content is false.
/// Returns the conversation id (UTF-8 bytes) for the event at `idx`,
/// or an empty slice if `idx` is out of bounds.
/// The slice is valid only while `r` is alive.
#[ffi_export]
fn push_inbound_result_convo_id(r: &PushInboundResult) -> c_slice::Ref<'_, u8> {
r.convo_id.as_deref().unwrap_or("").as_bytes().into()
fn event_list_conversation_id(r: &EventList, idx: usize) -> c_slice::Ref<'_, u8> {
let bytes: &[u8] = match r.events.get(idx) {
Some(
Event::ConversationStarted {
conversation_id, ..
}
| Event::MessageReceived {
conversation_id, ..
}
| Event::DeliveryFailed {
conversation_id, ..
},
) => conversation_id.as_bytes(),
_ => &[],
};
bytes.into()
}
/// Returns an empty slice when has_content is false.
/// Returns the message bytes for a `MessageReceived` event at `idx`.
/// Returns an empty slice for any other variant or out-of-bounds index.
/// The slice is valid only while `r` is alive.
#[ffi_export]
fn push_inbound_result_content(r: &PushInboundResult) -> c_slice::Ref<'_, u8> {
r.content.as_deref().unwrap_or(&[]).into()
fn event_list_message_data(r: &EventList, idx: usize) -> c_slice::Ref<'_, u8> {
let bytes: &[u8] = match r.events.get(idx) {
Some(Event::MessageReceived { data, .. }) => data.as_slice(),
_ => &[],
};
bytes.into()
}
#[ffi_export]
fn push_inbound_result_free(r: repr_c::Box<PushInboundResult>) {
fn event_list_free(r: repr_c::Box<EventList>) {
drop(r)
}

View File

@ -1,5 +1,7 @@
use std::sync::{Mutex, mpsc};
use libchat::AddressedEnvelope;
use logos_chat::DeliveryService;
use logos_chat::{DeliveryService, drain_inbound};
/// C callback invoked for each outbound envelope. Return 0 or positive on success, negative on
/// error. `addr_ptr/addr_len` is the delivery address; `data_ptr/data_len` is the encrypted
@ -14,16 +16,32 @@ pub type DeliverFn = Option<
) -> i32,
>;
/// `DeliveryService` for FFI consumers. Outbound publishes invoke the C
/// `DeliverFn` callback; inbound payloads are fed through a `Sender<Vec<u8>>`
/// returned at construction.
#[derive(Debug)]
pub struct CDelivery {
pub callback: DeliverFn,
callback: DeliverFn,
inbound: Mutex<mpsc::Receiver<Vec<u8>>>,
}
impl CDelivery {
/// Returns the delivery together with the `Sender` that feeds its
/// inbound side.
pub fn new(callback: DeliverFn) -> (Self, mpsc::Sender<Vec<u8>>) {
let (tx, rx) = mpsc::channel();
let delivery = Self {
callback,
inbound: Mutex::new(rx),
};
(delivery, tx)
}
}
impl DeliveryService for CDelivery {
type Error = i32;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), i32> {
fn publish(&self, envelope: AddressedEnvelope) -> Result<(), i32> {
let cb = self.callback.expect("callback must be non-null");
let addr = envelope.delivery_address.as_bytes();
let data = envelope.data.as_slice();
@ -31,8 +49,12 @@ impl DeliveryService for CDelivery {
if rc < 0 { Err(rc) } else { Ok(()) }
}
fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> {
// TODO: (P1) CDelivery does not support delivery_address filtering
fn subscribe(&self, _delivery_address: &str) -> Result<(), Self::Error> {
// TODO: (P1) CDelivery does not support delivery_address filtering.
Ok(())
}
fn pull(&self) -> Vec<Vec<u8>> {
drain_inbound(&self.inbound)
}
}

View File

@ -9,11 +9,12 @@ crate-type = ["rlib"]
[dependencies]
# Workspace dependencies (sorted)
chat-sqlite = { workspace = true }
components = { workspace = true}
components = { workspace = true }
libchat = { workspace = true }
# External dependencies (sorted)
thiserror = "2"
tracing = "0.1"
[dev-dependencies]
# External dependencies (sorted)

View File

@ -1,33 +1,51 @@
use logos_chat::{ChatClient, ConversationIdOwned, InProcessDelivery};
use std::sync::Arc;
use std::sync::mpsc;
use std::time::Duration;
use logos_chat::{ChatClient, ConversationIdOwned, Event, InProcessDelivery};
fn main() {
let delivery = InProcessDelivery::new(Default::default());
let mut cursor = delivery.cursor_at_tail("delivery_address");
let mut saro = ChatClient::new("saro", delivery.clone());
let mut raya = ChatClient::new("raya", delivery);
let (mut saro, saro_events) = ChatClient::new("saro", delivery.clone());
let (mut raya, raya_events) = ChatClient::new("raya", delivery);
let raya_bundle = raya.create_intro_bundle().unwrap();
saro.create_conversation(&raya_bundle, b"hello raya")
let (_saro_convo_id, _events) = saro
.create_conversation(&raya_bundle, b"hello raya")
.unwrap();
let raw = cursor.next().unwrap();
let content = raya.receive(&raw).unwrap().unwrap();
println!(
"Raya received: {:?}",
std::str::from_utf8(&content.data).unwrap()
);
let raya_convo_id = expect_invite(&raya_events, "Raya");
let raya_convo_id: ConversationIdOwned = Arc::from(content.conversation_id.as_str());
raya.send_message(&raya_convo_id, b"hi saro").unwrap();
let raw = cursor.next().unwrap();
let content = saro.receive(&raw).unwrap().unwrap();
println!(
"Saro received: {:?}",
std::str::from_utf8(&content.data).unwrap()
);
expect_message(&saro_events, "Saro");
println!("Message exchange complete.");
}
fn expect_invite(events: &mpsc::Receiver<Event>, who: &str) -> ConversationIdOwned {
let started = events.recv_timeout(Duration::from_secs(5)).unwrap();
let convo_id = match started {
Event::ConversationStarted {
conversation_id, ..
} => conversation_id,
other => panic!("expected ConversationStarted, got {other:?}"),
};
let received = events.recv_timeout(Duration::from_secs(5)).unwrap();
match received {
Event::MessageReceived { data, .. } => {
println!("{who} received: {:?}", std::str::from_utf8(&data).unwrap());
}
other => panic!("expected MessageReceived, got {other:?}"),
}
convo_id
}
fn expect_message(events: &mpsc::Receiver<Event>, who: &str) {
let event = events.recv_timeout(Duration::from_secs(5)).unwrap();
match event {
Event::MessageReceived { data, .. } => {
println!("{who} received: {:?}", std::str::from_utf8(&data).unwrap());
}
other => panic!("expected MessageReceived, got {other:?}"),
}
}

View File

@ -1,96 +1,153 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, mpsc};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use libchat::{
AddressedEnvelope, ChatError, ChatStorage, ContentData, Context, ConversationIdOwned,
DeliveryService, Introduction, StorageConfig,
ChatError, ChatStorage, Context, ConversationIdOwned, DeliveryService, Event, Introduction,
StorageConfig,
};
use components::EphemeralRegistry;
use crate::errors::ClientError;
type ChatContext<D> = Context<D, EphemeralRegistry, ChatStorage>;
const IDLE_POLL_INTERVAL: Duration = Duration::from_millis(50);
/// High-level chat client. Construction returns the handle together with a
/// `Receiver<Event>` for inbound observation.
pub struct ChatClient<D: DeliveryService> {
ctx: Context<D, EphemeralRegistry, ChatStorage>,
ctx: Arc<Mutex<ChatContext<D>>>,
shutdown: Arc<AtomicBool>,
translator: Option<JoinHandle<()>>,
}
impl<D: DeliveryService + 'static> ChatClient<D> {
/// Create an in-memory, ephemeral client. Identity is lost on drop.
pub fn new(name: impl Into<String>, delivery: D) -> Self {
/// In-memory, ephemeral client. Identity is lost on drop.
pub fn new(name: impl Into<String>, delivery: D) -> (Self, mpsc::Receiver<Event>) {
let registry = EphemeralRegistry::new();
let store = ChatStorage::in_memory();
Self {
ctx: Context::new_with_name(name, delivery, registry, store).unwrap(),
}
let ctx = Context::new_with_name(name, delivery, registry, store).unwrap();
Self::wrap(ctx)
}
/// Open or create a persistent client backed by `StorageConfig`.
///
/// If an identity already exists in storage it is loaded; otherwise a new
/// one is created and saved.
/// Persistent client backed by `config`. Identity is loaded if present,
/// otherwise created and saved.
pub fn open(
name: impl Into<String>,
config: StorageConfig,
delivery: D,
) -> Result<Self, ClientError<D::Error>> {
) -> Result<(Self, mpsc::Receiver<Event>), ClientError<D::Error>> {
let store = ChatStorage::new(config).map_err(ChatError::from)?;
let registry = EphemeralRegistry::new();
let ctx = Context::new_from_store(name, delivery, registry, store)?;
Ok(Self { ctx })
Ok(Self::wrap(ctx))
}
/// Returns the installation name (identity label) of this client.
pub fn installation_name(&self) -> &str {
self.ctx.installation_name()
pub fn installation_name(&self) -> String {
self.ctx.lock().unwrap().installation_name().to_string()
}
/// Produce a serialised introduction bundle for sharing out-of-band.
pub fn create_intro_bundle(&mut self) -> Result<Vec<u8>, ClientError<D::Error>> {
self.ctx.create_intro_bundle().map_err(Into::into)
self.ctx
.lock()
.unwrap()
.create_intro_bundle()
.map_err(Into::into)
}
/// Parse intro bundle bytes, initiate a private conversation, and deliver
/// all outbound envelopes. Returns this side's conversation ID.
pub fn create_conversation(
&mut self,
intro_bundle: &[u8],
initial_content: &[u8],
) -> Result<ConversationIdOwned, ClientError<D::Error>> {
) -> Result<(ConversationIdOwned, Vec<Event>), ClientError<D::Error>> {
let intro = Introduction::try_from(intro_bundle)?;
let (convo_id, envelopes) = self.ctx.create_private_convo(&intro, initial_content)?;
self.dispatch_all(envelopes)?;
Ok(convo_id)
self.ctx
.lock()
.unwrap()
.create_private_convo(&intro, initial_content)
.map_err(Into::into)
}
/// List all conversation IDs known to this client.
pub fn list_conversations(&self) -> Result<Vec<ConversationIdOwned>, ClientError<D::Error>> {
self.ctx.list_conversations().map_err(Into::into)
self.ctx
.lock()
.unwrap()
.list_conversations()
.map_err(Into::into)
}
/// Encrypt `content` and dispatch all outbound envelopes.
pub fn send_message(
&mut self,
convo_id: &ConversationIdOwned,
content: &[u8],
) -> Result<(), ClientError<D::Error>> {
let envelopes = self.ctx.send_content(convo_id.as_ref(), content)?;
self.dispatch_all(envelopes)
) -> Result<Vec<Event>, ClientError<D::Error>> {
self.ctx
.lock()
.unwrap()
.send_content(convo_id.as_ref(), content)
.map_err(Into::into)
}
/// Decrypt an inbound payload. Returns `Some(ContentData)` for user
/// content, `None` for protocol frames.
pub fn receive(
&mut self,
payload: &[u8],
) -> Result<Option<ContentData>, ClientError<D::Error>> {
self.ctx.handle_payload(payload).map_err(Into::into)
}
fn dispatch_all(
&mut self,
envelopes: Vec<AddressedEnvelope>,
) -> Result<(), ClientError<D::Error>> {
for env in envelopes {
let mut delivery = self.ctx.ds();
delivery.publish(env).map_err(ClientError::Delivery)?;
}
Ok(())
fn wrap(ctx: ChatContext<D>) -> (Self, mpsc::Receiver<Event>) {
let delivery = ctx.delivery_arc();
let ctx = Arc::new(Mutex::new(ctx));
let (event_tx, event_rx) = mpsc::channel();
let shutdown = Arc::new(AtomicBool::new(false));
let translator_ctx = Arc::clone(&ctx);
let translator_shutdown = Arc::clone(&shutdown);
let translator = thread::spawn(move || {
translator_loop(delivery, translator_ctx, event_tx, translator_shutdown)
});
(
Self {
ctx,
shutdown,
translator: Some(translator),
},
event_rx,
)
}
}
impl<D: DeliveryService> Drop for ChatClient<D> {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::Release);
if let Some(handle) = self.translator.take() {
// Best-effort: a panicked translator should not poison Drop.
let _ = handle.join();
}
}
}
fn translator_loop<D: DeliveryService + 'static>(
delivery: Arc<D>,
ctx: Arc<Mutex<ChatContext<D>>>,
event_tx: mpsc::Sender<Event>,
shutdown: Arc<AtomicBool>,
) {
while !shutdown.load(Ordering::Acquire) {
let batch = delivery.pull();
if batch.is_empty() {
thread::sleep(IDLE_POLL_INTERVAL);
continue;
}
for bytes in batch {
let events = match ctx.lock().unwrap().handle_payload(&bytes) {
Ok(events) => events,
Err(e) => {
tracing::warn!("handle_payload error: {e:?}");
continue;
}
};
for event in events {
if event_tx.send(event).is_err() {
tracing::info!("translator exiting: event receiver dropped");
return;
}
}
}
}
}

View File

@ -1,7 +1,7 @@
use crate::{AddressedEnvelope, DeliveryService};
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex};
type Message = Vec<u8>;
@ -12,7 +12,7 @@ type Message = Vec<u8>;
/// so multiple consumers on the same address each see every message.
#[derive(Clone, Default, Debug)]
pub struct MessageBus {
log: Arc<RwLock<HashMap<String, Vec<Message>>>>,
log: Arc<Mutex<HashMap<String, Vec<Message>>>>,
}
impl MessageBus {
@ -29,7 +29,7 @@ impl MessageBus {
/// Returns a cursor positioned at the current tail of `address`.
/// The cursor will only see messages delivered after this call.
pub fn cursor_at_tail(&self, address: &str) -> Cursor {
let pos = self.log.read().unwrap().get(address).map_or(0, |v| v.len());
let pos = self.log.lock().unwrap().get(address).map_or(0, |v| v.len());
Cursor {
bus: self.clone(),
address: address.to_string(),
@ -41,12 +41,12 @@ impl MessageBus {
// Unwrap produces a panic when the lock is poisoned.
// It would most likely indicate log corruption (e.g. incomplete write from another thread),
// so panic propagation seems appropriate.
self.log.read().unwrap().get(address)?.get(pos).cloned()
self.log.lock().unwrap().get(address)?.get(pos).cloned()
}
fn push(&self, address: String, data: Message) {
self.log
.write()
.lock()
.unwrap()
.entry(address)
.or_default()
@ -77,40 +77,87 @@ impl Iterator for Cursor {
/// In-process delivery service backed by a [`MessageBus`].
///
/// Cheap to clone — all clones share the same underlying bus, so multiple
/// clients can share one logical delivery service. Construct with a
/// [`MessageBus`] and use [`cursor`](InProcessDelivery::cursor) /
/// [`cursor_at_tail`](InProcessDelivery::cursor_at_tail) to read messages.
#[derive(Clone, Default, Debug)]
pub struct InProcessDelivery(MessageBus);
/// clients can share one logical delivery service. Each clone has its own
/// per-address cursor for [`DeliveryService::pull`]; tests that prefer to
/// pull directly can use [`cursor`](InProcessDelivery::cursor) /
/// [`cursor_at_tail`](InProcessDelivery::cursor_at_tail) instead.
#[derive(Default, Debug)]
pub struct InProcessDelivery {
bus: MessageBus,
state: Mutex<DeliveryState>,
}
#[derive(Default, Debug, Clone)]
struct DeliveryState {
cursors: HashMap<String, usize>,
}
impl InProcessDelivery {
/// Create a delivery service backed by `bus`.
pub fn new(bus: MessageBus) -> Self {
Self(bus)
Self {
bus,
state: Mutex::new(DeliveryState::default()),
}
}
/// Returns a cursor positioned at the beginning of `address`.
pub fn cursor(&self, address: &str) -> Cursor {
self.0.cursor(address)
self.bus.cursor(address)
}
/// Returns a cursor positioned at the current tail of `address`.
/// The cursor will only see messages delivered after this call.
pub fn cursor_at_tail(&self, address: &str) -> Cursor {
self.0.cursor_at_tail(address)
self.bus.cursor_at_tail(address)
}
}
impl Clone for InProcessDelivery {
fn clone(&self) -> Self {
Self {
bus: self.bus.clone(),
state: Mutex::new(self.state.lock().unwrap().clone()),
}
}
}
impl DeliveryService for InProcessDelivery {
type Error = Infallible;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Infallible> {
self.0.push(envelope.delivery_address, envelope.data);
fn publish(&self, envelope: AddressedEnvelope) -> Result<(), Infallible> {
self.bus.push(envelope.delivery_address, envelope.data);
Ok(())
}
fn subscribe(&mut self, _delivery_address: &str) -> Result<(), Self::Error> {
// TODO: (P1) implement subscribe
fn subscribe(&self, delivery_address: &str) -> Result<(), Self::Error> {
// Initialise the cursor at the current tail so the subscriber only
// sees subsequent messages on this address.
let pos = self
.bus
.log
.lock()
.unwrap()
.get(delivery_address)
.map_or(0, |v| v.len());
self.state
.lock()
.unwrap()
.cursors
.entry(delivery_address.to_string())
.or_insert(pos);
Ok(())
}
fn pull(&self) -> Vec<Vec<u8>> {
let mut out = Vec::new();
let log = self.bus.log.lock().unwrap();
let mut state = self.state.lock().unwrap();
for (addr, cursor) in state.cursors.iter_mut() {
if let Some(messages) = log.get(addr) {
while *cursor < messages.len() {
out.push(messages[*cursor].clone());
*cursor += 1;
}
}
}
out
}
}

View File

@ -8,5 +8,6 @@ pub use errors::ClientError;
// Re-export types callers need to interact with ChatClient
pub use libchat::{
AddressedEnvelope, ContentData, ConversationIdOwned, DeliveryService, StorageConfig,
AddressedEnvelope, ConversationIdOwned, DeliveryService, EnvelopeId, Event, FailureReason,
StorageConfig, drain_inbound,
};

View File

@ -1,68 +1,159 @@
use logos_chat::{
ChatClient, ContentData, ConversationIdOwned, Cursor, InProcessDelivery, StorageConfig,
};
use std::sync::Arc;
use std::sync::mpsc;
use std::time::Duration;
fn receive(receiver: &mut ChatClient<InProcessDelivery>, cursor: &mut Cursor) -> ContentData {
let raw = cursor.next().expect("expected envelope");
receiver
.receive(&raw)
.expect("receive failed")
.expect("expected content")
use logos_chat::{
AddressedEnvelope, ChatClient, DeliveryService, Event, InProcessDelivery, StorageConfig,
};
fn expect_event<F, T>(events: &mpsc::Receiver<Event>, label: &str, mut f: F) -> T
where
F: FnMut(Event) -> Result<T, Event>,
{
let event = events
.recv_timeout(Duration::from_secs(5))
.unwrap_or_else(|_| panic!("timed out waiting for {label}"));
f(event).unwrap_or_else(|other| panic!("expected {label}, got {other:?}"))
}
#[test]
fn saro_raya_message_exchange() {
let delivery = InProcessDelivery::new(Default::default());
let mut cursor = delivery.cursor_at_tail("delivery_address");
let mut saro = ChatClient::new("saro", delivery.clone());
let mut raya = ChatClient::new("raya", delivery);
let (mut saro, saro_events) = ChatClient::new("saro", delivery.clone());
let (mut raya, raya_events) = ChatClient::new("raya", delivery);
let raya_bundle = raya.create_intro_bundle().unwrap();
let saro_convo_id = saro
let (saro_convo_id, send_events) = saro
.create_conversation(&raya_bundle, b"hello raya")
.unwrap();
assert!(
send_events.is_empty(),
"unexpected send events: {send_events:?}"
);
let content = receive(&mut raya, &mut cursor);
assert_eq!(content.data, b"hello raya");
assert!(content.is_new_convo);
// The invite payload yields ConversationStarted then MessageReceived.
let raya_convo_id = expect_event(&raya_events, "ConversationStarted", |e| match e {
Event::ConversationStarted {
conversation_id, ..
} => Ok(conversation_id),
other => Err(other),
});
expect_event(&raya_events, "MessageReceived", |e| match e {
Event::MessageReceived {
conversation_id,
data,
..
} => {
assert_eq!(conversation_id, raya_convo_id);
assert_eq!(data.as_slice(), b"hello raya");
Ok(())
}
other => Err(other),
});
let raya_convo_id: ConversationIdOwned = Arc::from(content.conversation_id.as_str());
raya.send_message(&raya_convo_id, b"hi saro").unwrap();
let content = receive(&mut saro, &mut cursor);
assert_eq!(content.data, b"hi saro");
assert!(!content.is_new_convo);
let send_events = raya.send_message(&raya_convo_id, b"hi saro").unwrap();
assert!(send_events.is_empty());
expect_event(&saro_events, "MessageReceived", |e| match e {
Event::MessageReceived { data, .. } => {
assert_eq!(data.as_slice(), b"hi saro");
Ok(())
}
other => Err(other),
});
for i in 0u8..5 {
let msg = format!("msg {i}");
saro.send_message(&saro_convo_id, msg.as_bytes()).unwrap();
let content = receive(&mut raya, &mut cursor);
assert_eq!(content.data, msg.as_bytes());
let send_events = saro.send_message(&saro_convo_id, msg.as_bytes()).unwrap();
assert!(send_events.is_empty());
expect_event(
&raya_events,
&format!("MessageReceived(msg {i})"),
|e| match e {
Event::MessageReceived { data, .. } => {
assert_eq!(data.as_slice(), msg.as_bytes());
Ok(())
}
other => Err(other),
},
);
let reply = format!("reply {i}");
raya.send_message(&raya_convo_id, reply.as_bytes()).unwrap();
let content = receive(&mut saro, &mut cursor);
assert_eq!(content.data, reply.as_bytes());
let send_events = raya.send_message(&raya_convo_id, reply.as_bytes()).unwrap();
assert!(send_events.is_empty());
expect_event(
&saro_events,
&format!("MessageReceived(reply {i})"),
|e| match e {
Event::MessageReceived { data, .. } => {
assert_eq!(data.as_slice(), reply.as_bytes());
Ok(())
}
other => Err(other),
},
);
}
assert_eq!(saro.list_conversations().unwrap().len(), 1);
assert_eq!(raya.list_conversations().unwrap().len(), 1);
}
#[derive(Debug, Default)]
struct FailingDelivery;
impl DeliveryService for FailingDelivery {
type Error = &'static str;
fn publish(&self, _: AddressedEnvelope) -> Result<(), Self::Error> {
Err("simulated transport failure")
}
fn subscribe(&self, _: &str) -> Result<(), Self::Error> {
Ok(())
}
fn pull(&self) -> Vec<Vec<u8>> {
Vec::new()
}
}
#[test]
fn dropping_client_shuts_down_translator() {
let (client, events) = ChatClient::new("saro", InProcessDelivery::default());
drop(client);
// Drop must join the translator thread; once joined, the translator's
// Sender<Event> is gone and recv returns Disconnected immediately.
let res = events.recv_timeout(Duration::from_secs(5));
assert!(matches!(res, Err(mpsc::RecvTimeoutError::Disconnected)));
}
#[test]
fn publish_failure_surfaces_as_event() {
// Spin a real raya just to mint a valid intro bundle.
let (mut raya, _) = ChatClient::new("raya", InProcessDelivery::default());
let bundle = raya.create_intro_bundle().unwrap();
let (mut saro, _) = ChatClient::new("saro", FailingDelivery);
let (_, send_events) = saro.create_conversation(&bundle, b"hello").unwrap();
assert!(
send_events.iter().any(Event::is_delivery_failure),
"expected a DeliveryFailed event, got {send_events:?}"
);
}
#[test]
fn open_persistent_client() {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("test.db").to_string_lossy().to_string();
let config = StorageConfig::File(db_path);
let client1 = ChatClient::open("saro", config.clone(), InProcessDelivery::default()).unwrap();
let name1 = client1.installation_name().to_string();
let (client1, _events1) =
ChatClient::open("saro", config.clone(), InProcessDelivery::default()).unwrap();
let name1 = client1.installation_name();
drop(client1);
let client2 = ChatClient::open("saro", config, InProcessDelivery::default()).unwrap();
let name2 = client2.installation_name().to_string();
let (client2, _events2) =
ChatClient::open("saro", config, InProcessDelivery::default()).unwrap();
let name2 = client2.installation_name();
assert_eq!(
name1, name2,

View File

@ -1,87 +1,56 @@
use std::{
cell::RefCell,
collections::{HashSet, VecDeque},
hash::{DefaultHasher, Hash, Hasher},
rc::Rc,
sync::{Arc, Mutex},
};
use libchat::{AddressedEnvelope, DeliveryService};
#[derive(Debug)]
struct BroadcasterShared<T> {
/// Per-address message queue; all published messages are appended here.
messages: VecDeque<T>,
base_index: usize,
#[derive(Debug, Default)]
struct SharedStore {
/// Append-only log of every published envelope.
messages: VecDeque<AddressedEnvelope>,
}
impl<T> BroadcasterShared<T> {
pub fn read(&self, cursor: usize) -> Option<&T> {
self.messages.get(cursor + self.base_index)
}
pub fn tail(&self) -> usize {
self.messages.len() + self.base_index
}
}
#[derive(Clone, Debug)]
pub struct LocalBroadcaster {
shared: Rc<RefCell<BroadcasterShared<AddressedEnvelope>>>,
#[derive(Clone, Debug, Default)]
struct ConsumerState {
/// Position in the shared log this consumer has scanned up to.
cursor: usize,
/// Addresses this consumer is interested in.
subscriptions: HashSet<String>,
outbound_msgs: Vec<u64>,
/// IDs of envelopes this consumer itself published — used to filter them
/// out when scanning the log (a consumer doesn't receive its own output).
outbound_msgs: HashSet<u64>,
}
/// `DeliveryService` for tests and local examples.
///
/// Each clone is an independent consumer (own cursor, subscriptions, and
/// outbound filter) over a shared in-memory log.
#[derive(Debug)]
pub struct LocalBroadcaster {
shared: Arc<Mutex<SharedStore>>,
state: Mutex<ConsumerState>,
}
/// This is Lightweight DeliveryService which can be used for tests
/// and local examples. Messages are not delivered until `poll` is called
/// which allows for more fine grain test cases.
impl LocalBroadcaster {
pub fn new() -> Self {
let shared = Rc::new(RefCell::new(BroadcasterShared {
messages: VecDeque::new(),
base_index: 0,
}));
let cursor = shared.borrow().tail();
Self {
shared,
cursor,
subscriptions: HashSet::new(),
outbound_msgs: Vec::new(),
shared: Arc::new(Mutex::new(SharedStore::default())),
state: Mutex::new(ConsumerState::default()),
}
}
/// Returns a new consumer that shares the same message store but has its
/// own independent cursor — it starts from the beginning of each address
/// queue regardless of what any other consumer has already processed.
/// Returns a new consumer that shares the same underlying log but starts
/// at the current tail — historical messages are skipped.
pub fn new_consumer(&self) -> Self {
let inner = self.shared.clone();
let cursor = inner.borrow().tail();
let cursor = self.shared.lock().unwrap().messages.len();
Self {
shared: inner,
cursor,
subscriptions: HashSet::new(),
outbound_msgs: Vec::new(),
}
}
/// Pulls all messages this consumer has not yet seen on `address`,
/// applying any registered filter. Advances the cursor so the same
/// messages are not returned again.
pub fn poll(&mut self) -> Option<Vec<u8>> {
loop {
let next = self.cursor;
match self.shared.borrow().read(next) {
None => return None,
Some(ae) => {
self.cursor = next + 1;
if self.subscriptions.contains(ae.delivery_address.as_str())
&& self.is_inbound(ae)
{
return Some(ae.data.clone());
}
}
}
shared: Arc::clone(&self.shared),
state: Mutex::new(ConsumerState {
cursor,
..ConsumerState::default()
}),
}
}
@ -90,11 +59,6 @@ impl LocalBroadcaster {
msg.data.as_slice().hash(&mut hasher);
hasher.finish()
}
fn is_inbound(&self, msg: &AddressedEnvelope) -> bool {
let mid = Self::msg_id(msg);
!self.outbound_msgs.contains(&mid)
}
}
impl Default for LocalBroadcaster {
@ -103,20 +67,47 @@ impl Default for LocalBroadcaster {
}
}
impl Clone for LocalBroadcaster {
fn clone(&self) -> Self {
Self {
shared: Arc::clone(&self.shared),
state: Mutex::new(self.state.lock().unwrap().clone()),
}
}
}
impl DeliveryService for LocalBroadcaster {
type Error = String;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), Self::Error> {
self.outbound_msgs.push(Self::msg_id(&envelope));
self.shared.borrow_mut().messages.push_back(envelope);
fn publish(&self, envelope: AddressedEnvelope) -> Result<(), Self::Error> {
let id = Self::msg_id(&envelope);
self.state.lock().unwrap().outbound_msgs.insert(id);
self.shared.lock().unwrap().messages.push_back(envelope);
Ok(())
}
fn subscribe(&mut self, delivery_address: &str) -> Result<(), Self::Error> {
// Strict temporal ordering of subscriptions is not enforced.
// Subscriptions are evaluated on polling, not when the message is published
self.subscriptions.insert(delivery_address.to_string());
fn subscribe(&self, delivery_address: &str) -> Result<(), Self::Error> {
self.state
.lock()
.unwrap()
.subscriptions
.insert(delivery_address.to_string());
Ok(())
}
fn pull(&self) -> Vec<Vec<u8>> {
let mut out = Vec::new();
let shared = self.shared.lock().unwrap();
let mut state = self.state.lock().unwrap();
while state.cursor < shared.messages.len() {
let ae = &shared.messages[state.cursor];
state.cursor += 1;
if state.subscriptions.contains(&ae.delivery_address)
&& !state.outbound_msgs.contains(&Self::msg_id(ae))
{
out.push(ae.data.clone());
}
}
out
}
}

View File

@ -79,7 +79,7 @@
}
);
devShells = forAllSystems ({ pkgs }:
devShells = forAllSystems ({ pkgs, ... }:
let
rustToolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust_toolchain.toml;
in
@ -89,6 +89,7 @@
rustToolchain
pkgs.pkg-config
pkgs.cmake
pkgs.perl
];
};
}