Add compose attached node control and attach discovery

This commit is contained in:
andrussal 2026-03-02 11:19:55 +01:00
parent cd285484a7
commit 4195707aa7
15 changed files with 837 additions and 34 deletions

View File

@ -0,0 +1,196 @@
use std::{
env,
process::{Command, Stdio},
thread,
time::Duration,
};
use anyhow::{Result, anyhow};
use lb_ext::{CoreBuilderExt as _, LbcComposeDeployer, LbcExtEnv, ScenarioBuilder};
use testing_framework_core::scenario::{AttachSource, Deployer as _, Runner};
/// Opt-in end-to-end check for compose attached-mode node control.
///
/// Skipped unless `TF_RUN_COMPOSE_ATTACH_NODE_CONTROL` is set, since it needs
/// a working Docker daemon and spins up a real compose deployment.
///
/// Flow:
/// 1. Deploy a managed single-node compose scenario.
/// 2. Recover the compose project name by matching the node's published API
///    port against running containers.
/// 3. Deploy a second, attached-mode scenario pointing at that project.
/// 4. Restart `node-0` through the attached runner's node control and wait
///    for the container's `StartedAt` timestamp to change.
#[tokio::test]
async fn compose_attach_mode_restart_node_opt_in() -> Result<()> {
    // Opt-in guard: quietly pass when the env toggle is absent.
    if env::var("TF_RUN_COMPOSE_ATTACH_NODE_CONTROL").is_err() {
        return Ok(());
    }
    let managed = ScenarioBuilder::deployment_with(|d| d.with_node_count(1))
        .enable_node_control()
        .with_run_duration(Duration::from_secs(5))
        .build()?;
    let deployer = LbcComposeDeployer::default();
    let managed_runner: Runner<LbcExtEnv> = deployer.deploy(&managed).await?;
    let managed_client = managed_runner
        .context()
        .node_clients()
        .snapshot()
        .into_iter()
        .next()
        .ok_or_else(|| anyhow!("managed compose runner returned no node clients"))?;
    // The published host port is the only handle available to locate the
    // compose project the managed deployment created.
    let api_port = managed_client
        .base_url()
        .port()
        .ok_or_else(|| anyhow!("managed node base url has no port"))?;
    let project_name = discover_compose_project_by_published_port(api_port)?;
    let attached = ScenarioBuilder::deployment_with(|d| d.with_node_count(1))
        .enable_node_control()
        .with_run_duration(Duration::from_secs(5))
        .with_attach_source(
            AttachSource::compose(vec!["node-0".to_owned()]).with_project(project_name.clone()),
        )
        .build()?;
    let attached_runner: Runner<LbcExtEnv> = deployer.deploy(&attached).await?;
    // Capture the pre-restart container id/timestamp so the restart can be
    // verified as an observable StartedAt change, not just an API success.
    let pre_restart_container = discover_compose_service_container(&project_name, "node-0")?;
    let pre_restart_started_at = inspect_container_started_at(&pre_restart_container)?;
    let control = attached_runner
        .context()
        .node_control()
        .ok_or_else(|| anyhow!("attached compose node control is unavailable"))?;
    control
        .restart_node("node-0")
        .await
        .map_err(|err| anyhow!("attached restart failed: {err}"))?;
    wait_until_container_restarted(
        &project_name,
        "node-0",
        &pre_restart_started_at,
        Duration::from_secs(30),
    )?;
    Ok(())
}
/// Finds the compose project whose container publishes `port` on the host.
///
/// Scans every running container, matching its docker-inspect ports payload
/// against the literal `"HostPort":"<port>"` token, then reads the compose
/// project label. Exactly one project must match.
fn discover_compose_project_by_published_port(port: u16) -> Result<String> {
    let ids_output = run_docker_capture(["ps", "-q"])?;
    let needle = format!("\"HostPort\":\"{port}\"");
    let mut projects: Vec<String> = Vec::new();
    let ids = ids_output.lines().map(str::trim).filter(|id| !id.is_empty());
    for id in ids {
        // Fetch the raw port-mapping JSON and look for the host-port token.
        let port_json = run_docker_capture([
            "inspect",
            "--format",
            "{{json .NetworkSettings.Ports}}",
            id,
        ])?;
        if !port_json.contains(&needle) {
            continue;
        }
        let label = run_docker_capture([
            "inspect",
            "--format",
            "{{ index .Config.Labels \"com.docker.compose.project\" }}",
            id,
        ])?;
        let label = label.trim();
        if !label.is_empty() {
            projects.push(label.to_owned());
        }
    }
    match projects.as_slice() {
        [only] => Ok(only.clone()),
        [] => Err(anyhow!(
            "no compose project found exposing api host port {port}"
        )),
        _ => Err(anyhow!(
            "multiple compose projects expose api host port {port}: {:?}",
            projects
        )),
    }
}
/// Resolves the single running container id for `service` within `project`.
///
/// Errors when zero or more than one matching container is running, since the
/// restart check needs an unambiguous target.
fn discover_compose_service_container(project: &str, service: &str) -> Result<String> {
    let listing = run_docker_capture([
        "ps",
        "--filter",
        &format!("label=com.docker.compose.project={project}"),
        "--filter",
        &format!("label=com.docker.compose.service={service}"),
        "--format",
        "{{.ID}}",
    ])?;
    let ids: Vec<&str> = listing
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .collect();
    match ids.as_slice() {
        [id] => Ok((*id).to_owned()),
        [] => Err(anyhow!(
            "no running container found for compose project '{project}' service '{service}'"
        )),
        _ => Err(anyhow!(
            "multiple running containers found for compose project '{project}' service '{service}'"
        )),
    }
}
/// Reads the container's `State.StartedAt` timestamp via `docker inspect`.
///
/// An empty value is treated as an error so callers never compare against a
/// blank timestamp.
fn inspect_container_started_at(container_id: &str) -> Result<String> {
    let raw = run_docker_capture(["inspect", "--format", "{{.State.StartedAt}}", container_id])?;
    match raw.trim() {
        "" => Err(anyhow!(
            "docker inspect returned empty StartedAt for container {container_id}"
        )),
        started_at => Ok(started_at.to_owned()),
    }
}
/// Polls until the service container's `StartedAt` differs from
/// `previous_started_at`, i.e. the container has actually been restarted.
///
/// Fix over the previous version: while docker restarts a container it can
/// transiently be missing from `docker ps` (or be mid-recreation), which made
/// the old code propagate a discovery error immediately. Lookup failures are
/// now retried until `timeout` elapses, like an unchanged timestamp.
fn wait_until_container_restarted(
    project: &str,
    service: &str,
    previous_started_at: &str,
    timeout: Duration,
) -> Result<()> {
    let deadline = std::time::Instant::now() + timeout;
    loop {
        // Re-discover each iteration: a restart may recreate the container
        // under a new id, so caching the id could observe a stale timestamp.
        let poll = discover_compose_service_container(project, service)
            .and_then(|container_id| inspect_container_started_at(&container_id));
        match poll {
            Ok(started_at) if started_at != previous_started_at => return Ok(()),
            // Timestamp unchanged, or a transient lookup failure: keep
            // polling until the deadline.
            Ok(_) | Err(_) => {}
        }
        if std::time::Instant::now() >= deadline {
            return Err(anyhow!(
                "timed out waiting for restarted container timestamp change: {project}/{service}"
            ));
        }
        thread::sleep(Duration::from_millis(500));
    }
}
/// Runs `docker <args>` and returns captured stdout, failing on non-zero exit
/// with the command line, status, and trimmed stderr in the error message.
fn run_docker_capture<const N: usize>(args: [&str; N]) -> Result<String> {
    let output = Command::new("docker")
        .args(args)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .output()?;
    if output.status.success() {
        Ok(String::from_utf8_lossy(&output.stdout).to_string())
    } else {
        Err(anyhow!(
            "docker {} failed: status={} stderr={}",
            args.join(" "),
            output.status,
            String::from_utf8_lossy(&output.stderr).trim()
        ))
    }
}

