Add snapshot-aware local node startup

This commit is contained in:
andrussal 2026-03-20 08:11:06 +01:00
parent 4b44a962d6
commit 00411bb5be
8 changed files with 115 additions and 6 deletions

13
Cargo.lock generated
View File

@ -1326,7 +1326,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de"
dependencies = [ dependencies = [
"data-encoding", "data-encoding",
"syn 1.0.109", "syn 2.0.114",
] ]
[[package]] [[package]]
@ -1744,6 +1744,12 @@ dependencies = [
"percent-encoding", "percent-encoding",
] ]
[[package]]
name = "fs_extra"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.31" version = "0.3.31"
@ -5467,7 +5473,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"itertools 0.10.5", "itertools 0.14.0",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.114", "syn 2.0.114",
@ -6617,6 +6623,7 @@ name = "testing-framework-runner-local"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"fs_extra",
"tempfile", "tempfile",
"testing-framework-core", "testing-framework-core",
"thiserror 2.0.18", "thiserror 2.0.18",
@ -7564,7 +7571,7 @@ version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.61.2",
] ]
[[package]] [[package]]

View File

@ -89,6 +89,25 @@ impl LocalDeployerEnv for LbcExtEnv {
) )
} }
/// Builds a node config for `LbcExtEnv` by delegating to `LbcEnv`.
///
/// The start options are first converted with `map_start_options`; every
/// other argument, including the optional `template_config`, is forwarded
/// unchanged.
///
/// # Errors
///
/// Fails if the options cannot be mapped or if the `LbcEnv` implementation
/// returns an error.
fn build_node_config_from_template(
    topology: &Self::Deployment,
    index: usize,
    peer_ports_by_name: &HashMap<String, u16>,
    options: &StartNodeOptions<Self>,
    peer_ports: &[u16],
    template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
    <LbcEnv as LocalDeployerEnv>::build_node_config_from_template(
        topology,
        index,
        peer_ports_by_name,
        // Mapping failures short-circuit before the delegate is invoked.
        &map_start_options(options)?,
        peer_ports,
        template_config,
    )
}
fn build_initial_node_configs( fn build_initial_node_configs(
topology: &Self::Deployment, topology: &Self::Deployment,
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError> { ) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError> {
@ -103,6 +122,14 @@ impl LocalDeployerEnv for LbcExtEnv {
<LbcEnv as LocalDeployerEnv>::initial_persist_dir(topology, node_name, index) <LbcEnv as LocalDeployerEnv>::initial_persist_dir(topology, node_name, index)
} }
/// Returns the snapshot directory for an initial node, if any.
///
/// Pure delegation: the answer comes from `LbcEnv`'s implementation of
/// `LocalDeployerEnv::initial_snapshot_dir` with the same arguments.
fn initial_snapshot_dir(
topology: &Self::Deployment,
node_name: &str,
index: usize,
) -> Option<PathBuf> {
<LbcEnv as LocalDeployerEnv>::initial_snapshot_dir(topology, node_name, index)
}
fn build_launch_spec( fn build_launch_spec(
config: &<Self as Application>::NodeConfig, config: &<Self as Application>::NodeConfig,
dir: &Path, dir: &Path,
@ -135,6 +162,7 @@ fn map_start_options(
mapped.peers = options.peers.clone(); mapped.peers = options.peers.clone();
mapped.config_override = options.config_override.clone(); mapped.config_override = options.config_override.clone();
mapped.persist_dir = options.persist_dir.clone(); mapped.persist_dir = options.persist_dir.clone();
mapped.snapshot_dir = options.snapshot_dir.clone();
Ok(mapped) Ok(mapped)
} }

View File

@ -42,6 +42,8 @@ pub struct StartNodeOptions<E: Application> {
Option<Arc<dyn Fn(E::NodeConfig) -> Result<E::NodeConfig, DynError> + Send + Sync>>, Option<Arc<dyn Fn(E::NodeConfig) -> Result<E::NodeConfig, DynError> + Send + Sync>>,
/// Optional persistent working directory for this node process. /// Optional persistent working directory for this node process.
pub persist_dir: Option<PathBuf>, pub persist_dir: Option<PathBuf>,
/// Optional directory whose contents should seed the node working dir.
pub snapshot_dir: Option<PathBuf>,
_phantom: PhantomData<E>, _phantom: PhantomData<E>,
} }
@ -52,6 +54,7 @@ impl<E: Application> fmt::Debug for StartNodeOptions<E> {
.field("config_override", &self.config_override.is_some()) .field("config_override", &self.config_override.is_some())
.field("config_patch", &self.config_patch.is_some()) .field("config_patch", &self.config_patch.is_some())
.field("persist_dir", &self.persist_dir) .field("persist_dir", &self.persist_dir)
.field("snapshot_dir", &self.snapshot_dir)
.finish() .finish()
} }
} }
@ -63,6 +66,7 @@ impl<E: Application> Default for StartNodeOptions<E> {
config_override: None, config_override: None,
config_patch: None, config_patch: None,
persist_dir: None, persist_dir: None,
snapshot_dir: None,
_phantom: PhantomData, _phantom: PhantomData,
} }
} }
@ -95,6 +99,12 @@ impl<E: Application> StartNodeOptions<E> {
self.persist_dir = Some(persist_dir); self.persist_dir = Some(persist_dir);
self self
} }
#[must_use]
pub fn with_snapshot_dir(mut self, snapshot_dir: PathBuf) -> Self {
self.snapshot_dir = Some(snapshot_dir);
self
}
} }
/// Indicates whether a capability requires node control. /// Indicates whether a capability requires node control.

