Componentize logos delivery (#148)

* Move logos_delivery to components

Rename components

update deps

WIP

Remove requirement for build.rs in chat-cli

fix imports

update linux flake

Linter fixes

fix build in linux

* Update docs

* Blankspace fix
This commit is contained in:
Jazz Turner-Baggs 2026-06-26 10:05:28 -07:00 committed by GitHub
parent 0d38dd80b7
commit 97eacc01a7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 237 additions and 118 deletions

View File

@ -20,9 +20,10 @@ jobs:
- run: rustup update stable && rustup default stable
# hashgraph-like-consensus's build.rs shells out to protoc via prost-build.
- run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
# chat-cli's build.rs unconditionally links liblogosdelivery and requires
# LOGOS_DELIVERY_LIB_DIR. The smoketest job builds and exercises it under
# Nix; here we keep the toolchain-only job fast by skipping it.
# chat-cli pulls in components' embedded_p2p_delivery feature, whose
# build.rs links liblogosdelivery (built via Nix or LOGOS_DELIVERY_LIB_DIR).
# The smoketest job builds and exercises it under Nix; here we keep the
# toolchain-only job fast by skipping it.
- run: cargo build --verbose --workspace --exclude chat-cli
- run: cargo test --verbose --workspace --exclude chat-cli

2
Cargo.lock generated
View File

@ -1315,6 +1315,7 @@ dependencies = [
"arboard",
"base64",
"clap",
"components",
"crossbeam-channel",
"crossterm 0.29.0",
"logos-chat",
@ -1468,6 +1469,7 @@ dependencies = [
"libchat",
"reqwest 0.12.28",
"serde",
"serde_json",
"storage",
"thiserror",
"tracing",

View File

@ -9,6 +9,7 @@ path = "src/main.rs"
[dependencies]
# Workspace dependencies (sorted)
components = { workspace = true , features = ["embedded_p2p_delivery"]}
crossbeam-channel = { workspace = true }
logos-chat = { workspace = true }

View File

@ -1,18 +0,0 @@
fn main() {
println!("cargo:rerun-if-env-changed=LOGOS_DELIVERY_LIB_DIR");
println!("cargo::rustc-check-cfg=cfg(logos_delivery)");
let Ok(lib_dir) = std::env::var("LOGOS_DELIVERY_LIB_DIR") else {
return;
};
println!("cargo:rustc-cfg=logos_delivery");
println!("cargo:rustc-link-search=native={lib_dir}");
println!("cargo:rustc-link-lib=dylib=logosdelivery");
let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap_or_default();
match target_os.as_str() {
"macos" | "linux" => println!("cargo:rustc-link-arg=-Wl,-rpath,{lib_dir}"),
other => panic!("unsupported OS for logos-delivery transport: {other}"),
}
}

View File

@ -13,13 +13,33 @@ use logos_chat::{
RegistrationService, StorageConfig, Transport,
};
use components::{EmbeddedP2pDeliveryService, P2pConfig};
#[derive(Debug)]
struct P2pTransport(EmbeddedP2pDeliveryService);
impl logos_chat::DeliveryService for P2pTransport {
type Error = <EmbeddedP2pDeliveryService as logos_chat::DeliveryService>::Error;
fn publish(&mut self, envelope: logos_chat::AddressedEnvelope) -> Result<(), Self::Error> {
self.0.publish(envelope)
}
fn subscribe(&mut self, addr: &str) -> Result<(), Self::Error> {
self.0.subscribe(addr)
}
}
impl logos_chat::Transport for P2pTransport {
fn inbound(&mut self) -> crossbeam_channel::Receiver<Vec<u8>> {
self.0.inbound_queue()
}
}
use app::ChatApp;
#[derive(Copy, Clone, Debug, ValueEnum)]
#[value(rename_all = "kebab-case")]
enum TransportKind {
File,
#[cfg(logos_delivery)]
LogosDelivery,
}
@ -79,19 +99,18 @@ fn main() -> Result<()> {
.context("failed to create file transport")?;
run(transport, &cli)
}
#[cfg(logos_delivery)]
TransportKind::LogosDelivery => {
use transport::logos_delivery::{Config, Service};
println!("Starting logos-delivery node (preset={})...", cli.preset);
println!("This may take a few seconds while connecting to the network.");
let cfg = Config {
let cfg = P2pConfig {
preset: cli.preset.clone(),
tcp_port: cli.port,
..Default::default()
};
let transport = Service::start(cfg).context("failed to start logos-delivery")?;
let transport = P2pTransport(
EmbeddedP2pDeliveryService::start(cfg).context("failed to start logos-delivery")?,
);
println!("Node connected. Initializing chat client...");
run(transport, &cli)
@ -160,74 +179,6 @@ where
result
}
#[cfg_attr(not(logos_delivery), allow(dead_code, unused_variables))]
fn run_logos_delivery(cli: Cli) -> Result<()> {
#[cfg(logos_delivery)]
{
use transport::logos_delivery::{Config, Service};
eprintln!("Starting logos-delivery node (preset={})...", cli.preset);
eprintln!("This may take a few seconds while connecting to the network.");
let logos_cfg = Config {
preset: cli.preset.clone(),
tcp_port: cli.port,
..Default::default()
};
let delivery = Service::start(logos_cfg).context("failed to start logos-delivery")?;
eprintln!("Node connected. Initializing chat client...");
let data_dir = cli
.db
.as_ref()
.and_then(|p| p.parent())
.map(|p| p.to_path_buf())
.unwrap_or_else(|| cli.data.clone());
let (client, events) = match cli.db {
Some(ref path) => {
let db_str = path
.to_str()
.context("db path contains non-UTF-8 characters")?
.to_string();
logos_chat::ChatClientBuilder::new()
.storage_config(logos_chat::StorageConfig::Encrypted {
path: db_str,
key: "chat-cli".to_string(),
})
.transport(delivery)
.build()
.map_err(|e| anyhow::anyhow!("{e:?}"))
.context("failed to open persistent client")?
}
None => logos_chat::ChatClientBuilder::new()
.transport(delivery)
.build()
.map_err(|e| anyhow::anyhow!("{e:?}"))
.context("failed to open chat client")?,
};
let mut app = ChatApp::new(client, events, &cli.name, &data_dir)?;
if cli.smoketest {
return Ok(());
}
let mut terminal = ui::init().context("failed to initialize terminal")?;
let result = run_app(&mut terminal, &mut app);
ui::restore().context("failed to restore terminal")?;
return result;
}
#[cfg(not(logos_delivery))]
anyhow::bail!(
"logos-delivery transport is not available in this build.\n\
Build with LOGOS_DELIVERY_LIB_DIR set to enable it."
)
}
fn run_app<I, T, R, S>(terminal: &mut ui::Tui, app: &mut ChatApp<I, T, R, S>) -> Result<()>
where
I: IdentityProvider + Send,

View File

@ -1,3 +1 @@
pub mod file;
#[cfg(logos_delivery)]
pub mod logos_delivery;

View File

@ -2,6 +2,10 @@
name = "components"
version = "0.1.0"
edition = "2024"
links = "logosdelivery"
[features]
embedded_p2p_delivery = []
[dependencies]
# Workspace dependencies (sorted)
@ -15,5 +19,6 @@ crossbeam-channel = { workspace = true }
hex = "0.4.3"
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "2"
tracing = "0.1"

View File

@ -0,0 +1,166 @@
use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
fn main() {
println!("cargo:rerun-if-env-changed=LOGOS_DELIVERY_LIB_DIR");
println!("cargo::rustc-check-cfg=cfg(logos_delivery)");
if std::env::var_os("CARGO_FEATURE_EMBEDDED_P2P_DELIVERY").is_none() {
return;
}
let Some(lib_dir) = locate_lib_dir() else {
// Feature is on but the native library is unavailable (e.g. `cargo
// check` on a machine without nix). Skip the cfg so the FFI module is
// not compiled — this keeps `cargo check` working without producing
// unresolved symbols at link time. `EmbeddedP2pDeliveryService` is
// simply absent until the library can be found.
return;
};
println!("cargo:rustc-cfg=logos_delivery");
let out_dir = std::env::var("OUT_DIR").expect("OUT_DIR not set");
let target_os = std::env::var("CARGO_CFG_TARGET_OS").unwrap_or_default();
// The shipped library carries a relocatable install name (@rpath on macOS,
// $ORIGIN soname on Linux), which would force every downstream BINARY to
// inject its own RPATH. Cargo propagates `rustc-link-search` and
// `rustc-link-lib` across crates, but NOT `rustc-link-arg` (the rpath) — so
// that relocatable name is exactly what makes consumers need their own
// build.rs. Instead, stamp a private copy with an ABSOLUTE install name;
// the propagating search + lib directives are then sufficient and consumers
// need zero build-script glue.
match target_os.as_str() {
"macos" => stamp_absolute_macos(&lib_dir, &out_dir),
"linux" => stamp_absolute_linux(&lib_dir, &out_dir),
other => panic!("unsupported OS for logos-delivery transport: {other}"),
}
println!("cargo:rustc-link-search=native={out_dir}");
println!("cargo:rustc-link-lib=dylib=logosdelivery");
}
/// Locate the native library directory as an ABSOLUTE, canonical path. Prefers
/// `LOGOS_DELIVERY_LIB_DIR`, then falls back to building it via nix. Returns
/// `None` when neither is available (e.g. `cargo check` without nix).
fn locate_lib_dir() -> Option<PathBuf> {
if let Ok(dir) = std::env::var("LOGOS_DELIVERY_LIB_DIR") {
if let Some(resolved) = resolve_lib_dir(&dir) {
return Some(resolved);
}
println!(
"cargo:warning=LOGOS_DELIVERY_LIB_DIR='{dir}' could not be resolved; \
falling back to `nix build`"
);
}
resolve_lib_dir(&nix_build_logos_delivery()?)
}
/// Resolve a lib dir to an absolute, canonical path. Cargo runs build scripts
/// with the cwd set to the crate dir, but a relative value (e.g. CI's
/// `./result/lib`) is anchored at the flake/workspace root where `nix build`
/// drops `result`. Canonicalizing also follows the `result` symlink to the
/// immutable store path, so the stamped install name / soname stays stable.
fn resolve_lib_dir(dir: &str) -> Option<PathBuf> {
let path = Path::new(dir);
let anchored = if path.is_absolute() {
path.to_path_buf()
} else {
let manifest = std::env::var("CARGO_MANIFEST_DIR").ok()?;
Path::new(&find_flake_root(&manifest)?).join(path)
};
anchored.canonicalize().ok()
}
/// Copy `liblogosdelivery.dylib` into `OUT_DIR` and rewrite its install name to
/// the absolute store path. The consumer records that absolute path, so dyld
/// loads the original file directly — whose own `@loader_path` RPATH resolves
/// `librln.dylib` beside it — with no RPATH needed on the consumer.
fn stamp_absolute_macos(lib_dir: &Path, out_dir: &str) {
let src = lib_dir.join("liblogosdelivery.dylib");
let dst = format!("{out_dir}/liblogosdelivery.dylib");
copy_writable(&src, Path::new(&dst));
run("install_name_tool", &["-id", path_str(&src), &dst]);
println!("cargo:rerun-if-changed={}", src.display());
}
/// Linux equivalent: an absolute `DT_SONAME` is recorded verbatim in the
/// consumer's `DT_NEEDED`, so `ld.so` loads it by path with no RPATH. Requires
/// `patchelf` at build time (provided by the nix devshell).
fn stamp_absolute_linux(lib_dir: &Path, out_dir: &str) {
let src = lib_dir.join("liblogosdelivery.so");
let dst = format!("{out_dir}/liblogosdelivery.so");
copy_writable(&src, Path::new(&dst));
run("patchelf", &["--set-soname", path_str(&src), &dst]);
println!("cargo:rerun-if-changed={}", src.display());
}
fn path_str(p: &Path) -> &str {
p.to_str()
.unwrap_or_else(|| panic!("non-UTF-8 path: {}", p.display()))
}
fn copy_writable(src: &Path, dst: &Path) {
use std::os::unix::fs::PermissionsExt;
fs::copy(src, dst)
.unwrap_or_else(|e| panic!("copy {} -> {}: {e}", src.display(), dst.display()));
// Store-sourced files are read-only; restore owner write so the install
// name / soname can be rewritten.
fs::set_permissions(dst, fs::Permissions::from_mode(0o644)).unwrap();
}
fn run(cmd: &str, args: &[&str]) {
let status = Command::new(cmd)
.args(args)
.status()
.unwrap_or_else(|e| panic!("failed to run `{cmd}`: {e}"));
assert!(status.success(), "`{cmd} {args:?}` failed with {status}");
}
fn nix_build_logos_delivery() -> Option<String> {
let manifest_dir = std::env::var("CARGO_MANIFEST_DIR").ok()?;
let flake_root = find_flake_root(&manifest_dir)?;
println!("cargo:rerun-if-changed={flake_root}/flake.lock");
let output = Command::new("nix")
.args([
"build",
".#logos-delivery",
"--no-link",
"--print-out-paths",
])
.current_dir(&flake_root)
.output()
.ok()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
println!("cargo:warning=nix build .#logos-delivery failed: {stderr}");
return None;
}
let store_path = String::from_utf8(output.stdout).ok()?;
let lib_dir = format!("{}/lib", store_path.trim());
if std::path::Path::new(&lib_dir).exists() {
Some(lib_dir)
} else {
None
}
}
fn find_flake_root(start: &str) -> Option<String> {
let mut path = std::path::PathBuf::from(start);
loop {
if path.join("flake.nix").exists() {
return Some(path.to_string_lossy().into_owned());
}
if !path.pop() {
return None;
}
}
}

View File

@ -1,3 +1,9 @@
mod local_broadcaster;
pub use local_broadcaster::LocalBroadcaster;
#[cfg(logos_delivery)]
pub mod embedded_p2p_delivery;
#[cfg(logos_delivery)]
pub use embedded_p2p_delivery::{EmbeddedP2pDeliveryService, P2pConfig};

View File

@ -19,7 +19,7 @@ use std::time::Duration;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64;
use crossbeam_channel::{Receiver, Sender};
use logos_chat::{AddressedEnvelope, DeliveryService, Transport};
use libchat::{AddressedEnvelope, DeliveryService};
use tracing::{error, info, warn};
use wrapper::LogosNodeCtx;
@ -49,16 +49,16 @@ struct OutboundCmd {
type SubscriberList = Arc<Mutex<Vec<Sender<Vec<u8>>>>>;
// ── Config ───────────────────────────────────────────────────────────────────
// ── P2pConfig ───────────────────────────────────────────────────────────────────
#[derive(Debug, Clone)]
pub struct Config {
pub struct P2pConfig {
pub preset: String,
pub tcp_port: u16,
pub log_level: String,
}
impl Default for Config {
impl Default for P2pConfig {
fn default() -> Self {
Self {
preset: "logos.dev".into(),
@ -115,22 +115,22 @@ impl WakuPayload {
}
}
// ── Service ──────────────────────────────────────────────────────────────────
// ── EmbeddedP2pDeliveryService ──────────────────────────────────────────────────
/// logos-delivery backed delivery service. Cheap to clone — all clones share
/// the same background node.
#[derive(Clone, Debug)]
pub struct Service {
pub struct EmbeddedP2pDeliveryService {
outbound: mpsc::SyncSender<OutboundCmd>,
#[allow(dead_code)]
subscribers: SubscriberList,
inbound_rx: Option<Receiver<Vec<u8>>>,
}
impl Service {
impl EmbeddedP2pDeliveryService {
/// Start the embedded logos-delivery node. The client drains inbound
/// payloads via [`Transport::inbound`].
pub fn start(cfg: Config) -> Result<Self, DeliveryError> {
pub fn start(cfg: P2pConfig) -> Result<Self, DeliveryError> {
let (out_tx, out_rx) = mpsc::sync_channel::<OutboundCmd>(256);
let subscribers: SubscriberList = Arc::new(Mutex::new(Vec::new()));
let (ready_tx, ready_rx) = mpsc::channel::<Result<(), DeliveryError>>();
@ -177,7 +177,7 @@ impl Service {
}
fn node_thread(
cfg: Config,
cfg: P2pConfig,
out_rx: mpsc::Receiver<OutboundCmd>,
subscribers: SubscriberList,
inbound_tx: Sender<Vec<u8>>,
@ -276,9 +276,15 @@ impl Service {
msg.payload.decode()
}
pub fn inbound_queue(&mut self) -> Receiver<Vec<u8>> {
self.inbound_rx
.take()
.expect("inbound_queue called more than once")
}
}
impl DeliveryService for Service {
impl DeliveryService for EmbeddedP2pDeliveryService {
type Error = DeliveryError;
fn publish(&mut self, envelope: AddressedEnvelope) -> Result<(), DeliveryError> {
@ -306,11 +312,3 @@ impl DeliveryService for Service {
Ok(())
}
}
impl Transport for Service {
fn inbound(&mut self) -> Receiver<Vec<u8>> {
self.inbound_rx
.take()
.expect("Service::inbound called more than once")
}
}

View File

@ -1,5 +1,5 @@
mod contact_registry;
mod delivery;
pub mod delivery;
mod storage;
mod wakeup;

View File

@ -37,9 +37,10 @@
}
);
devShells = forAllSystems ({ pkgs, ... }:
devShells = forAllSystems ({ pkgs, system, ... }:
let
rustToolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust_toolchain.toml;
logosDeliveryLib = self.packages.${system}.logos-delivery;
in
{
default = pkgs.mkShell {
@ -49,7 +50,15 @@
pkgs.cmake
pkgs.perl
pkgs.protobuf
];
]
# components/build.rs rewrites the dylib soname via patchelf on
# Linux so consumers link without their own build.rs. macOS uses
# install_name_tool, which ships with the toolchain.
++ pkgs.lib.optionals pkgs.stdenv.isLinux [ pkgs.patchelf ];
buildInputs = [ logosDeliveryLib ];
shellHook = ''
export LOGOS_DELIVERY_LIB_DIR="${logosDeliveryLib}/lib"
'';
};
}
);