View File

@ -103,6 +103,110 @@ async fn scenario_builder_api_port_override() -> Result<()> {
Ok(())
}
/// Manual-run check: overriding `identify.hide_listen_addrs` on a single node
/// still yields a node that answers `consensus_info`.
#[tokio::test]
#[ignore = "run manually with `cargo test -p runner-examples -- --ignored scenario_builder_identify_hide_listen_addrs_override`"]
async fn scenario_builder_identify_hide_listen_addrs_override() -> Result<()> {
    let _ = try_init();
    // Required env vars (set on the command line when running this test):
    // - `POL_PROOF_DEV_MODE=true`
    // - `LOGOS_BLOCKCHAIN_NODE_BIN=...`
    // - `LOGOS_BLOCKCHAIN_CIRCUITS=...`
    // - `RUST_LOG=info` (optional)
    let base_builder = DeploymentBuilder::new(TopologyConfig::with_node_numbers(1));
    let base_descriptors = base_builder.clone().build()?;
    let base_node = base_descriptors.nodes().first().expect("node 0 descriptor");
    // Build the node's run config from the base deployment, then flip the
    // identify setting under test before re-injecting it as an override.
    let mut run_config = build_node_run_config(
        &base_descriptors,
        base_node,
        base_descriptors
            .config()
            .node_config_override(base_node.index()),
    )
    .expect("build run config");
    run_config
        .user
        .network
        .backend
        .swarm
        .identify
        .hide_listen_addrs = Some(true);
    let mut scenario = ScenarioBuilder::new(Box::new(
        base_builder.with_node_config_override(0, run_config),
    ))
    .with_run_duration(Duration::from_secs(1))
    .build()?;
    let deployer = LbcLocalDeployer::default();
    let runner = deployer.deploy(&scenario).await?;
    let handle = runner.run(&mut scenario).await?;
    // Smoke-check liveness through any exposed node client.
    let client = handle
        .context()
        .random_node_client()
        .ok_or_else(|| anyhow::anyhow!("scenario did not expose any node clients"))?;
    client
        .consensus_info()
        .await
        .expect("consensus_info should succeed");
    Ok(())
}
/// Manual-run check: `identify.hide_listen_addrs` can be overridden on every
/// node of a two-node deployment and `consensus_info` still succeeds.
#[tokio::test]
#[ignore = "run manually with `cargo test -p runner-examples -- --ignored scenario_builder_identify_hide_listen_addrs_two_nodes`"]
async fn scenario_builder_identify_hide_listen_addrs_two_nodes() -> Result<()> {
    let _ = try_init();
    // Required env vars (set on the command line when running this test):
    // - `POL_PROOF_DEV_MODE=true`
    // - `LOGOS_BLOCKCHAIN_NODE_BIN=...`
    // - `LOGOS_BLOCKCHAIN_CIRCUITS=...`
    // - `RUST_LOG=info` (optional)
    let base_builder = DeploymentBuilder::new(TopologyConfig::with_node_numbers(2));
    let base_descriptors = base_builder.clone().build()?;
    let mut deployment_builder = base_builder;
    // Apply the same identify override to both nodes.
    for idx in 0..2 {
        let node = &base_descriptors.nodes()[idx];
        let mut run_config = build_node_run_config(
            &base_descriptors,
            node,
            base_descriptors.config().node_config_override(node.index()),
        )
        .expect("build run config");
        run_config
            .user
            .network
            .backend
            .swarm
            .identify
            .hide_listen_addrs = Some(true);
        deployment_builder = deployment_builder.with_node_config_override(idx, run_config);
    }
    let mut scenario = ScenarioBuilder::new(Box::new(deployment_builder))
        .with_run_duration(Duration::from_secs(3))
        .build()?;
    let deployer = LbcLocalDeployer::default();
    let runner = deployer.deploy(&scenario).await?;
    let handle = runner.run(&mut scenario).await?;
    // Smoke-check liveness through any exposed node client.
    let client = handle
        .context()
        .random_node_client()
        .ok_or_else(|| anyhow::anyhow!("scenario did not expose any node clients"))?;
    client
        .consensus_info()
        .await
        .expect("consensus_info should succeed");
    Ok(())
}
fn random_api_port() -> u16 {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind random API port");
listener.local_addr().expect("read API port").port()

View File

@ -37,7 +37,8 @@ pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy};
pub use expectation::Expectation;
pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs};
pub use runtime::{
BorrowedNode, BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime,
ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, BorrowedNode,
BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime,
HttpReadinessRequirement, ManagedNode, ManagedSource, NodeClients, NodeHandle, NodeInventory,
ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError,
SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider,
@ -46,9 +47,10 @@ pub use runtime::{
CONSENSUS_PROCESSED_BLOCKS, CONSENSUS_TRANSACTIONS_TOTAL, Metrics, MetricsError,
PrometheusEndpoint, PrometheusInstantSample,
},
orchestrate_sources, resolve_sources, spawn_feed, wait_for_http_ports,
wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement,
wait_for_http_ports_with_requirement, wait_http_readiness, wait_until_stable,
orchestrate_sources, orchestrate_sources_with_providers, resolve_sources, spawn_feed,
wait_for_http_ports, wait_for_http_ports_with_host,
wait_for_http_ports_with_host_and_requirement, wait_for_http_ports_with_requirement,
wait_http_readiness, wait_until_stable,
};
pub use sources::{AttachSource, ExternalNodeSource, ScenarioSources, SourceReadinessPolicy};
pub use workload::Workload;