View File

@ -14,6 +14,7 @@ workspace = true
[dependencies] [dependencies]
async-trait = "0.1" async-trait = "0.1"
fs_extra = "1.3"
tempfile = { workspace = true } tempfile = { workspace = true }
testing-framework-core = { path = "../../core" } testing-framework-core = { path = "../../core" }
thiserror = { workspace = true } thiserror = { workspace = true }

View File

@ -35,6 +35,17 @@ where
peer_ports: &[u16], peer_ports: &[u16],
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError>; ) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError>;
/// Builds a node config, optionally taking an existing config as a template.
///
/// The default implementation ignores `_template_config` and forwards the
/// remaining arguments to `Self::build_node_config`. Implementors that want
/// the template to influence the result must override this method.
///
/// # Errors
///
/// Propagates whatever error `Self::build_node_config` returns.
fn build_node_config_from_template(
topology: &Self::Deployment,
index: usize,
peer_ports_by_name: &HashMap<String, u16>,
options: &StartNodeOptions<Self>,
peer_ports: &[u16],
_template_config: Option<&<Self as Application>::NodeConfig>,
) -> Result<BuiltNodeConfig<<Self as Application>::NodeConfig>, DynError> {
Self::build_node_config(topology, index, peer_ports_by_name, options, peer_ports)
}
fn build_initial_node_configs( fn build_initial_node_configs(
topology: &Self::Deployment, topology: &Self::Deployment,
) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError>; ) -> Result<Vec<NodeConfigEntry<<Self as Application>::NodeConfig>>, ProcessSpawnError>;
@ -47,6 +58,14 @@ where
None None
} }
/// Returns the snapshot directory to use for an initial node, if any.
///
/// The default implementation returns `None` for every node, so no
/// snapshot is used unless an implementor overrides this method.
fn initial_snapshot_dir(
_topology: &Self::Deployment,
_node_name: &str,
_index: usize,
) -> Option<PathBuf> {
None
}
fn build_launch_spec( fn build_launch_spec(
config: &<Self as Application>::NodeConfig, config: &<Self as Application>::NodeConfig,
dir: &Path, dir: &Path,
@ -90,6 +109,7 @@ pub async fn spawn_node_from_config<E: LocalDeployerEnv>(
config: <E as Application>::NodeConfig, config: <E as Application>::NodeConfig,
keep_tempdir: bool, keep_tempdir: bool,
persist_dir: Option<&std::path::Path>, persist_dir: Option<&std::path::Path>,
snapshot_dir: Option<&std::path::Path>,
) -> Result<Node<E>, ProcessSpawnError> { ) -> Result<Node<E>, ProcessSpawnError> {
ProcessNode::spawn( ProcessNode::spawn(
&label, &label,
@ -98,6 +118,7 @@ pub async fn spawn_node_from_config<E: LocalDeployerEnv>(
E::node_endpoints, E::node_endpoints,
keep_tempdir, keep_tempdir,
persist_dir, persist_dir,
snapshot_dir,
E::node_client, E::node_client,
) )
.await .await

View File

@ -19,11 +19,12 @@ mod state;
use state::LocalNodeManagerState; use state::LocalNodeManagerState;
#[derive(Clone)] #[derive(Clone)]
struct NodeStartSnapshot { struct NodeStartSnapshot<Config> {
peer_ports: Vec<u16>, peer_ports: Vec<u16>,
peer_ports_by_name: HashMap<String, u16>, peer_ports_by_name: HashMap<String, u16>,
node_name: String, node_name: String,
index: usize, index: usize,
template_config: Option<Config>,
} }
#[derive(Debug, Error)] #[derive(Debug, Error)]
@ -83,12 +84,14 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
for (index, config_entry) in configs.into_iter().enumerate() { for (index, config_entry) in configs.into_iter().enumerate() {
let persist_dir = E::initial_persist_dir(descriptors, &config_entry.name, index); let persist_dir = E::initial_persist_dir(descriptors, &config_entry.name, index);
let snapshot_dir = E::initial_snapshot_dir(descriptors, &config_entry.name, index);
spawned.push( spawned.push(
spawn_node_from_config::<E>( spawn_node_from_config::<E>(
config_entry.name, config_entry.name,
config_entry.config, config_entry.config,
keep_tempdir, keep_tempdir,
persist_dir.as_deref(), persist_dir.as_deref(),
snapshot_dir.as_deref(),
) )
.await?, .await?,
); );
@ -113,6 +116,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
clients_by_name: HashMap::new(), clients_by_name: HashMap::new(),
indices_by_name: HashMap::new(), indices_by_name: HashMap::new(),
nodes: Vec::new(), nodes: Vec::new(),
template_config: None,
}; };
Self { Self {
@ -146,6 +150,9 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
pub fn stop_all(&self) { pub fn stop_all(&self) {
let mut state = self.lock_state(); let mut state = self.lock_state();
for node in &mut state.nodes {
node.start_kill();
}
state.nodes.clear(); state.nodes.clear();
state.peer_ports.clone_from(&self.seed.peer_ports); state.peer_ports.clone_from(&self.seed.peer_ports);
@ -155,6 +162,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
state.clients_by_name.clear(); state.clients_by_name.clear();
state.indices_by_name.clear(); state.indices_by_name.clear();
state.node_count = self.seed.node_count; state.node_count = self.seed.node_count;
state.template_config = None;
self.node_clients.clear(); self.node_clients.clear();
} }
@ -228,12 +236,13 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
) -> Result<StartedNode<E>, NodeManagerError> { ) -> Result<StartedNode<E>, NodeManagerError> {
let snapshot = self.start_snapshot(name)?; let snapshot = self.start_snapshot(name)?;
let mut built = E::build_node_config( let mut built = E::build_node_config_from_template(
&self.descriptors, &self.descriptors,
snapshot.index, snapshot.index,
&snapshot.peer_ports_by_name, &snapshot.peer_ports_by_name,
&options, &options,
&snapshot.peer_ports, &snapshot.peer_ports,
snapshot.template_config.as_ref(),
) )
.map_err(|source| NodeManagerError::Config { source })?; .map_err(|source| NodeManagerError::Config { source })?;
@ -250,6 +259,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
built.network_port, built.network_port,
built.config, built.config,
options.persist_dir.as_deref(), options.persist_dir.as_deref(),
options.snapshot_dir.as_deref(),
) )
.await?; .await?;
@ -290,12 +300,14 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
network_port: u16, network_port: u16,
config: <E as Application>::NodeConfig, config: <E as Application>::NodeConfig,
persist_dir: Option<&std::path::Path>, persist_dir: Option<&std::path::Path>,
snapshot_dir: Option<&std::path::Path>,
) -> Result<E::NodeClient, NodeManagerError> { ) -> Result<E::NodeClient, NodeManagerError> {
let node = spawn_node_from_config::<E>( let node = spawn_node_from_config::<E>(
node_name.to_string(), node_name.to_string(),
config, config,
self.keep_tempdir, self.keep_tempdir,
persist_dir, persist_dir,
snapshot_dir,
) )
.await .await
.map_err(|source| NodeManagerError::Spawn { .map_err(|source| NodeManagerError::Spawn {
@ -306,6 +318,9 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
self.node_clients.add_node(client.clone()); self.node_clients.add_node(client.clone());
let mut state = self.lock_state(); let mut state = self.lock_state();
if state.template_config.is_none() && snapshot_dir.is_some() {
state.template_config = Some(node.config().clone());
}
state.register_node(node_name, network_port, client.clone(), node); state.register_node(node_name, network_port, client.clone(), node);
@ -322,7 +337,10 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
reinsert_node_at(&mut state, index, node); reinsert_node_at(&mut state, index, node);
} }
fn start_snapshot(&self, requested_name: &str) -> Result<NodeStartSnapshot, NodeManagerError> { fn start_snapshot(
&self,
requested_name: &str,
) -> Result<NodeStartSnapshot<E::NodeConfig>, NodeManagerError> {
let state = self.lock_state(); let state = self.lock_state();
let index = state.node_count; let index = state.node_count;
let node_name = validate_new_node_name::<E>(state.node_count, &state, requested_name)?; let node_name = validate_new_node_name::<E>(state.node_count, &state, requested_name)?;
@ -332,6 +350,7 @@ impl<E: LocalDeployerEnv> NodeManager<E> {
peer_ports_by_name: state.peer_ports_by_name.clone(), peer_ports_by_name: state.peer_ports_by_name.clone(),
node_name, node_name,
index, index,
template_config: state.template_config.clone(),
}) })
} }
@ -349,6 +368,7 @@ fn clear_registered_nodes<E: LocalDeployerEnv>(state: &mut LocalNodeManagerState
state.clients_by_name.clear(); state.clients_by_name.clear();
state.indices_by_name.clear(); state.indices_by_name.clear();
state.node_count = 0; state.node_count = 0;
state.template_config = None;
} }
fn validate_new_node_name<E: LocalDeployerEnv>( fn validate_new_node_name<E: LocalDeployerEnv>(

View File

@ -9,6 +9,7 @@ pub(crate) struct LocalNodeManagerState<E: LocalDeployerEnv> {
pub(crate) clients_by_name: HashMap<String, E::NodeClient>, pub(crate) clients_by_name: HashMap<String, E::NodeClient>,
pub(crate) indices_by_name: HashMap<String, usize>, pub(crate) indices_by_name: HashMap<String, usize>,
pub(crate) nodes: Vec<Node<E>>, pub(crate) nodes: Vec<Node<E>>,
pub(crate) template_config: Option<E::NodeConfig>,
} }
impl<E: LocalDeployerEnv> LocalNodeManagerState<E> { impl<E: LocalDeployerEnv> LocalNodeManagerState<E> {

View File

@ -10,6 +10,7 @@ use std::{
time::Duration, time::Duration,
}; };
use fs_extra::dir::{CopyOptions, copy as copy_dir};
use tempfile::TempDir; use tempfile::TempDir;
use testing_framework_core::{env::Application, process::RuntimeNode, scenario::DynError}; use testing_framework_core::{env::Application, process::RuntimeNode, scenario::DynError};
use tokio::{ use tokio::{
@ -112,6 +113,11 @@ pub enum ProcessSpawnError {
#[source] #[source]
source: io::Error, source: io::Error,
}, },
#[error("failed to copy snapshot directory: {source}")]
Snapshot {
#[source]
source: io::Error,
},
#[error("process wait failed: {source}")] #[error("process wait failed: {source}")]
Wait { Wait {
#[source] #[source]
@ -192,9 +198,14 @@ impl<Config: Clone + Send + Sync + 'static, Client: Clone + Send + Sync + 'stati
endpoints_from_config: impl FnOnce(&Config) -> NodeEndpoints, endpoints_from_config: impl FnOnce(&Config) -> NodeEndpoints,
keep_tempdir: bool, keep_tempdir: bool,
persist_dir: Option<&Path>, persist_dir: Option<&Path>,
snapshot_dir: Option<&Path>,
client_from_endpoints: impl FnOnce(&NodeEndpoints) -> Client, client_from_endpoints: impl FnOnce(&NodeEndpoints) -> Client,
) -> Result<Self, ProcessSpawnError> { ) -> Result<Self, ProcessSpawnError> {
let tempdir = create_tempdir(persist_dir)?; let tempdir = create_tempdir(persist_dir)?;
if let Some(snapshot_dir) = snapshot_dir {
copy_snapshot_dir(snapshot_dir, tempdir.path())
.map_err(|source| ProcessSpawnError::Snapshot { source })?;
}
let launch = build_launch_spec(&config, tempdir.path(), label) let launch = build_launch_spec(&config, tempdir.path(), label)
.map_err(|source| ProcessSpawnError::Config { source })?; .map_err(|source| ProcessSpawnError::Config { source })?;
@ -328,6 +339,16 @@ fn write_launch_file(base: &Path, file: &LaunchFile) -> io::Result<()> {
fs::write(path, &file.contents) fs::write(path, &file.contents)
} }
/// Seeds the directory `to` with the contents of the snapshot directory `from`.
///
/// Files and sub-directories found under `from` are copied directly into
/// `to`; files that already exist in `to` are overwritten.
///
/// # Errors
///
/// Returns an [`io::Error`] wrapping the underlying `fs_extra` error when the
/// copy fails.
fn copy_snapshot_dir(from: &Path, to: &Path) -> io::Result<()> {
    let mut options = CopyOptions::new();
    options.copy_inside = true;
    options.overwrite = true;
    // The caller passes a destination directory it has already created. For an
    // existing destination fs_extra behaves like `cp -r` and would nest the
    // snapshot under `to/<snapshot dir name>`; `content_only` copies the
    // snapshot's contents straight into `to` instead.
    options.content_only = true;

    copy_dir(from, to, &options)
        .map(|_| ())
        .map_err(io::Error::other)
}
fn default_api_socket() -> SocketAddr { fn default_api_socket() -> SocketAddr {
SocketAddr::from((Ipv4Addr::LOCALHOST, 0)) SocketAddr::from((Ipv4Addr::LOCALHOST, 0))
} }