From 6888c18275d5b43e64b4d809cbedf758dc5fed5b Mon Sep 17 00:00:00 2001 From: andrussal Date: Sun, 8 Mar 2026 13:40:27 +0100 Subject: [PATCH] Simplify node client inventory --- testing-framework/core/src/scenario/mod.rs | 5 +- .../src/scenario/runtime/inventory/mod.rs | 2 +- .../runtime/inventory/node_inventory.rs | 314 ++---------------- .../core/src/scenario/runtime/mod.rs | 3 +- .../core/src/scenario/runtime/node_clients.rs | 4 +- 5 files changed, 27 insertions(+), 301 deletions(-) diff --git a/testing-framework/core/src/scenario/mod.rs b/testing-framework/core/src/scenario/mod.rs index f2e43d6..cc3b300 100644 --- a/testing-framework/core/src/scenario/mod.rs +++ b/testing-framework/core/src/scenario/mod.rs @@ -37,9 +37,8 @@ pub use deployment_policy::{CleanupPolicy, DeploymentPolicy, RetryPolicy}; pub use expectation::Expectation; pub use observability::{ObservabilityCapabilityProvider, ObservabilityInputs}; pub use runtime::{ - ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, BorrowedNode, - BorrowedOrigin, CleanupGuard, Deployer, Feed, FeedHandle, FeedRuntime, - HttpReadinessRequirement, ManagedNode, ManagedSource, NodeClients, NodeHandle, NodeInventory, + ApplicationExternalProvider, AttachProvider, AttachProviderError, AttachedNode, CleanupGuard, + Deployer, Feed, FeedHandle, FeedRuntime, HttpReadinessRequirement, ManagedSource, NodeClients, ReadinessError, RunContext, RunHandle, RunMetrics, Runner, ScenarioError, SourceOrchestrationPlan, SourceProviders, StabilizationConfig, StaticManagedProvider, build_source_orchestration_plan, diff --git a/testing-framework/core/src/scenario/runtime/inventory/mod.rs b/testing-framework/core/src/scenario/runtime/inventory/mod.rs index 6bc0334..575f53f 100644 --- a/testing-framework/core/src/scenario/runtime/inventory/mod.rs +++ b/testing-framework/core/src/scenario/runtime/inventory/mod.rs @@ -1,3 +1,3 @@ mod node_inventory; -pub use node_inventory::{BorrowedNode, BorrowedOrigin, ManagedNode, NodeHandle, NodeInventory}; +pub(crate) use node_inventory::NodeInventory; diff --git a/testing-framework/core/src/scenario/runtime/inventory/node_inventory.rs b/testing-framework/core/src/scenario/runtime/inventory/node_inventory.rs index c45385e..d9b45ce 100644 --- a/testing-framework/core/src/scenario/runtime/inventory/node_inventory.rs +++ b/testing-framework/core/src/scenario/runtime/inventory/node_inventory.rs @@ -1,91 +1,18 @@ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use parking_lot::RwLock; -use crate::scenario::{Application, DynError, NodeControlHandle, StartNodeOptions, StartedNode}; +use crate::scenario::Application; -/// Origin for borrowed (non-managed) nodes in the runtime inventory. -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum BorrowedOrigin { - /// Node discovered from an attached cluster provider. - Attached, - /// Node provided explicitly as an external endpoint. - External, -} - -/// Managed node handle with full lifecycle capabilities. -pub struct ManagedNode { - /// Canonical node identity used for deduplication and lookups. - pub identity: String, - /// Application-specific API client for this node. - pub client: E::NodeClient, -} - -/// Borrowed node handle (attached or external), query-only by default. -pub struct BorrowedNode { - /// Canonical node identity used for deduplication and lookups. - pub identity: String, - /// Application-specific API client for this node. - pub client: E::NodeClient, - /// Borrowed source kind used for diagnostics and selection. - pub origin: BorrowedOrigin, -} - -/// Unified node handle variant used by runtime inventory snapshots. -pub enum NodeHandle { - /// Managed node variant. - Managed(ManagedNode), - /// Borrowed node variant. - Borrowed(BorrowedNode), -} - -impl Clone for ManagedNode { - fn clone(&self) -> Self { - Self { - identity: self.identity.clone(), - client: self.client.clone(), - } - } -} - -impl Clone for BorrowedNode { - fn clone(&self) -> Self { - Self { - identity: self.identity.clone(), - client: self.client.clone(), - origin: self.origin, - } - } -} - -impl Clone for NodeHandle { - fn clone(&self) -> Self { - match self { - Self::Managed(node) => Self::Managed(node.clone()), - Self::Borrowed(node) => Self::Borrowed(node.clone()), - } - } -} - -/// Thread-safe node inventory with identity-based upsert semantics. -pub struct NodeInventory { - inner: Arc>>, -} - -struct NodeInventoryInner { - nodes: Vec>, - indices_by_identity: HashMap, - next_synthetic_id: usize, +/// Thread-safe node client storage used by runtime handles. +pub(crate) struct NodeInventory { + clients: Arc>>, } impl Default for NodeInventory { fn default() -> Self { Self { - inner: Arc::new(RwLock::new(NodeInventoryInner { - nodes: Vec::new(), - indices_by_identity: HashMap::new(), - next_synthetic_id: 0, - })), + clients: Arc::new(RwLock::new(Vec::new())), } } } @@ -93,243 +20,44 @@ impl Default for NodeInventory { impl Clone for NodeInventory { fn clone(&self) -> Self { Self { - inner: Arc::clone(&self.inner), + clients: Arc::clone(&self.clients), } } } impl NodeInventory { #[must_use] - /// Builds an inventory from managed clients. - pub fn from_managed_clients(clients: Vec) -> Self { - let inventory = Self::default(); - - for client in clients { - inventory.add_managed_node(client, None); - } - - inventory - } - - #[must_use] - /// Returns a cloned snapshot of all node clients. - pub fn snapshot_clients(&self) -> Vec { - self.inner.read().nodes.iter().map(clone_client).collect() - } - - #[must_use] - /// Returns cloned managed node handles from the current inventory. - pub fn managed_nodes(&self) -> Vec> { - self.inner - .read() - .nodes - .iter() - .filter_map(|handle| match handle { - NodeHandle::Managed(node) => Some(node.clone()), - NodeHandle::Borrowed(_) => None, - }) - .collect() - } - - #[must_use] - /// Returns cloned borrowed node handles from the current inventory. - pub fn borrowed_nodes(&self) -> Vec> { - self.inner - .read() - .nodes - .iter() - .filter_map(|handle| match handle { - NodeHandle::Managed(_) => None, - NodeHandle::Borrowed(node) => Some(node.clone()), - }) - .collect() - } - - #[must_use] - /// Finds a managed node by canonical identity. - pub fn find_managed(&self, identity: &str) -> Option> { - let guard = self.inner.read(); - match node_by_identity(&guard, identity)? { - NodeHandle::Managed(node) => Some(node.clone()), - NodeHandle::Borrowed(_) => None, + pub(crate) fn from_clients(clients: Vec) -> Self { + Self { + clients: Arc::new(RwLock::new(clients)), } } #[must_use] - /// Finds a borrowed node by canonical identity. - pub fn find_borrowed(&self, identity: &str) -> Option> { - let guard = self.inner.read(); - match node_by_identity(&guard, identity)? { - NodeHandle::Managed(_) => None, - NodeHandle::Borrowed(node) => Some(node.clone()), - } + pub(crate) fn snapshot_clients(&self) -> Vec { + self.clients.read().clone() } #[must_use] - /// Finds any node handle by canonical identity. - pub fn find_node(&self, identity: &str) -> Option> { - let guard = self.inner.read(); - node_by_identity(&guard, identity).cloned() + pub(crate) fn len(&self) -> usize { + self.clients.read().len() } #[must_use] - /// Returns current number of nodes in inventory. - pub fn len(&self) -> usize { - self.inner.read().nodes.len() - } - - #[must_use] - /// Returns true when no nodes are registered. - pub fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { self.len() == 0 } - /// Clears all nodes and identity indexes. - pub fn clear(&self) { - let mut guard = self.inner.write(); - guard.nodes.clear(); - guard.indices_by_identity.clear(); - guard.next_synthetic_id = 0; + pub(crate) fn clear(&self) { + self.clients.write().clear(); } - /// Adds or replaces a managed node entry using canonical identity - /// resolution. Re-adding the same node identity updates the stored handle. - pub fn add_managed_node(&self, client: E::NodeClient, identity_hint: Option) { - let mut guard = self.inner.write(); - let identity = canonical_identity::(&client, identity_hint, &mut guard); - let handle = NodeHandle::Managed(ManagedNode { - identity: identity.clone(), - client, - }); - upsert_node(&mut guard, identity, handle); + pub(crate) fn add_client(&self, client: E::NodeClient) { + self.clients.write().push(client); } - /// Adds or replaces an attached node entry. - pub fn add_attached_node(&self, client: E::NodeClient, identity_hint: Option) { - self.add_borrowed_node(client, BorrowedOrigin::Attached, identity_hint); - } - - /// Adds or replaces an external static node entry. - pub fn add_external_node(&self, client: E::NodeClient, identity_hint: Option) { - self.add_borrowed_node(client, BorrowedOrigin::External, identity_hint); - } - - /// Executes a synchronous read over a cloned client slice. - pub fn with_clients(&self, f: impl FnOnce(&[E::NodeClient]) -> R) -> R { - let guard = self.inner.read(); - let clients = guard.nodes.iter().map(clone_client).collect::>(); + pub(crate) fn with_clients(&self, f: impl FnOnce(&[E::NodeClient]) -> R) -> R { + let clients = self.clients.read(); f(&clients) } - - fn add_borrowed_node( - &self, - client: E::NodeClient, - origin: BorrowedOrigin, - identity_hint: Option, - ) { - let mut guard = self.inner.write(); - let identity = canonical_identity::(&client, identity_hint, &mut guard); - let handle = NodeHandle::Borrowed(BorrowedNode { - identity: identity.clone(), - client, - origin, - }); - upsert_node(&mut guard, identity, handle); - } -} - -impl ManagedNode { - #[must_use] - /// Returns the node client. - pub const fn client(&self) -> &E::NodeClient { - &self.client - } - - /// Delegates restart to the deployer's control surface for this node name. - pub async fn restart( - &self, - control: &dyn NodeControlHandle, - node_name: &str, - ) -> Result<(), DynError> { - control.restart_node(node_name).await - } - - /// Delegates stop to the deployer's control surface for this node name. - pub async fn stop( - &self, - control: &dyn NodeControlHandle, - node_name: &str, - ) -> Result<(), DynError> { - control.stop_node(node_name).await - } - - /// Delegates dynamic node start with options to the control surface. - pub async fn start_with( - &self, - control: &dyn NodeControlHandle, - node_name: &str, - options: StartNodeOptions, - ) -> Result, DynError> { - control.start_node_with(node_name, options).await - } - - #[must_use] - /// Returns process id if the backend can expose it for this node name. - pub fn pid(&self, control: &dyn NodeControlHandle, node_name: &str) -> Option { - control.node_pid(node_name) - } -} - -impl BorrowedNode { - #[must_use] - /// Returns the node client. - pub const fn client(&self) -> &E::NodeClient { - &self.client - } -} - -fn upsert_node( - inner: &mut NodeInventoryInner, - identity: String, - handle: NodeHandle, -) { - if let Some(existing_index) = inner.indices_by_identity.get(&identity).copied() { - inner.nodes[existing_index] = handle; - return; - } - - let index = inner.nodes.len(); - inner.nodes.push(handle); - inner.indices_by_identity.insert(identity, index); -} - -fn canonical_identity( - _client: &E::NodeClient, - identity_hint: Option, - inner: &mut NodeInventoryInner, -) -> String { - // Priority: explicit hint -> synthetic. - if let Some(identity) = identity_hint.filter(|value| !value.trim().is_empty()) { - return identity; - } - - let synthetic = format!("node:{}", inner.next_synthetic_id); - inner.next_synthetic_id += 1; - - synthetic -} - -fn clone_client(handle: &NodeHandle) -> E::NodeClient { - match handle { - NodeHandle::Managed(node) => node.client.clone(), - NodeHandle::Borrowed(node) => node.client.clone(), - } -} - -fn node_by_identity<'a, E: Application>( - inner: &'a NodeInventoryInner, - identity: &str, -) -> Option<&'a NodeHandle> { - let index = *inner.indices_by_identity.get(identity)?; - inner.nodes.get(index) } diff --git a/testing-framework/core/src/scenario/runtime/mod.rs b/testing-framework/core/src/scenario/runtime/mod.rs index 97bd2d6..5682ccc 100644 --- a/testing-framework/core/src/scenario/runtime/mod.rs +++ b/testing-framework/core/src/scenario/runtime/mod.rs @@ -1,6 +1,6 @@ pub mod context; mod deployer; -pub mod inventory; +mod inventory; pub mod metrics; mod node_clients; pub mod orchestration; @@ -11,7 +11,6 @@ mod runner; use async_trait::async_trait; pub use context::{CleanupGuard, RunContext, RunHandle, RunMetrics}; pub use deployer::{Deployer, ScenarioError}; -pub use inventory::{BorrowedNode, BorrowedOrigin, ManagedNode, NodeHandle, NodeInventory}; pub use node_clients::NodeClients; #[doc(hidden)] pub use orchestration::{ diff --git a/testing-framework/core/src/scenario/runtime/node_clients.rs b/testing-framework/core/src/scenario/runtime/node_clients.rs index ce31dcf..b35a0ed 100644 --- a/testing-framework/core/src/scenario/runtime/node_clients.rs +++ b/testing-framework/core/src/scenario/runtime/node_clients.rs @@ -29,7 +29,7 @@ impl NodeClients { /// Build clients from preconstructed vectors. pub fn new(nodes: Vec) -> Self { Self { - inventory: NodeInventory::from_managed_clients(nodes), + inventory: NodeInventory::from_clients(nodes), } } @@ -72,7 +72,7 @@ impl NodeClients { } pub fn add_node(&self, client: E::NodeClient) { - self.inventory.add_managed_node(client, None); + self.inventory.add_client(client); } pub fn clear(&self) {