View File

@ -16,10 +16,13 @@ pub use node_clients::NodeClients;
#[doc(hidden)]
pub use orchestration::{
ManagedSource, SourceOrchestrationPlan, build_source_orchestration_plan, orchestrate_sources,
resolve_sources,
orchestrate_sources_with_providers, resolve_sources,
};
#[doc(hidden)]
pub use providers::{SourceProviders, StaticManagedProvider};
pub use providers::{
ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode,
SourceProviders, StaticManagedProvider,
};
pub use readiness::{
HttpReadinessRequirement, ReadinessError, StabilizationConfig, wait_for_http_ports,
wait_for_http_ports_with_host, wait_for_http_ports_with_host_and_requirement,

View File

@ -7,4 +7,7 @@ pub use source_orchestration_plan::{
ManagedSource, SourceModeName, SourceOrchestrationMode, SourceOrchestrationPlan,
SourceOrchestrationPlanError,
};
pub use source_resolver::{build_source_orchestration_plan, orchestrate_sources, resolve_sources};
pub use source_resolver::{
build_source_orchestration_plan, orchestrate_sources, orchestrate_sources_with_providers,
resolve_sources,
};

View File

@ -65,13 +65,10 @@ impl SourceOrchestrationPlan {
) -> Result<Self, SourceOrchestrationPlanError> {
let mode = mode_from_sources(sources);
let plan = Self {
Ok(Self {
mode,
readiness_policy,
};
plan.ensure_currently_wired()?;
Ok(plan)
})
}
#[must_use]
@ -82,13 +79,24 @@ impl SourceOrchestrationPlan {
| SourceOrchestrationMode::ExternalOnly { external } => external,
}
}
}
fn ensure_currently_wired(&self) -> Result<(), SourceOrchestrationPlanError> {
match self.mode {
SourceOrchestrationMode::Managed { .. }
| SourceOrchestrationMode::ExternalOnly { .. } => Ok(()),
SourceOrchestrationMode::Attached { .. } => not_wired(SourceModeName::Attached),
}
#[cfg(test)]
mod tests {
    use super::{SourceOrchestrationMode, SourceOrchestrationPlan};
    use crate::scenario::{AttachSource, ScenarioSources, SourceReadinessPolicy};

    // Pins that attached sources build a valid orchestration plan instead of
    // being rejected at plan construction time.
    #[test]
    fn attached_sources_are_planned() {
        let sources = ScenarioSources::attached(AttachSource::compose(vec!["node-0".to_string()]));
        let plan =
            SourceOrchestrationPlan::try_from_sources(&sources, SourceReadinessPolicy::AllReady)
                .expect("attached sources should build a source orchestration plan");
        // The variant alone is asserted; its payload is not inspected here.
        assert!(matches!(
            plan.mode,
            SourceOrchestrationMode::Attached { .. }
        ));
    }
}
@ -107,7 +115,3 @@ fn mode_from_sources(sources: &ScenarioSources) -> SourceOrchestrationMode {
},
}
}
fn not_wired(mode: SourceModeName) -> Result<(), SourceOrchestrationPlanError> {
Err(SourceOrchestrationPlanError::SourceModeNotWiredYet { mode })
}

