use crate::{AddressedEnvelope, delivery::DeliveryService}; use std::collections::HashMap; use std::convert::Infallible; use std::sync::{Arc, Mutex}; type Message = Vec; /// Shared in-process message bus. Cheap to clone — all clones share the same log. /// /// Messages are stored in an append-only log per delivery address. Readers hold /// independent [`Cursor`]s and advance their position without consuming messages, /// so multiple consumers on the same address each see every message. #[derive(Clone, Default)] pub struct MessageBus { log: Arc>>>, } impl MessageBus { /// Returns a cursor positioned at the beginning of `address`. /// The cursor will see all messages — past and future. pub fn subscribe(&self, address: &str) -> Cursor { Cursor { bus: self.clone(), address: address.to_string(), pos: 0, } } /// Returns a cursor positioned at the current tail of `address`. /// The cursor will only see messages delivered after this call. pub fn subscribe_tail(&self, address: &str) -> Cursor { let pos = self.log.lock().unwrap().get(address).map_or(0, |v| v.len()); Cursor { bus: self.clone(), address: address.to_string(), pos, } } fn push(&self, address: String, data: Message) { self.log .lock() .unwrap() .entry(address) .or_default() .push(data); } } /// Per-consumer read cursor into a [`MessageBus`] address slot. /// /// Reads are non-destructive: the underlying log is never modified. /// Multiple cursors on the same address each advance independently. pub struct Cursor { bus: MessageBus, address: String, pos: usize, } impl Iterator for Cursor { type Item = Message; fn next(&mut self) -> Option { let guard = self.bus.log.lock().unwrap(); let msgs = guard.get(&self.address)?; if self.pos < msgs.len() { let msg = msgs[self.pos].clone(); self.pos += 1; Some(msg) } else { None } } } /// In-process delivery service backed by a [`MessageBus`]. /// /// Cheap to clone — all clones share the same underlying bus, so multiple /// clients can share one logical delivery service. Use [`InProcessDelivery::new`] /// to get both the service and a bus handle for subscribing [`Cursor`]s. #[derive(Clone)] pub struct InProcessDelivery(MessageBus); impl InProcessDelivery { /// Create a new delivery service with its own private bus. /// Returns both the service and a handle to the bus so callers can /// subscribe [`Cursor`]s to read delivered messages. pub fn new() -> (Self, MessageBus) { let bus = MessageBus::default(); (Self(bus.clone()), bus) } } impl Default for InProcessDelivery { /// Create a standalone delivery service with no externally-held bus handle. /// Useful when routing is not needed (e.g. persistent-client tests). fn default() -> Self { Self(MessageBus::default()) } } impl DeliveryService for InProcessDelivery { type Error = Infallible; fn deliver(&mut self, envelope: AddressedEnvelope) -> Result<(), Infallible> { self.0.push(envelope.delivery_address, envelope.data); Ok(()) } }