View File

@ -91,8 +91,7 @@ pub async fn resolve_sources<E: Application>(
/// - Managed mode is backed by prebuilt deployer-managed clients via
/// `StaticManagedProvider`.
/// - External nodes are resolved via `Application::external_node_client`.
/// - Attached mode remains blocked at plan validation until attach providers
/// are fully wired.
/// - Attached nodes are discovered through the selected attach provider.
pub async fn orchestrate_sources<E: Application>(
plan: &SourceOrchestrationPlan,
node_clients: NodeClients<E>,
@ -103,6 +102,17 @@ pub async fn orchestrate_sources<E: Application>(
)))
.with_external(Arc::new(ApplicationExternalProvider));
orchestrate_sources_with_providers(plan, providers).await
}
/// Orchestrates scenario sources with caller-supplied provider set.
///
/// Deployer runtimes can use this to inject attach/external providers with
/// backend-specific discovery and control semantics.
pub async fn orchestrate_sources_with_providers<E: Application>(
plan: &SourceOrchestrationPlan,
providers: SourceProviders<E>,
) -> Result<NodeClients<E>, DynError> {
let resolved = resolve_sources(plan, &providers).await?;
if matches!(plan.mode, SourceOrchestrationMode::Managed { .. }) && resolved.managed.is_empty() {

View File

@ -7,7 +7,7 @@ mod managed_provider;
#[allow(dead_code)]
mod source_providers;
pub use attach_provider::{AttachProviderError, AttachedNode};
pub use attach_provider::{AttachProvider, AttachProviderError, AttachedNode};
pub use external_provider::{ApplicationExternalProvider, ExternalNode, ExternalProviderError};
pub use managed_provider::{ManagedProviderError, ManagedProvisionedNode, StaticManagedProvider};
pub use source_providers::SourceProviders;

View File

@ -17,6 +17,7 @@ anyhow = "1"
async-trait = { workspace = true }
reqwest = { features = ["json"], workspace = true }
serde = { features = ["derive"], workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
tera = "1.19"
testing-framework-core = { path = "../../core" }
@ -30,5 +31,4 @@ uuid = { features = ["v4"], version = "1" }
[dev-dependencies]
groth16 = { workspace = true }
key-management-system-service = { workspace = true }
serde_json = { workspace = true }
zksign = { workspace = true }

View File

@ -0,0 +1,146 @@
use std::marker::PhantomData;
use async_trait::async_trait;
use testing_framework_core::scenario::{
AttachProvider, AttachProviderError, AttachSource, AttachedNode, DynError, ExternalNodeSource,
};
use url::Url;
use crate::{
docker::attached::{discover_service_container_id, inspect_mapped_tcp_ports},
env::ComposeDeployEnv,
};
/// Attach provider that discovers already-running compose service containers
/// and turns them into node clients for the scenario runtime.
pub(super) struct ComposeAttachProvider<E: ComposeDeployEnv> {
    /// Host used when building each discovered service's HTTP endpoint URL.
    host: String,
    /// Carries the deployment-environment type without storing a value.
    _env: PhantomData<E>,
}

impl<E: ComposeDeployEnv> ComposeAttachProvider<E> {
    /// Creates a provider that addresses discovered services via `host`.
    pub(super) fn new(host: String) -> Self {
        Self {
            host,
            _env: PhantomData,
        }
    }
}
#[async_trait]
impl<E: ComposeDeployEnv> AttachProvider<E> for ComposeAttachProvider<E> {
    /// Discovers a running container for each requested compose service and
    /// builds an attached node (client + identity hint) per service.
    ///
    /// Requirements enforced here:
    /// - the source must be `AttachSource::Compose`,
    /// - it must name an explicit compose project,
    /// - it must list at least one service.
    async fn discover(
        &self,
        source: &AttachSource,
    ) -> Result<Vec<AttachedNode<E>>, AttachProviderError> {
        let (project, services) = match source {
            AttachSource::Compose { project, services } => (project, services),
            _ => {
                return Err(AttachProviderError::UnsupportedSource {
                    attach_source: source.clone(),
                });
            }
        };
        let project = project
            .as_ref()
            .ok_or_else(|| AttachProviderError::Discovery {
                source: "compose attach source requires an explicit project name".into(),
            })?;
        if services.is_empty() {
            return Err(AttachProviderError::Discovery {
                source: "compose attach source requires at least one service name".into(),
            });
        }
        let mut attached = Vec::with_capacity(services.len());
        for service in services {
            // Resolve service -> container -> single published API port.
            let container_id = discover_service_container_id(project, service)
                .await
                .map_err(to_discovery_error)?;
            let api_port = discover_api_port(&container_id)
                .await
                .map_err(to_discovery_error)?;
            let endpoint =
                build_service_endpoint(&self.host, api_port).map_err(to_discovery_error)?;
            // The deploy environment turns the endpoint into a node client,
            // reusing the external-node client path.
            let source = ExternalNodeSource::new(service.clone(), endpoint.to_string());
            let client = E::external_node_client(&source).map_err(to_discovery_error)?;
            attached.push(AttachedNode {
                identity_hint: Some(service.clone()),
                client,
            });
        }
        Ok(attached)
    }
}
/// Wraps any dynamic error into the provider's `Discovery` error variant.
fn to_discovery_error(source: DynError) -> AttachProviderError {
    AttachProviderError::Discovery { source }
}
/// Determines the single published host TCP port for an attached container.
///
/// Fails when the container exposes zero or multiple mapped TCP ports, since
/// the API endpoint would then be ambiguous.
async fn discover_api_port(container_id: &str) -> Result<u16, DynError> {
    let mapped_ports = inspect_mapped_tcp_ports(container_id).await?;
    if let [port] = mapped_ports.as_slice() {
        return Ok(port.host_port);
    }
    if mapped_ports.is_empty() {
        return Err(format!(
            "no mapped tcp ports discovered for attached compose service container '{container_id}'"
        )
        .into());
    }
    // Render the full mapping list so the operator can see what was exposed.
    let mapped_ports = mapped_ports
        .iter()
        .map(|port| format!("{}->{}", port.container_port, port.host_port))
        .collect::<Vec<_>>()
        .join(", ");
    Err(format!(
        "attached compose service container '{container_id}' has multiple mapped tcp ports ({mapped_ports}); provide a single exposed API port"
    )
    .into())
}
/// Builds the plain-HTTP base URL for a discovered service endpoint.
fn build_service_endpoint(host: &str, port: u16) -> Result<Url, DynError> {
    Url::parse(&format!("http://{host}:{port}/")).map_err(Into::into)
}
#[cfg(test)]
mod tests {
    use super::build_service_endpoint;
    use crate::docker::attached::parse_mapped_tcp_ports;

    // Non-TCP protocols and malformed keys must be skipped, not error out.
    #[test]
    fn parse_mapped_tcp_ports_skips_non_tcp_and_invalid_keys() {
        let raw = r#"{
"18018/tcp":[{"HostIp":"0.0.0.0","HostPort":"32001"}],
"9999/udp":[{"HostIp":"0.0.0.0","HostPort":"39999"}],
"invalid":[{"HostIp":"0.0.0.0","HostPort":"12345"}]
}"#;
        let mapped = parse_mapped_tcp_ports(raw).expect("mapped ports should parse");
        assert_eq!(mapped.len(), 1);
        assert_eq!(mapped[0].container_port, 18018);
        assert_eq!(mapped[0].host_port, 32001);
    }

    // Output ordering is by container port, regardless of JSON key order.
    #[test]
    fn parse_mapped_tcp_ports_returns_sorted_ports() {
        let raw = r#"{
"18019/tcp":[{"HostIp":"0.0.0.0","HostPort":"32002"}],
"18018/tcp":[{"HostIp":"0.0.0.0","HostPort":"32001"}]
}"#;
        let mapped = parse_mapped_tcp_ports(raw).expect("mapped ports should parse");
        assert_eq!(mapped[0].container_port, 18018);
        assert_eq!(mapped[1].container_port, 18019);
    }

    // Endpoint URLs are plain HTTP with a trailing slash.
    #[test]
    fn build_service_endpoint_formats_http_url() {
        let endpoint = build_service_endpoint("127.0.0.1", 32001).expect("endpoint should parse");
        assert_eq!(endpoint.as_str(), "http://127.0.0.1:32001/");
    }
}

View File

@ -1,3 +1,4 @@
mod attach_provider;
pub mod clients;
pub mod orchestrator;
pub mod ports;

View File

@ -3,10 +3,11 @@ use std::{env, sync::Arc, time::Duration};
use reqwest::Url;
use testing_framework_core::{
scenario::{
DeploymentPolicy, FeedHandle, FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients,
NodeControlHandle, ObservabilityCapabilityProvider, ObservabilityInputs,
RequiresNodeControl, RunContext, Runner, Scenario, build_source_orchestration_plan,
orchestrate_sources,
ApplicationExternalProvider, AttachSource, CleanupGuard, DeploymentPolicy, FeedHandle,
FeedRuntime, HttpReadinessRequirement, Metrics, NodeClients, NodeControlHandle,
ObservabilityCapabilityProvider, ObservabilityInputs, RequiresNodeControl, RunContext,
Runner, Scenario, ScenarioSources, SourceProviders, StaticManagedProvider,
build_source_orchestration_plan, orchestrate_sources_with_providers,
},
topology::DeploymentDescriptor,
};
@ -14,6 +15,7 @@ use tracing::info;
use super::{
ComposeDeployer,
attach_provider::ComposeAttachProvider,
clients::ClientBuilder,
make_cleanup_guard,
ports::PortManager,
@ -21,13 +23,14 @@ use super::{
setup::{DeploymentContext, DeploymentSetup},
};
use crate::{
docker::control::ComposeNodeControl,
docker::control::{ComposeAttachedNodeControl, ComposeNodeControl},
env::ComposeDeployEnv,
errors::ComposeRunnerError,
infrastructure::{
environment::StackEnvironment,
ports::{HostPortMapping, compose_runner_host},
},
lifecycle::block_feed::spawn_block_feed_with_retry,
};
const PRINT_ENDPOINTS_ENV: &str = "TESTNET_PRINT_ENDPOINTS";
@ -55,6 +58,12 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
}
})?;
if scenario.sources().is_attached() {
return self
.deploy_attached_only::<Caps>(scenario, source_plan)
.await;
}
let deployment = scenario.deployment();
let setup = DeploymentSetup::<E>::new(deployment);
setup.validate_environment().await?;
@ -81,7 +90,16 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
.await?;
// Source orchestration currently runs here after managed clients are prepared.
deployed.node_clients = orchestrate_sources(&source_plan, deployed.node_clients)
let source_providers = SourceProviders::default()
.with_managed(Arc::new(StaticManagedProvider::new(
deployed.node_clients.snapshot(),
)))
.with_attach(Arc::new(ComposeAttachProvider::<E>::new(
compose_runner_host(),
)))
.with_external(Arc::new(ApplicationExternalProvider));
deployed.node_clients = orchestrate_sources_with_providers(&source_plan, source_providers)
.await
.map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
@ -106,6 +124,83 @@ impl<E: ComposeDeployEnv> DeploymentOrchestrator<E> {
Ok(runner)
}
/// Deploys in attached-only mode: nothing is provisioned; node clients are
/// discovered from an already-running compose project via the attach
/// provider, and an optional node-control handle is wired on top.
async fn deploy_attached_only<Caps>(
    &self,
    scenario: &Scenario<E, Caps>,
    source_plan: testing_framework_core::scenario::SourceOrchestrationPlan,
) -> Result<Runner<E>, ComposeRunnerError>
where
    Caps: RequiresNodeControl + ObservabilityCapabilityProvider + Send + Sync,
{
    let observability = resolve_observability_inputs(scenario)?;
    // No managed nodes exist in attached mode, hence the empty managed
    // provider; discovery happens through the compose attach provider.
    let source_providers = SourceProviders::default()
        .with_managed(Arc::new(StaticManagedProvider::new(Vec::new())))
        .with_attach(Arc::new(ComposeAttachProvider::<E>::new(
            compose_runner_host(),
        )))
        .with_external(Arc::new(ApplicationExternalProvider));
    let node_clients = orchestrate_sources_with_providers(&source_plan, source_providers)
        .await
        .map_err(|source| ComposeRunnerError::SourceOrchestration { source })?;
    // An attached run with zero discovered nodes cannot do anything useful.
    if node_clients.is_empty() {
        return Err(ComposeRunnerError::RuntimePreflight);
    }
    let node_control = self.attached_node_control::<Caps>(scenario)?;
    let (feed, feed_task) = spawn_block_feed_with_retry::<E>(&node_clients).await?;
    let context = RunContext::new(
        scenario.deployment().clone(),
        node_clients,
        scenario.duration(),
        scenario.expectation_cooldown(),
        observability.telemetry_handle()?,
        feed,
        node_control,
    );
    // Only the block-feed task needs cleanup; the attached containers are
    // owned by the external compose project, not by this runner.
    let cleanup_guard: Box<dyn CleanupGuard> = Box::new(feed_task);
    Ok(Runner::new(context, Some(cleanup_guard)))
}
/// Builds the node-control handle for an attached run, if the scenario's
/// capabilities require one.
///
/// Returns `Ok(None)` when node control is not requested. Otherwise the
/// scenario must be in compose attached mode with an explicit, non-blank
/// project name — anything else is an internal invariant violation.
fn attached_node_control<Caps>(
    &self,
    scenario: &Scenario<E, Caps>,
) -> Result<Option<Arc<dyn NodeControlHandle<E>>>, ComposeRunnerError>
where
    Caps: RequiresNodeControl + Send + Sync,
{
    if !Caps::REQUIRED {
        return Ok(None);
    }
    let ScenarioSources::Attached { attach, .. } = scenario.sources() else {
        return Err(ComposeRunnerError::InternalInvariant {
            message: "attached node control requested outside attached source mode",
        });
    };
    let AttachSource::Compose { project, .. } = attach else {
        return Err(ComposeRunnerError::InternalInvariant {
            message: "compose deployer requires compose attach source for node control",
        });
    };
    // Trim and reject blank project names: container lookup by compose label
    // needs a concrete project.
    let Some(project_name) = project
        .as_ref()
        .map(|value| value.trim())
        .filter(|value| !value.is_empty())
    else {
        return Err(ComposeRunnerError::InternalInvariant {
            message: "attached compose mode requires explicit project name for node control",
        });
    };
    Ok(Some(Arc::new(ComposeAttachedNodeControl {
        project_name: project_name.to_owned(),
    }) as Arc<dyn NodeControlHandle<E>>))
}
async fn build_runner<Caps>(
&self,
scenario: &Scenario<E, Caps>,

View File

@ -0,0 +1,129 @@
use std::process::Stdio;
use serde_json::Value;
use testing_framework_core::scenario::DynError;
use tokio::process::Command;
/// A container TCP port together with the host port docker published it on.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct MappedTcpPort {
    /// Port the service listens on inside the container.
    pub container_port: u16,
    /// Host-side port docker mapped it to.
    pub host_port: u16,
}
/// Looks up the id of the single running container belonging to the given
/// compose `project` and `service`, matched via compose-managed labels.
///
/// # Errors
/// Fails when docker itself fails, when no matching container is running, or
/// when more than one matches (callers need an unambiguous target).
pub async fn discover_service_container_id(
    project: &str,
    service: &str,
) -> Result<String, DynError> {
    let stdout = run_docker_capture([
        "ps",
        "--filter",
        &format!("label=com.docker.compose.project={project}"),
        "--filter",
        &format!("label=com.docker.compose.service={service}"),
        "--format",
        "{{.ID}}",
    ])
    .await?;
    let mut ids = stdout.lines().map(str::trim).filter(|line| !line.is_empty());
    let Some(first) = ids.next() else {
        return Err(format!(
            "no running container found for compose project '{project}' service '{service}'"
        )
        .into());
    };
    if ids.next().is_some() {
        return Err(format!(
            "multiple running containers found for compose project '{project}' service '{service}'"
        )
        .into());
    }
    Ok(first.to_owned())
}
/// Inspects a container and returns its published TCP port mappings, sorted
/// by container port.
pub async fn inspect_mapped_tcp_ports(container_id: &str) -> Result<Vec<MappedTcpPort>, DynError> {
    let raw = run_docker_capture([
        "inspect",
        "--format",
        "{{json .NetworkSettings.Ports}}",
        container_id,
    ])
    .await?;
    parse_mapped_tcp_ports(&raw)
}
/// Parses the `docker inspect` ports JSON into sorted TCP port mappings.
///
/// Non-TCP entries, malformed keys, and ports without host bindings are
/// skipped rather than treated as errors; only a non-object payload fails.
pub fn parse_mapped_tcp_ports(raw: &str) -> Result<Vec<MappedTcpPort>, DynError> {
    let payload: Value = serde_json::from_str(raw.trim())?;
    let entries = payload
        .as_object()
        .ok_or_else(|| "docker inspect ports payload is not an object".to_owned())?;
    let mut mapped: Vec<MappedTcpPort> = entries
        .iter()
        .filter_map(|(key, bindings)| {
            let container_port = parse_container_port(key)?;
            let host_port = bindings.as_array()?.iter().find_map(parse_host_port_binding)?;
            Some(MappedTcpPort {
                container_port,
                host_port,
            })
        })
        .collect();
    mapped.sort_by_key(|port| port.container_port);
    Ok(mapped)
}
/// Runs `docker <args>` asynchronously and captures stdout.
///
/// # Errors
/// Fails when the process cannot be spawned or exits non-zero; the message
/// includes the command line, exit status, and trimmed stderr.
pub async fn run_docker_capture<const N: usize>(args: [&str; N]) -> Result<String, DynError> {
    let output = Command::new("docker")
        .args(args)
        .stdin(Stdio::null())
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .output()
        .await?;
    if output.status.success() {
        return Ok(String::from_utf8_lossy(&output.stdout).to_string());
    }
    let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
    Err(format!(
        "docker {} failed with status {}: {stderr}",
        args.join(" "),
        output.status
    )
    .into())
}
/// Extracts the numeric container port from an inspect key like `"8080/tcp"`.
///
/// Returns `None` for non-TCP protocols, keys without a `/`, or unparsable
/// port numbers.
fn parse_container_port(port_key: &str) -> Option<u16> {
    match port_key.split_once('/') {
        Some((port, "tcp")) => port.parse::<u16>().ok(),
        _ => None,
    }
}
fn parse_host_port_binding(binding: &Value) -> Option<u16> {
binding
.get("HostPort")
.and_then(Value::as_str)?
.parse::<u16>()
.ok()
}

View File

@ -7,13 +7,21 @@ use testing_framework_core::{
adjust_timeout,
scenario::{Application, DynError, NodeControlHandle},
};
use tokio::process::Command;
use tokio::{process::Command, time::timeout};
use tracing::info;
use crate::{docker::commands::run_docker_command, errors::ComposeRunnerError};
use crate::{
docker::{
attached::discover_service_container_id,
commands::{ComposeCommandError, run_docker_command},
},
errors::ComposeRunnerError,
};
const COMPOSE_RESTART_TIMEOUT: Duration = Duration::from_secs(120);
const COMPOSE_RESTART_DESCRIPTION: &str = "docker compose restart";
const DOCKER_CONTAINER_RESTART_DESCRIPTION: &str = "docker container restart";
const DOCKER_CONTAINER_STOP_DESCRIPTION: &str = "docker container stop";
pub async fn restart_compose_service(
compose_file: &Path,
@ -38,6 +46,50 @@ pub async fn restart_compose_service(
.map_err(ComposeRunnerError::Compose)
}
/// Restarts the container backing `service` in an attached compose project.
///
/// The container is resolved by compose labels at call time, and the restart
/// is issued as `docker restart <container>` directly — no compose file is
/// available in attached mode.
pub async fn restart_attached_compose_service(
    project_name: &str,
    service: &str,
) -> Result<(), DynError> {
    let container_id = discover_service_container_id(project_name, service).await?;
    let command = docker_container_command("restart", &container_id);
    info!(
        service,
        project = project_name,
        container = container_id,
        "restarting attached compose service"
    );
    run_docker_action(
        command,
        DOCKER_CONTAINER_RESTART_DESCRIPTION,
        adjust_timeout(COMPOSE_RESTART_TIMEOUT),
    )
    .await
}
/// Stops the container backing `service` in an attached compose project.
///
/// Mirrors `restart_attached_compose_service`: the container is resolved by
/// compose labels at call time and stopped via `docker stop <container>`.
pub async fn stop_attached_compose_service(
    project_name: &str,
    service: &str,
) -> Result<(), DynError> {
    let container_id = discover_service_container_id(project_name, service).await?;
    let command = docker_container_command("stop", &container_id);
    info!(
        service,
        project = project_name,
        container = container_id,
        "stopping attached compose service"
    );
    run_docker_action(
        command,
        DOCKER_CONTAINER_STOP_DESCRIPTION,
        adjust_timeout(COMPOSE_RESTART_TIMEOUT),
    )
    .await
}
fn compose_restart_command(compose_file: &Path, project_name: &str, service: &str) -> Command {
let mut command = Command::new("docker");
command
@ -51,6 +103,43 @@ fn compose_restart_command(compose_file: &Path, project_name: &str, service: &st
command
}
/// Builds a `docker <action> <container_id>` command (e.g. restart or stop).
fn docker_container_command(action: &str, container_id: &str) -> Command {
    let mut command = Command::new("docker");
    command.args([action, container_id]);
    command
}
/// Runs a docker command to completion under `timeout_duration`.
///
/// On zero exit returns `Ok(())`; non-zero exit, spawn failure, and timeout
/// are mapped to descriptive errors using `description`.
///
/// Fix over the previous version: `kill_on_drop(true)` is set so that when
/// the timeout fires and the `output()` future is dropped, the docker child
/// process is killed instead of lingering in the background.
async fn run_docker_action(
    mut command: Command,
    description: &str,
    timeout_duration: Duration,
) -> Result<(), DynError> {
    // Ensure a timed-out invocation is reaped; dropping the future alone does
    // not terminate the spawned process otherwise.
    command.kill_on_drop(true);
    match timeout(timeout_duration, command.output()).await {
        Ok(Ok(output)) => {
            if output.status.success() {
                return Ok(());
            }
            let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
            Err(format!(
                "{description} failed with status {}: {stderr}",
                output.status
            )
            .into())
        }
        Ok(Err(source)) => Err(format!("{description} failed to spawn: {source}").into()),
        Err(_) => {
            // Surface timeouts through the shared compose error type so
            // callers see a uniform error shape.
            let compose_timeout = ComposeCommandError::Timeout {
                command: description.to_owned(),
                timeout: timeout_duration,
            };
            Err(compose_timeout.into())
        }
    }
}
/// Compose-specific node control handle for restarting nodes.
pub struct ComposeNodeControl {
pub(crate) compose_file: PathBuf,
@ -65,3 +154,23 @@ impl<E: Application> NodeControlHandle<E> for ComposeNodeControl {
.map_err(|err| format!("node restart failed: {err}").into())
}
}
/// Node control handle for compose attached mode.
///
/// Unlike the managed `ComposeNodeControl`, no compose file is available:
/// containers are addressed purely through the attached project's name and
/// the compose service labels.
pub struct ComposeAttachedNodeControl {
    /// Name of the externally-owned compose project whose containers we drive.
    pub(crate) project_name: String,
}

#[async_trait::async_trait]
impl<E: Application> NodeControlHandle<E> for ComposeAttachedNodeControl {
    /// Restarts the container backing compose service `name`.
    async fn restart_node(&self, name: &str) -> Result<(), DynError> {
        restart_attached_compose_service(&self.project_name, name)
            .await
            .map_err(|source| format!("node restart failed for service '{name}': {source}").into())
    }

    /// Stops the container backing compose service `name`.
    async fn stop_node(&self, name: &str) -> Result<(), DynError> {
        stop_attached_compose_service(&self.project_name, name)
            .await
            .map_err(|source| format!("node stop failed for service '{name}': {source}").into())
    }
}

View File

@ -1,3 +1,4 @@
pub mod attached;
pub mod commands;
pub mod control;
pub mod platform;