diff --git a/Cargo.lock b/Cargo.lock index 471e158..c60198c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,6 +95,15 @@ version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" +[[package]] +name = "arc-swap" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a3a1fd6f75306b68087b831f025c712524bcb19aad54e557b1129cfa0a2b207" +dependencies = [ + "rustversion", +] + [[package]] name = "async-broadcast" version = "0.7.2" @@ -177,11 +186,13 @@ dependencies = [ "serde", "serde_json", "serde_path_to_error", + "serde_urlencoded", "sync_wrapper", "tokio", "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -202,6 +213,7 @@ dependencies = [ "sync_wrapper", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -405,6 +417,20 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1026,7 +1052,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.6.2", "tokio", "tower-service", "tracing", @@ -1436,6 +1462,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "matchit" version = "0.7.3" @@ -1448,6 +1483,71 @@ version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +[[package]] +name = "metrics-counter-examples" +version = "0.1.0" +dependencies = [ + "anyhow", + "metrics-counter-node", + "metrics-counter-runtime-ext", + "metrics-counter-runtime-workloads", + "reqwest", + "serde", + "serde_json", + "testing-framework-core", + "testing-framework-runner-compose", + "testing-framework-runner-k8s", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "metrics-counter-node" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "clap", + "prometheus", + "reqwest", + "serde", + "serde_yaml", + "tokio", + "tower-http", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "metrics-counter-runtime-ext" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "metrics-counter-node", + "reqwest", + "serde", + "serde_yaml", + "testing-framework-core", + "testing-framework-runner-compose", + "testing-framework-runner-k8s", + "testing-framework-runner-local", +] + +[[package]] +name = "metrics-counter-runtime-workloads" +version = "0.1.0" +dependencies = [ + "async-trait", + "metrics-counter-runtime-ext", + "serde", + "serde_json", + "testing-framework-core", + "tokio", + "tracing", +] + [[package]] name = "mime" version = "0.3.17" @@ -1482,12 +1582,40 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-conv" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -1789,6 +1917,21 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror 2.0.18", +] + [[package]] name = "prometheus-http-query" version = "0.8.3" @@ -1803,6 +1946,26 @@ dependencies = [ "url", ] +[[package]] +name = "protobuf" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4" +dependencies = [ + "once_cell", + "protobuf-support", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf-support" +version = "3.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6" +dependencies = [ + "thiserror 1.0.69", +] + [[package]] name = "quote" version = "1.0.44" @@ -1848,6 +2011,66 @@ dependencies = [ "getrandom 0.2.17", ] +[[package]] +name = "redis" +version = "0.29.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc42f3a12fd4408ce64d8efef67048a924e543bd35c6591c0447fda9054695f" +dependencies = [ + "arc-swap", + "backon", + "bytes", + "combine", + "futures-channel", + "futures-util", + "itoa", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.5.10", + "tokio", + "tokio-util", + "url", +] + +[[package]] +name = "redis-streams-examples" +version = "0.1.0" +dependencies = [ + "anyhow", + "redis-streams-runtime-ext", + "redis-streams-runtime-workloads", + "testing-framework-core", + "testing-framework-runner-compose", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "redis-streams-runtime-ext" +version = "0.1.0" +dependencies = [ + "async-trait", + "redis", + "reqwest", + "testing-framework-core", + "testing-framework-runner-compose", +] + +[[package]] +name = "redis-streams-runtime-workloads" +version = "0.1.0" +dependencies = [ + "async-trait", + "redis-streams-runtime-ext", + "testing-framework-core", + "tokio", + "tracing", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -2194,6 +2417,12 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.9" @@ -2205,6 +2434,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2249,6 +2487,16 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "socket2" version = "0.6.2" @@ -2465,6 +2713,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "time" version = "0.3.47" @@ -2514,9 +2771,10 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.6.2", "tokio-macros", "windows-sys 0.61.2", ] @@ -2657,6 +2915,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -2737,6 +3025,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index d24851f..e77cbfd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,13 @@ members = [ "cfgsync/artifacts", "cfgsync/core", "cfgsync/runtime", + "examples/metrics_counter/examples", + "examples/metrics_counter/metrics-counter-node", + "examples/metrics_counter/testing/integration", + "examples/metrics_counter/testing/workloads", + "examples/redis_streams/examples", + "examples/redis_streams/testing/integration", + "examples/redis_streams/testing/workloads", "testing-framework/core", "testing-framework/deployers/compose", "testing-framework/deployers/k8s", diff --git a/examples/metrics_counter/Dockerfile b/examples/metrics_counter/Dockerfile new file mode 100644 index 0000000..3a54fa3 --- /dev/null +++ b/examples/metrics_counter/Dockerfile @@ -0,0 +1,24 @@ +FROM rustlang/rust:nightly-bookworm AS builder + +WORKDIR /build + +COPY Cargo.toml Cargo.lock ./ +COPY cfgsync/ ./cfgsync/ +COPY examples/ ./examples/ +COPY testing-framework/ ./testing-framework/ + +RUN cargo build --release -p metrics-counter-node + +FROM debian:bookworm-slim + +RUN apt-get update && \ + apt-get install -y ca-certificates && \ + rm -rf /var/lib/apt/lists/* + +COPY --from=builder /build/target/release/metrics-counter-node /usr/local/bin/metrics-counter-node + +RUN mkdir -p /etc/metrics-counter +WORKDIR /app + +ENTRYPOINT ["/usr/local/bin/metrics-counter-node"] +CMD ["--config", "/etc/metrics-counter/config.yaml"] diff --git a/examples/metrics_counter/README.md b/examples/metrics_counter/README.md new file mode 100644 index 0000000..ae37911 --- /dev/null +++ b/examples/metrics_counter/README.md @@ -0,0 +1,55 @@ +# Metrics Counter Example + +This example runs a tiny counter service together with a Prometheus scraper. + +The scenarios increment counters through the node API and then query Prometheus +to check the aggregate result. + +## How TF runs this + +Each example follows the same pattern: + +- TF starts the counter nodes and a Prometheus scraper +- a workload drives counter increments through the node API +- an expectation, or the manual scenario, checks the Prometheus query result + +## API + +Each node exposes: + +- `POST /counter/inc` to increment the local counter +- `GET /counter/value` to read the current counter value +- `GET /metrics` for Prometheus scraping + +## Scenarios + +- `compose_prometheus_expectation` runs the app and Prometheus in Docker Compose, then checks the Prometheus query result +- `k8s_prometheus_expectation` runs the same check on Kubernetes +- `k8s_manual_prometheus` starts the nodes through the k8s manual cluster API, restarts one node, and checks the Prometheus aggregate again + +## Run with Docker Compose + +```bash +LOGOS_BLOCKCHAIN_METRICS_QUERY_URL=http://127.0.0.1:19091 \ +cargo run -p metrics-counter-examples --bin compose_prometheus_expectation +``` + +## Run with Kubernetes + +```bash +docker build -t metrics-counter-node:local -f examples/metrics_counter/Dockerfile . +LOGOS_BLOCKCHAIN_METRICS_QUERY_URL=http://127.0.0.1:30991 \ +cargo run -p metrics-counter-examples --bin k8s_prometheus_expectation +``` + +Overrides: +- `METRICS_COUNTER_K8S_IMAGE` (falls back to `METRICS_COUNTER_IMAGE`, then `metrics-counter-node:local`) +- `METRICS_COUNTER_K8S_PROMETHEUS_NODE_PORT` (defaults to `30991`) + +## Run with Kubernetes manual cluster + +```bash +docker build -t metrics-counter-node:local -f examples/metrics_counter/Dockerfile . +LOGOS_BLOCKCHAIN_METRICS_QUERY_URL=http://127.0.0.1:30991 \ +cargo run -p metrics-counter-examples --bin k8s_manual_prometheus +``` diff --git a/examples/metrics_counter/examples/Cargo.toml b/examples/metrics_counter/examples/Cargo.toml new file mode 100644 index 0000000..3febefd --- /dev/null +++ b/examples/metrics_counter/examples/Cargo.toml @@ -0,0 +1,21 @@ +[package] +edition.workspace = true +license.workspace = true +name = "metrics-counter-examples" +version.workspace = true + +[dependencies] +anyhow = "1.0" +metrics-counter-node = { path = "../metrics-counter-node" } +metrics-counter-runtime-ext = { path = "../testing/integration" } +metrics-counter-runtime-workloads = { path = "../testing/workloads" } +reqwest = { workspace = true, features = ["json"] } +serde = { workspace = true } +serde_json = { workspace = true } +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +testing-framework-runner-k8s = { workspace = true } + +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/metrics_counter/examples/src/bin/compose_prometheus_expectation.rs b/examples/metrics_counter/examples/src/bin/compose_prometheus_expectation.rs new file mode 100644 index 0000000..d736c1a --- /dev/null +++ b/examples/metrics_counter/examples/src/bin/compose_prometheus_expectation.rs @@ -0,0 +1,61 @@ +use std::{env, time::Duration}; + +use anyhow::{Context as _, Result}; +use metrics_counter_runtime_workloads::{ + CounterIncrementWorkload, MetricsCounterBuilderExt, MetricsCounterScenarioBuilder, + MetricsCounterTopology, PrometheusCounterAtLeast, +}; +use testing_framework_core::scenario::{Deployer, ObservabilityBuilderExt}; +use testing_framework_runner_compose::ComposeRunnerError; +use tracing::{info, warn}; + +const DEFAULT_PROM_URL: &str = "http://127.0.0.1:19091"; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let metrics_url = env::var("LOGOS_BLOCKCHAIN_METRICS_QUERY_URL") + .ok() + .filter(|value| !value.trim().is_empty()) + .unwrap_or_else(|| DEFAULT_PROM_URL.to_owned()); + + let mut scenario = + MetricsCounterScenarioBuilder::deployment_with(|_| MetricsCounterTopology::new(3)) + .enable_observability() + .with_metrics_query_url_str(&metrics_url) + .with_run_duration(Duration::from_secs(20)) + .with_workload( + CounterIncrementWorkload::new() + .operations(300) + .rate_per_sec(30), + ) + .with_expectation(PrometheusCounterAtLeast::new(300.0)) + .build()?; + + let deployer = metrics_counter_runtime_ext::MetricsCounterComposeDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => { + warn!("docker unavailable; skipping compose metrics-counter run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)) + .context("deploying metrics-counter compose stack"); + } + }; + + info!( + metrics_url, + "running metrics-counter compose prometheus scenario" + ); + runner + .run(&mut scenario) + .await + .context("running metrics-counter compose scenario")?; + + Ok(()) +} diff --git a/examples/metrics_counter/examples/src/bin/k8s_manual_prometheus.rs b/examples/metrics_counter/examples/src/bin/k8s_manual_prometheus.rs new file mode 100644 index 0000000..0d98f54 --- /dev/null +++ b/examples/metrics_counter/examples/src/bin/k8s_manual_prometheus.rs @@ -0,0 +1,181 @@ +use std::{env, time::Duration}; + +use anyhow::{Context as _, Result, anyhow}; +use metrics_counter_node::MetricsCounterHttpClient; +use metrics_counter_runtime_ext::{MetricsCounterK8sDeployer, MetricsCounterTopology}; +use reqwest::Url; +use serde::{Deserialize, Serialize}; +use testing_framework_runner_k8s::ManualClusterError; +use tracing::{info, warn}; + +const DEFAULT_PROM_URL: &str = "http://127.0.0.1:30991"; + +#[derive(Serialize)] +struct IncrementRequest {} + +#[derive(Deserialize)] +struct CounterView { + value: u64, +} + +#[derive(Deserialize)] +struct PrometheusQueryResponse { + data: PrometheusData, +} + +#[derive(Deserialize)] +struct PrometheusData { + result: Vec, +} + +#[derive(Deserialize)] +struct PrometheusSample { + value: (serde_json::Value, String), +} + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let metrics_url = env::var("LOGOS_BLOCKCHAIN_METRICS_QUERY_URL") + .ok() + .filter(|value| !value.trim().is_empty()) + .unwrap_or_else(|| DEFAULT_PROM_URL.to_owned()); + + let deployer = MetricsCounterK8sDeployer::new(); + let cluster = match deployer + .manual_cluster_from_descriptors(MetricsCounterTopology::new(3)) + .await + { + Ok(cluster) => cluster, + Err(ManualClusterError::ClientInit { source }) => { + warn!("k8s unavailable ({source}); skipping metrics-counter k8s manual run"); + return Ok(()); + } + Err(ManualClusterError::InstallStack { source }) + if k8s_cluster_unavailable(&source.to_string()) => + { + warn!("k8s unavailable ({source}); skipping metrics-counter k8s manual run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)) + .context("creating metrics-counter k8s manual cluster"); + } + }; + + let node0 = cluster.start_node("node-0").await?.client; + let node1 = cluster.start_node("node-1").await?.client; + let node2 = cluster.start_node("node-2").await?.client; + + cluster.wait_network_ready().await?; + + increment_many(&node0, 40).await?; + increment_many(&node1, 30).await?; + increment_many(&node2, 20).await?; + + wait_for_counter_value(&node0, 40).await?; + wait_for_counter_value(&node1, 30).await?; + wait_for_counter_value(&node2, 20).await?; + wait_for_prometheus_sum(&metrics_url, 90.0).await?; + + info!("restarting node-1 in manual cluster"); + cluster.restart_node("node-1").await?; + cluster.wait_network_ready().await?; + + let restarted_node1 = cluster + .node_client("node-1") + .ok_or_else(|| anyhow!("node-1 client missing after restart"))?; + + wait_for_counter_value(&restarted_node1, 0).await?; + + increment_many(&node0, 10).await?; + increment_many(&restarted_node1, 5).await?; + + wait_for_counter_value(&node0, 50).await?; + wait_for_counter_value(&restarted_node1, 5).await?; + wait_for_counter_value(&node2, 20).await?; + wait_for_prometheus_sum(&metrics_url, 75.0).await?; + + cluster.stop_all(); + Ok(()) +} + +async fn increment_many(client: &MetricsCounterHttpClient, operations: usize) -> Result<()> { + for _ in 0..operations { + let _: CounterView = client + .post("/counter/inc", &IncrementRequest {}) + .await + .map_err(|error| anyhow!(error.to_string()))?; + } + + Ok(()) +} + +async fn wait_for_counter_value(client: &MetricsCounterHttpClient, expected: u64) -> Result<()> { + let deadline = tokio::time::Instant::now() + Duration::from_secs(20); + + while tokio::time::Instant::now() < deadline { + let view: CounterView = client + .get("/counter/value") + .await + .map_err(|error| anyhow!(error.to_string()))?; + if view.value == expected { + return Ok(()); + } + tokio::time::sleep(Duration::from_millis(300)).await; + } + + Err(anyhow!("counter did not reach expected value {expected}")) +} + +async fn wait_for_prometheus_sum(metrics_url: &str, expected: f64) -> Result<()> { + let base_url = Url::parse(metrics_url)?; + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + + while tokio::time::Instant::now() < deadline { + let total = query_prometheus_sum(&base_url).await?; + if (total - expected).abs() < f64::EPSILON { + info!(total, expected, "prometheus sum reached expected value"); + return Ok(()); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + + Err(anyhow!( + "prometheus sum did not reach expected value {expected}" + )) +} + +async fn query_prometheus_sum(base_url: &Url) -> Result { + let client = reqwest::Client::new(); + let mut url = base_url.join("/api/v1/query")?; + url.query_pairs_mut() + .append_pair("query", "sum(metrics_counter_value)"); + + let response: PrometheusQueryResponse = client + .get(url) + .send() + .await? + .error_for_status()? + .json() + .await?; + + let Some(sample) = response.data.result.first() else { + return Ok(0.0); + }; + + sample + .value + .1 + .parse() + .map_err(|error| anyhow!("invalid prometheus value: {error}")) +} + +fn k8s_cluster_unavailable(message: &str) -> bool { + message.contains("Unable to connect to the server") + || message.contains("TLS handshake timeout") + || message.contains("connection refused") +} diff --git a/examples/metrics_counter/examples/src/bin/k8s_prometheus_expectation.rs b/examples/metrics_counter/examples/src/bin/k8s_prometheus_expectation.rs new file mode 100644 index 0000000..2257d2a --- /dev/null +++ b/examples/metrics_counter/examples/src/bin/k8s_prometheus_expectation.rs @@ -0,0 +1,73 @@ +use std::{env, time::Duration}; + +use anyhow::{Context as _, Result}; +use metrics_counter_runtime_ext::MetricsCounterK8sDeployer; +use metrics_counter_runtime_workloads::{ + CounterIncrementWorkload, MetricsCounterBuilderExt, MetricsCounterScenarioBuilder, + MetricsCounterTopology, PrometheusCounterAtLeast, +}; +use testing_framework_core::scenario::{Deployer, ObservabilityBuilderExt}; +use testing_framework_runner_k8s::K8sRunnerError; +use tracing::{info, warn}; + +const DEFAULT_PROM_URL: &str = "http://127.0.0.1:30991"; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let metrics_url = env::var("LOGOS_BLOCKCHAIN_METRICS_QUERY_URL") + .ok() + .filter(|value| !value.trim().is_empty()) + .unwrap_or_else(|| DEFAULT_PROM_URL.to_owned()); + + let mut scenario = + MetricsCounterScenarioBuilder::deployment_with(|_| MetricsCounterTopology::new(3)) + .enable_observability() + .with_metrics_query_url_str(&metrics_url) + .with_run_duration(Duration::from_secs(25)) + .with_workload( + CounterIncrementWorkload::new() + .operations(240) + .rate_per_sec(20), + ) + .with_expectation(PrometheusCounterAtLeast::new(240.0)) + .build()?; + + let deployer = MetricsCounterK8sDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(K8sRunnerError::ClientInit { source }) => { + warn!("k8s unavailable ({source}); skipping metrics-counter k8s run"); + return Ok(()); + } + Err(K8sRunnerError::InstallStack { source }) + if k8s_cluster_unavailable(&source.to_string()) => + { + warn!("k8s unavailable ({source}); skipping metrics-counter k8s run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)).context("deploying metrics-counter k8s stack"); + } + }; + + info!( + metrics_url, + "running metrics-counter k8s prometheus scenario" + ); + runner + .run(&mut scenario) + .await + .context("running metrics-counter k8s scenario")?; + + Ok(()) +} + +fn k8s_cluster_unavailable(message: &str) -> bool { + message.contains("Unable to connect to the server") + || message.contains("TLS handshake timeout") + || message.contains("connection refused") +} diff --git a/examples/metrics_counter/metrics-counter-node/Cargo.toml b/examples/metrics_counter/metrics-counter-node/Cargo.toml new file mode 100644 index 0000000..4d94f78 --- /dev/null +++ b/examples/metrics_counter/metrics-counter-node/Cargo.toml @@ -0,0 +1,22 @@ +[package] +edition.workspace = true +license.workspace = true +name = "metrics-counter-node" +version.workspace = true + +[[bin]] +name = "metrics-counter-node" +path = "src/main.rs" + +[dependencies] +anyhow = "1.0" +axum = "0.7" +clap = { version = "4.0", features = ["derive"] } +prometheus = "0.14" +reqwest = { workspace = true, features = ["json"] } +serde = { workspace = true } +serde_yaml = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tower-http = { version = "0.6", features = ["trace"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/metrics_counter/metrics-counter-node/src/client.rs b/examples/metrics_counter/metrics-counter-node/src/client.rs new file mode 100644 index 0000000..933eed6 --- /dev/null +++ b/examples/metrics_counter/metrics-counter-node/src/client.rs @@ -0,0 +1,40 @@ +use reqwest::Url; +use serde::Serialize; + +#[derive(Clone)] +pub struct MetricsCounterHttpClient { + base_url: Url, + client: reqwest::Client, +} + +impl MetricsCounterHttpClient { + #[must_use] + pub fn new(base_url: Url) -> Self { + Self { + base_url, + client: reqwest::Client::new(), + } + } + + pub async fn get(&self, path: &str) -> anyhow::Result { + let url = self.base_url.join(path)?; + let response = self.client.get(url).send().await?.error_for_status()?; + Ok(response.json().await?) + } + + pub async fn post( + &self, + path: &str, + body: &B, + ) -> anyhow::Result { + let url = self.base_url.join(path)?; + let response = self + .client + .post(url) + .json(body) + .send() + .await? + .error_for_status()?; + Ok(response.json().await?) + } +} diff --git a/examples/metrics_counter/metrics-counter-node/src/config.rs b/examples/metrics_counter/metrics-counter-node/src/config.rs new file mode 100644 index 0000000..276d779 --- /dev/null +++ b/examples/metrics_counter/metrics-counter-node/src/config.rs @@ -0,0 +1,14 @@ +use clap::Parser; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CounterConfig { + pub node_id: u64, + pub http_port: u16, +} + +#[derive(Debug, Parser)] +pub struct Cli { + #[arg(long)] + pub config: std::path::PathBuf, +} diff --git a/examples/metrics_counter/metrics-counter-node/src/lib.rs b/examples/metrics_counter/metrics-counter-node/src/lib.rs new file mode 100644 index 0000000..25b9b99 --- /dev/null +++ b/examples/metrics_counter/metrics-counter-node/src/lib.rs @@ -0,0 +1,3 @@ +pub mod client; + +pub use client::MetricsCounterHttpClient; diff --git a/examples/metrics_counter/metrics-counter-node/src/main.rs b/examples/metrics_counter/metrics-counter-node/src/main.rs new file mode 100644 index 0000000..6256d48 --- /dev/null +++ b/examples/metrics_counter/metrics-counter-node/src/main.rs @@ -0,0 +1,19 @@ +mod config; +mod server; +mod state; + +use clap::Parser; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let cli = config::Cli::parse(); + let raw = std::fs::read_to_string(cli.config)?; + let config: config::CounterConfig = serde_yaml::from_str(&raw)?; + let state = state::CounterState::new(); + + server::start_server(config, state).await +} diff --git a/examples/metrics_counter/metrics-counter-node/src/server.rs b/examples/metrics_counter/metrics-counter-node/src/server.rs new file mode 100644 index 0000000..361122c --- /dev/null +++ b/examples/metrics_counter/metrics-counter-node/src/server.rs @@ -0,0 +1,83 @@ +use std::net::SocketAddr; + +use axum::{ + Router, + extract::State, + http::StatusCode, + response::{IntoResponse, Json, Response}, + routing::{get, post}, +}; +use prometheus::{Encoder, TextEncoder}; +use serde::Serialize; +use tower_http::trace::TraceLayer; + +use crate::{ + config::CounterConfig, + state::{CounterState, CounterView}, +}; + +#[derive(Serialize)] +struct HealthResponse { + status: &'static str, +} + +pub async fn start_server(config: CounterConfig, state: CounterState) -> anyhow::Result<()> { + let app = Router::new() + .route("/health/live", get(health_live)) + .route("/health/ready", get(health_ready)) + .route("/counter/inc", post(increment)) + .route("/counter/value", get(counter_value)) + .route("/metrics", get(metrics)) + .layer(TraceLayer::new_for_http()) + .with_state(state.clone()); + + let addr = SocketAddr::from(([0, 0, 0, 0], config.http_port)); + let listener = tokio::net::TcpListener::bind(addr).await?; + + state.set_ready(true).await; + tracing::info!(node_id = config.node_id, %addr, "metrics-counter node ready"); + + axum::serve(listener, app).await?; + Ok(()) +} + +async fn health_live() -> (StatusCode, Json) { + (StatusCode::OK, Json(HealthResponse { status: "alive" })) +} + +async fn health_ready(State(state): State) -> (StatusCode, Json) { + if state.is_ready().await { + (StatusCode::OK, Json(HealthResponse { status: "ready" })) + } else { + ( + StatusCode::SERVICE_UNAVAILABLE, + Json(HealthResponse { + status: "not-ready", + }), + ) + } +} + +async fn increment(State(state): State) -> Json { + Json(state.increment()) +} + +async fn counter_value(State(state): State) -> Json { + Json(state.view()) +} + +async fn metrics() -> Response { + let metric_families = prometheus::gather(); + let mut buffer = Vec::new(); + let encoder = TextEncoder::new(); + if encoder.encode(&metric_families, &mut buffer).is_err() { + return StatusCode::INTERNAL_SERVER_ERROR.into_response(); + } + + ( + StatusCode::OK, + [("content-type", encoder.format_type().to_string())], + buffer, + ) + .into_response() +} diff --git a/examples/metrics_counter/metrics-counter-node/src/state.rs b/examples/metrics_counter/metrics-counter-node/src/state.rs new file mode 100644 index 0000000..7a29772 --- /dev/null +++ b/examples/metrics_counter/metrics-counter-node/src/state.rs @@ -0,0 +1,64 @@ +use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, +}; + +use prometheus::{IntCounter, IntGauge}; +use serde::Serialize; +use tokio::sync::RwLock; + +#[derive(Clone, Debug, Serialize)] +pub struct CounterView { + pub value: u64, +} + +#[derive(Clone)] +pub struct CounterState { + ready: Arc>, + value: Arc, + increments_total: IntCounter, + counter_value: IntGauge, +} + +impl CounterState { + pub fn new() -> Self { + let increments_total = IntCounter::new( + "metrics_counter_increments_total", + "Total increment operations", + ) + .expect("metrics_counter_increments_total"); + let counter_value = + IntGauge::new("metrics_counter_value", "Current counter value").expect("counter gauge"); + + let _ = prometheus::register(Box::new(increments_total.clone())); + let _ = prometheus::register(Box::new(counter_value.clone())); + + Self { + ready: Arc::new(RwLock::new(false)), + value: Arc::new(AtomicU64::new(0)), + increments_total, + counter_value, + } + } + + pub async fn set_ready(&self, value: bool) { + *self.ready.write().await = value; + } + + pub async fn is_ready(&self) -> bool { + *self.ready.read().await + } + + pub fn increment(&self) -> CounterView { + let value = self.value.fetch_add(1, Ordering::SeqCst) + 1; + self.increments_total.inc(); + self.counter_value.set(value as i64); + CounterView { value } + } + + pub fn view(&self) -> CounterView { + CounterView { + value: self.value.load(Ordering::SeqCst), + } + } +} diff --git a/examples/metrics_counter/testing/integration/Cargo.toml b/examples/metrics_counter/testing/integration/Cargo.toml new file mode 100644 index 0000000..52ce28b --- /dev/null +++ b/examples/metrics_counter/testing/integration/Cargo.toml @@ -0,0 +1,18 @@ +[package] +edition.workspace = true +license.workspace = true +name = "metrics-counter-runtime-ext" +version.workspace = true + +[dependencies] +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +testing-framework-runner-k8s = { workspace = true } +testing-framework-runner-local = { workspace = true } + +anyhow = "1.0" +async-trait = { workspace = true } +metrics-counter-node = { path = "../../metrics-counter-node" } +reqwest = { workspace = true, features = ["json"] } +serde = { workspace = true } +serde_yaml = { workspace = true } diff --git a/examples/metrics_counter/testing/integration/src/app.rs b/examples/metrics_counter/testing/integration/src/app.rs new file mode 100644 index 0000000..4cd784b --- /dev/null +++ b/examples/metrics_counter/testing/integration/src/app.rs @@ -0,0 +1,49 @@ +use async_trait::async_trait; +use metrics_counter_node::MetricsCounterHttpClient; +use serde::{Deserialize, Serialize}; +use testing_framework_core::{ + cfgsync::{StaticNodeConfigProvider, serialize_yaml_config}, + scenario::{Application, DynError, NodeAccess}, +}; + +pub type MetricsCounterTopology = testing_framework_core::topology::ClusterTopology; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct MetricsCounterNodeConfig { + pub node_id: u64, + pub http_port: u16, +} + +pub struct MetricsCounterEnv; + +#[async_trait] +impl Application for MetricsCounterEnv { + type Deployment = MetricsCounterTopology; + type NodeClient = MetricsCounterHttpClient; + type NodeConfig = MetricsCounterNodeConfig; + fn build_node_client(access: &NodeAccess) -> Result { + Ok(MetricsCounterHttpClient::new(access.api_base_url()?)) + } + + fn node_readiness_path() -> &'static str { + "/health/ready" + } +} + +impl StaticNodeConfigProvider for MetricsCounterEnv { + type Error = serde_yaml::Error; + + fn build_node_config( + _deployment: &Self::Deployment, + node_index: usize, + ) -> Result { + Ok(MetricsCounterNodeConfig { + node_id: node_index as u64, + http_port: 8080, + }) + } + + fn serialize_node_config(config: &Self::NodeConfig) -> Result { + serialize_yaml_config(config) + } +} diff --git a/examples/metrics_counter/testing/integration/src/compose_env.rs b/examples/metrics_counter/testing/integration/src/compose_env.rs new file mode 100644 index 0000000..3edc70e --- /dev/null +++ b/examples/metrics_counter/testing/integration/src/compose_env.rs @@ -0,0 +1,94 @@ +use std::{env, fs, path::Path}; + +use reqwest::Url; +use testing_framework_core::scenario::DynError; +use testing_framework_runner_compose::{ + BinaryConfigNodeSpec, ComposeBinaryApp, EnvEntry, NodeDescriptor, +}; + +use crate::MetricsCounterEnv; + +const NODE_CONFIG_PATH: &str = "/etc/metrics-counter/config.yaml"; +const PROM_CONFIG_MOUNT_PATH: &str = "/etc/prometheus/prometheus.yml:ro"; +const PROMETHEUS_SERVICE_NAME: &str = "prometheus"; +const PROMETHEUS_CONTAINER_PORT: u16 = 9090; +const DEFAULT_PROMETHEUS_QUERY_PORT: u16 = 19091; + +impl ComposeBinaryApp for MetricsCounterEnv { + fn compose_node_spec() -> BinaryConfigNodeSpec { + BinaryConfigNodeSpec::conventional( + "/usr/local/bin/metrics-counter-node", + NODE_CONFIG_PATH, + vec![8080, 8081], + ) + } + + fn prepare_compose_workspace( + path: &Path, + topology: &::Deployment, + _metrics_otlp_ingest_url: Option<&Url>, + ) -> Result<(), DynError> { + if prometheus_query_port().is_none() { + return Ok(()); + } + + let stack_dir = path + .parent() + .ok_or_else(|| anyhow::anyhow!("cfgsync path has no parent"))?; + let configs_dir = stack_dir.join("configs"); + fs::create_dir_all(&configs_dir)?; + fs::write( + configs_dir.join("prometheus.yml"), + render_prometheus_config(topology), + )?; + Ok(()) + } + + fn compose_extra_services( + _topology: &::Deployment, + ) -> Result, DynError> { + let Some(prom_port) = prometheus_query_port() else { + return Ok(Vec::new()); + }; + + Ok(vec![NodeDescriptor::new( + PROMETHEUS_SERVICE_NAME, + "prom/prometheus:v2.54.1", + vec![ + "/bin/prometheus".to_owned(), + "--config.file=/etc/prometheus/prometheus.yml".to_owned(), + "--web.listen-address=0.0.0.0:9090".to_owned(), + ], + vec![prometheus_config_volume()], + vec![], + vec![format!("127.0.0.1:{prom_port}:{PROMETHEUS_CONTAINER_PORT}")], + vec![PROMETHEUS_CONTAINER_PORT], + vec![EnvEntry::new("TZ", "UTC")], + None, + )]) + } +} + +fn prometheus_config_volume() -> String { + format!("./stack/configs/prometheus.yml:{PROM_CONFIG_MOUNT_PATH}") +} + +fn prometheus_query_port() -> Option { + let raw = env::var("LOGOS_BLOCKCHAIN_METRICS_QUERY_URL") + .unwrap_or_else(|_| format!("http://127.0.0.1:{DEFAULT_PROMETHEUS_QUERY_PORT}")); + let parsed = Url::parse(raw.trim()).ok()?; + parsed + .port_or_known_default() + .or(Some(DEFAULT_PROMETHEUS_QUERY_PORT)) +} + +fn render_prometheus_config(topology: &crate::MetricsCounterTopology) -> String { + let targets = (0..topology.node_count) + .map(|index| format!("\"node-{index}:8080\"")) + .collect::>() + .join(", "); + + format!( + "global:\n scrape_interval: 1s\nscrape_configs:\n - job_name: metrics_counter\n metrics_path: /metrics\n static_configs:\n - targets: [{targets}]\n" + ) +} diff --git a/examples/metrics_counter/testing/integration/src/k8s_env.rs b/examples/metrics_counter/testing/integration/src/k8s_env.rs new file mode 100644 index 0000000..f5bc308 --- /dev/null +++ b/examples/metrics_counter/testing/integration/src/k8s_env.rs @@ -0,0 +1,183 @@ +use std::{collections::BTreeMap, env}; + +use testing_framework_core::scenario::DynError; +use testing_framework_runner_k8s::{ + BinaryConfigK8sSpec, HelmManifest, K8sBinaryApp, + k8s_openapi::{ + api::{ + apps::v1::{Deployment, DeploymentSpec}, + core::v1::{ + ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, PodSpec, + PodTemplateSpec, Service, ServicePort, ServiceSpec, Volume, VolumeMount, + }, + }, + apimachinery::pkg::{ + apis::meta::v1::{LabelSelector, ObjectMeta}, + util::intstr::IntOrString, + }, + }, +}; + +use crate::MetricsCounterEnv; + +const CHART_NAME: &str = "metrics-counter"; +const NODE_NAME_PREFIX: &str = "metrics-counter-node"; +const NODE_CONFIG_PATH: &str = "/etc/metrics-counter/config.yaml"; +const CONTAINER_HTTP_PORT: u16 = 8080; +const SERVICE_TESTING_PORT: u16 = 8081; +const PROMETHEUS_SERVICE_NAME: &str = "metrics-counter-prometheus"; +const PROMETHEUS_CONTAINER_PORT: u16 = 9090; +const DEFAULT_PROMETHEUS_NODE_PORT: u16 = 30991; + +impl K8sBinaryApp for MetricsCounterEnv { + fn k8s_binary_spec() -> BinaryConfigK8sSpec { + BinaryConfigK8sSpec::conventional( + CHART_NAME, + NODE_NAME_PREFIX, + "/usr/local/bin/metrics-counter-node", + NODE_CONFIG_PATH, + CONTAINER_HTTP_PORT, + SERVICE_TESTING_PORT, + ) + } + + fn extend_k8s_manifest( + topology: &Self::Deployment, + manifest: &mut HelmManifest, + ) -> Result<(), DynError> { + manifest.extend(render_prometheus_assets(topology)?); + Ok(()) + } +} + +fn render_prometheus_assets( + topology: &crate::MetricsCounterTopology, +) -> Result { + let mut manifest = HelmManifest::new(); + manifest.push_yaml(&prometheus_config_map(topology))?; + manifest.push_yaml(&prometheus_deployment())?; + manifest.push_yaml(&prometheus_service())?; + Ok(manifest) +} + +fn prometheus_config_map(topology: &crate::MetricsCounterTopology) -> ConfigMap { + let mut data = BTreeMap::new(); + data.insert( + "prometheus.yml".to_owned(), + render_prometheus_config(topology), + ); + + ConfigMap { + metadata: ObjectMeta { + name: Some(prometheus_config_name()), + ..Default::default() + }, + data: Some(data), + ..Default::default() + } +} + +fn prometheus_deployment() -> Deployment { + let labels = prometheus_labels(); + + Deployment { + metadata: ObjectMeta { + name: Some(PROMETHEUS_SERVICE_NAME.to_owned()), + ..Default::default() + }, + spec: Some(DeploymentSpec { + replicas: Some(1), + selector: LabelSelector { + match_labels: Some(labels.clone()), + ..Default::default() + }, + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(labels), + ..Default::default() + }), + spec: Some(PodSpec { + containers: vec![Container { + name: "prometheus".to_owned(), + image: Some("prom/prometheus:v2.54.1".to_owned()), + args: Some(vec![ + "--config.file=/etc/prometheus/prometheus.yml".to_owned(), + format!("--web.listen-address=0.0.0.0:{PROMETHEUS_CONTAINER_PORT}"), + ]), + ports: Some(vec![ContainerPort { + container_port: i32::from(PROMETHEUS_CONTAINER_PORT), + ..Default::default() + }]), + volume_mounts: Some(vec![VolumeMount { + name: "config".to_owned(), + mount_path: "/etc/prometheus/prometheus.yml".to_owned(), + sub_path: Some("prometheus.yml".to_owned()), + ..Default::default() + }]), + ..Default::default() + }], + volumes: Some(vec![Volume { + name: "config".to_owned(), + config_map: Some(ConfigMapVolumeSource { + name: prometheus_config_name(), + ..Default::default() + }), + ..Default::default() + }]), + ..Default::default() + }), + }, + ..Default::default() + }), + ..Default::default() + } +} + +fn prometheus_service() -> Service { + Service { + metadata: ObjectMeta { + name: Some(PROMETHEUS_SERVICE_NAME.to_owned()), + ..Default::default() + }, + spec: Some(ServiceSpec { + selector: Some(prometheus_labels()), + type_: Some("NodePort".to_owned()), + ports: Some(vec![ServicePort { + name: Some("http".to_owned()), + port: i32::from(PROMETHEUS_CONTAINER_PORT), + target_port: Some(IntOrString::Int(i32::from(PROMETHEUS_CONTAINER_PORT))), + node_port: Some(i32::from(prometheus_query_port())), + protocol: Some("TCP".to_owned()), + ..Default::default() + }]), + ..Default::default() + }), + ..Default::default() + } +} + +fn render_prometheus_config(topology: &crate::MetricsCounterTopology) -> String { + let targets = (0..topology.node_count) + .map(|index| format!("\"{NODE_NAME_PREFIX}-{index}:{CONTAINER_HTTP_PORT}\"")) + .collect::>() + .join(", "); + + format!( + "global:\n scrape_interval: 1s\nscrape_configs:\n - job_name: metrics_counter\n metrics_path: /metrics\n static_configs:\n - targets: [{targets}]\n" + ) +} + +fn prometheus_query_port() -> u16 { + env::var("METRICS_COUNTER_K8S_PROMETHEUS_NODE_PORT") + .ok() + .and_then(|value| value.trim().parse().ok()) + .unwrap_or(DEFAULT_PROMETHEUS_NODE_PORT) +} + +fn prometheus_labels() -> BTreeMap { + BTreeMap::from([("app".to_owned(), PROMETHEUS_SERVICE_NAME.to_owned())]) +} + +fn prometheus_config_name() -> String { + format!("{PROMETHEUS_SERVICE_NAME}-config") +} diff --git a/examples/metrics_counter/testing/integration/src/lib.rs b/examples/metrics_counter/testing/integration/src/lib.rs new file mode 100644 index 0000000..b9f8212 --- /dev/null +++ b/examples/metrics_counter/testing/integration/src/lib.rs @@ -0,0 +1,12 @@ +mod app; +mod compose_env; +mod k8s_env; +mod local_env; +pub mod scenario; + +pub use app::*; +pub use scenario::{MetricsCounterBuilderExt, MetricsCounterScenarioBuilder}; + +pub type MetricsCounterComposeDeployer = + testing_framework_runner_compose::ComposeDeployer; +pub type MetricsCounterK8sDeployer = testing_framework_runner_k8s::K8sDeployer; diff --git a/examples/metrics_counter/testing/integration/src/local_env.rs b/examples/metrics_counter/testing/integration/src/local_env.rs new file mode 100644 index 0000000..712c194 --- /dev/null +++ b/examples/metrics_counter/testing/integration/src/local_env.rs @@ -0,0 +1,44 @@ +use std::collections::HashMap; + +use testing_framework_core::scenario::{DynError, StartNodeOptions}; +use testing_framework_runner_local::{ + LocalBinaryApp, LocalNodePorts, LocalPeerNode, LocalProcessSpec, yaml_node_config, +}; + +use crate::{MetricsCounterEnv, MetricsCounterNodeConfig}; + +impl LocalBinaryApp for MetricsCounterEnv { + fn initial_node_name_prefix() -> &'static str { + "metrics-counter-node" + } + + fn build_local_node_config_with_peers( + _topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + _peers: &[LocalPeerNode], + _peer_ports_by_name: &HashMap, + _options: &StartNodeOptions, + _template_config: Option< + &::NodeConfig, + >, + ) -> Result<::NodeConfig, DynError> { + Ok(MetricsCounterNodeConfig { + node_id: index as u64, + http_port: ports.network_port(), + }) + } + + fn local_process_spec() -> LocalProcessSpec { + LocalProcessSpec::new("METRICS_COUNTER_NODE_BIN", "metrics-counter-node") + .with_rust_log("metrics_counter_node=info") + } + + fn render_local_config(config: &MetricsCounterNodeConfig) -> Result, DynError> { + yaml_node_config(config) + } + + fn http_api_port(config: &MetricsCounterNodeConfig) -> u16 { + config.http_port + } +} diff --git a/examples/metrics_counter/testing/integration/src/scenario.rs b/examples/metrics_counter/testing/integration/src/scenario.rs new file mode 100644 index 0000000..7f99191 --- /dev/null +++ b/examples/metrics_counter/testing/integration/src/scenario.rs @@ -0,0 +1,15 @@ +use testing_framework_core::scenario::ScenarioBuilder; + +use crate::{MetricsCounterEnv, MetricsCounterTopology}; + +pub type MetricsCounterScenarioBuilder = ScenarioBuilder; + +pub trait MetricsCounterBuilderExt: Sized { + fn deployment_with(f: impl FnOnce(MetricsCounterTopology) -> MetricsCounterTopology) -> Self; +} + +impl MetricsCounterBuilderExt for MetricsCounterScenarioBuilder { + fn deployment_with(f: impl FnOnce(MetricsCounterTopology) -> MetricsCounterTopology) -> Self { + MetricsCounterScenarioBuilder::with_deployment(f(MetricsCounterTopology::new(3))) + } +} diff --git a/examples/metrics_counter/testing/workloads/Cargo.toml b/examples/metrics_counter/testing/workloads/Cargo.toml new file mode 100644 index 0000000..a06456e --- /dev/null +++ b/examples/metrics_counter/testing/workloads/Cargo.toml @@ -0,0 +1,15 @@ +[package] +edition.workspace = true +license.workspace = true +name = "metrics-counter-runtime-workloads" +version.workspace = true + +[dependencies] +metrics-counter-runtime-ext = { path = "../integration" } +testing-framework-core = { workspace = true } + +async-trait = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } diff --git a/examples/metrics_counter/testing/workloads/src/expectations.rs b/examples/metrics_counter/testing/workloads/src/expectations.rs new file mode 100644 index 0000000..f0d348f --- /dev/null +++ b/examples/metrics_counter/testing/workloads/src/expectations.rs @@ -0,0 +1,50 @@ +use async_trait::async_trait; +use metrics_counter_runtime_ext::MetricsCounterEnv; +use testing_framework_core::scenario::{DynError, Expectation, RunContext}; +use tracing::info; + +#[derive(Clone)] +pub struct PrometheusCounterAtLeast { + min_total: f64, +} + +impl PrometheusCounterAtLeast { + #[must_use] + pub const fn new(min_total: f64) -> Self { + Self { min_total } + } +} + +#[async_trait] +impl Expectation for PrometheusCounterAtLeast { + fn name(&self) -> &str { + "prometheus_counter_at_least" + } + + async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { + if !ctx.telemetry().is_configured() { + return Err( + "prometheus endpoint unavailable; set LOGOS_BLOCKCHAIN_METRICS_QUERY_URL".into(), + ); + } + + let total = ctx + .telemetry() + .counter_value("sum(metrics_counter_increments_total)")?; + + if total < self.min_total { + return Err(format!( + "metrics_counter_increments_total below threshold: total={total}, min={}", + self.min_total + ) + .into()); + } + + info!( + total, + min = self.min_total, + "prometheus counter threshold met" + ); + Ok(()) + } +} diff --git a/examples/metrics_counter/testing/workloads/src/increment.rs b/examples/metrics_counter/testing/workloads/src/increment.rs new file mode 100644 index 0000000..3fc7b50 --- /dev/null +++ b/examples/metrics_counter/testing/workloads/src/increment.rs @@ -0,0 +1,83 @@ +use std::time::Duration; + +use async_trait::async_trait; +use metrics_counter_runtime_ext::MetricsCounterEnv; +use serde::Serialize; +use testing_framework_core::scenario::{DynError, RunContext, Workload}; +use tracing::info; + +#[derive(Clone)] +pub struct CounterIncrementWorkload { + operations: usize, + rate_per_sec: Option, +} + +#[derive(Serialize)] +struct IncrementRequest {} + +impl CounterIncrementWorkload { + #[must_use] + pub fn new() -> Self { + Self { + operations: 200, + rate_per_sec: Some(25), + } + } + + #[must_use] + pub const fn operations(mut self, value: usize) -> Self { + self.operations = value; + self + } + + #[must_use] + pub const fn rate_per_sec(mut self, value: usize) -> Self { + self.rate_per_sec = Some(value); + self + } +} + +impl Default for CounterIncrementWorkload { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Workload for CounterIncrementWorkload { + fn name(&self) -> &str { + "counter_increment_workload" + } + + async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.is_empty() { + return Err("no metrics-counter node clients available".into()); + } + + let interval = self.rate_per_sec.and_then(compute_interval); + info!(operations = self.operations, rate_per_sec = ?self.rate_per_sec, "starting counter increment workload"); + + for idx in 0..self.operations { + let client = &clients[idx % clients.len()]; + let _: serde_json::Value = client.post("/counter/inc", &IncrementRequest {}).await?; + + if (idx + 1) % 50 == 0 { + info!(completed = idx + 1, "counter increment progress"); + } + + if let Some(delay) = interval { + tokio::time::sleep(delay).await; + } + } + + Ok(()) + } +} + +fn compute_interval(rate_per_sec: usize) -> Option { + if rate_per_sec == 0 { + return None; + } + Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1))) +} diff --git a/examples/metrics_counter/testing/workloads/src/lib.rs b/examples/metrics_counter/testing/workloads/src/lib.rs new file mode 100644 index 0000000..71b977a --- /dev/null +++ b/examples/metrics_counter/testing/workloads/src/lib.rs @@ -0,0 +1,9 @@ +mod expectations; +mod increment; + +pub use expectations::PrometheusCounterAtLeast; +pub use increment::CounterIncrementWorkload; +pub use metrics_counter_runtime_ext::{ + MetricsCounterBuilderExt, MetricsCounterEnv, MetricsCounterScenarioBuilder, + MetricsCounterTopology, +}; diff --git a/examples/redis_streams/README.md b/examples/redis_streams/README.md new file mode 100644 index 0000000..ebcfd56 --- /dev/null +++ b/examples/redis_streams/README.md @@ -0,0 +1,35 @@ +# Redis Streams Example + +This example uses Redis Streams consumer groups. + +There are two scenarios. One produces messages, consumes them, acknowledges +them, and checks that the pending queue drains to zero. The other leaves +messages pending on one consumer and has a second consumer reclaim them with +`XAUTOCLAIM`. + +This example is compose-only. + +## How TF runs this + +Each example follows the same pattern: + +- TF starts Redis in Docker Compose +- a workload drives stream and consumer-group operations through the Redis client +- an expectation checks health and pending-message state + +## Scenarios + +- `compose_roundtrip` covers stream setup, production, consumer-group reads, acknowledgements, and pending drain +- `compose_failover` simulates pending-message reclaim after one consumer stops acknowledging + +## Run with Docker Compose + +```bash +cargo run -p redis-streams-examples --bin compose_roundtrip +``` + +## Run the reclaim scenario + +```bash +cargo run -p redis-streams-examples --bin compose_failover +``` diff --git a/examples/redis_streams/examples/Cargo.toml b/examples/redis_streams/examples/Cargo.toml new file mode 100644 index 0000000..31b0ce7 --- /dev/null +++ b/examples/redis_streams/examples/Cargo.toml @@ -0,0 +1,15 @@ +[package] +edition.workspace = true +license.workspace = true +name = "redis-streams-examples" +version.workspace = true + +[dependencies] +anyhow = "1.0" +redis-streams-runtime-ext = { path = "../testing/integration" } +redis-streams-runtime-workloads = { path = "../testing/workloads" } +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/redis_streams/examples/src/bin/compose_failover.rs b/examples/redis_streams/examples/src/bin/compose_failover.rs new file mode 100644 index 0000000..26de215 --- /dev/null +++ b/examples/redis_streams/examples/src/bin/compose_failover.rs @@ -0,0 +1,49 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use redis_streams_runtime_ext::RedisStreamsComposeDeployer; +use redis_streams_runtime_workloads::{ + RedisStreamsBuilderExt, RedisStreamsClusterHealthy, RedisStreamsReclaimFailoverWorkload, + RedisStreamsScenarioBuilder, +}; +use testing_framework_core::scenario::Deployer; +use testing_framework_runner_compose::ComposeRunnerError; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let mut scenario = RedisStreamsScenarioBuilder::deployment_with(|topology| topology) + .with_run_duration(Duration::from_secs(30)) + .with_workload( + RedisStreamsReclaimFailoverWorkload::new("tf-stream", "tf-group") + .messages(300) + .batch(64), + ) + .with_expectation(RedisStreamsClusterHealthy::new()) + .build()?; + + let deployer = RedisStreamsComposeDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => { + warn!("docker unavailable; skipping redis streams compose failover run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)) + .context("deploying redis streams compose failover stack"); + } + }; + + info!("running redis streams compose failover scenario"); + runner + .run(&mut scenario) + .await + .context("running redis streams compose failover scenario")?; + + Ok(()) +} diff --git a/examples/redis_streams/examples/src/bin/compose_roundtrip.rs b/examples/redis_streams/examples/src/bin/compose_roundtrip.rs new file mode 100644 index 0000000..456afd8 --- /dev/null +++ b/examples/redis_streams/examples/src/bin/compose_roundtrip.rs @@ -0,0 +1,48 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use redis_streams_runtime_ext::RedisStreamsComposeDeployer; +use redis_streams_runtime_workloads::{ + RedisStreamsBuilderExt, RedisStreamsClusterHealthy, RedisStreamsRoundTripWorkload, + RedisStreamsScenarioBuilder, +}; +use testing_framework_core::scenario::Deployer; +use testing_framework_runner_compose::ComposeRunnerError; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let mut scenario = RedisStreamsScenarioBuilder::deployment_with(|topology| topology) + .with_run_duration(Duration::from_secs(30)) + .with_workload( + RedisStreamsRoundTripWorkload::new("tf-stream", "tf-group") + .messages(300) + .read_batch(50), + ) + .with_expectation(RedisStreamsClusterHealthy::new()) + .build()?; + + let deployer = RedisStreamsComposeDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => { + warn!("docker unavailable; skipping redis streams compose run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)).context("deploying redis streams compose stack"); + } + }; + + info!("running redis streams compose roundtrip scenario"); + runner + .run(&mut scenario) + .await + .context("running redis streams compose scenario")?; + + Ok(()) +} diff --git a/examples/redis_streams/testing/integration/Cargo.toml b/examples/redis_streams/testing/integration/Cargo.toml new file mode 100644 index 0000000..f1a69e5 --- /dev/null +++ b/examples/redis_streams/testing/integration/Cargo.toml @@ -0,0 +1,13 @@ +[package] +edition.workspace = true +license.workspace = true +name = "redis-streams-runtime-ext" +version.workspace = true + +[dependencies] +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } + +async-trait = { workspace = true } +redis = { version = "0.29", features = ["aio", "connection-manager", "streams", "tokio-comp"] } +reqwest = { workspace = true } diff --git a/examples/redis_streams/testing/integration/src/app.rs b/examples/redis_streams/testing/integration/src/app.rs new file mode 100644 index 0000000..2a731b6 --- /dev/null +++ b/examples/redis_streams/testing/integration/src/app.rs @@ -0,0 +1,172 @@ +use async_trait::async_trait; +use redis::{ + AsyncCommands, Client, + aio::ConnectionManager, + streams::{StreamAutoClaimOptions, StreamAutoClaimReply, StreamPendingReply, StreamReadReply}, +}; +use testing_framework_core::{ + cfgsync::{StaticNodeConfigProvider, serialize_plain_text_config}, + scenario::{Application, DynError, NodeAccess}, +}; + +pub type RedisStreamsTopology = testing_framework_core::topology::ClusterTopology; + +#[derive(Clone)] +pub struct RedisStreamsClient { + url: String, + client: Client, +} + +impl RedisStreamsClient { + pub fn new(url: String) -> Result { + let client = Client::open(url.clone())?; + Ok(Self { url, client }) + } + + pub fn url(&self) -> &str { + &self.url + } + + async fn connection(&self) -> Result { + Ok(self.client.get_connection_manager().await?) + } + + pub async fn ping(&self) -> Result<(), DynError> { + let mut conn = self.connection().await?; + redis::cmd("PING").query_async::(&mut conn).await?; + Ok(()) + } + + pub async fn ensure_group(&self, stream: &str, group: &str) -> Result<(), DynError> { + let mut conn = self.connection().await?; + let result = redis::cmd("XGROUP") + .arg("CREATE") + .arg(stream) + .arg(group) + .arg("$") + .arg("MKSTREAM") + .query_async::<()>(&mut conn) + .await; + + match result { + Ok(()) => Ok(()), + Err(error) if error.to_string().contains("BUSYGROUP") => Ok(()), + Err(error) => Err(error.into()), + } + } + + pub async fn append_message(&self, stream: &str, payload: &str) -> Result { + let mut conn = self.connection().await?; + let id = redis::cmd("XADD") + .arg(stream) + .arg("*") + .arg("payload") + .arg(payload) + .query_async::(&mut conn) + .await?; + Ok(id) + } + + pub async fn read_group_batch( + &self, + stream: &str, + group: &str, + consumer: &str, + count: usize, + block_ms: usize, + ) -> Result, DynError> { + let mut conn = self.connection().await?; + + let options = redis::streams::StreamReadOptions::default() + .group(group, consumer) + .count(count) + .block(block_ms); + + let reply: StreamReadReply = conn.xread_options(&[stream], &[">"], &options).await?; + + let mut ids = Vec::new(); + for key in reply.keys { + for entry in key.ids { + ids.push(entry.id); + } + } + + Ok(ids) + } + + pub async fn ack_messages( + &self, + stream: &str, + group: &str, + ids: &[String], + ) -> Result { + if ids.is_empty() { + return Ok(0); + } + + let mut conn = self.connection().await?; + let acked = redis::cmd("XACK") + .arg(stream) + .arg(group) + .arg(ids) + .query_async::(&mut conn) + .await?; + Ok(acked) + } + + pub async fn autoclaim_batch( + &self, + stream: &str, + group: &str, + consumer: &str, + min_idle_ms: u64, + start: &str, + count: usize, + ) -> Result<(String, Vec), DynError> { + let mut conn = self.connection().await?; + let options = StreamAutoClaimOptions::default().count(count); + let reply: StreamAutoClaimReply = conn + .xautoclaim_options(stream, group, consumer, min_idle_ms, start, options) + .await?; + + Ok(( + reply.next_stream_id, + reply.claimed.into_iter().map(|entry| entry.id).collect(), + )) + } + + pub async fn pending_count(&self, stream: &str, group: &str) -> Result { + let mut conn = self.connection().await?; + let reply: StreamPendingReply = conn.xpending(stream, group).await?; + Ok(reply.count() as u64) + } +} + +pub struct RedisStreamsEnv; + +#[async_trait] +impl Application for RedisStreamsEnv { + type Deployment = RedisStreamsTopology; + type NodeClient = RedisStreamsClient; + type NodeConfig = String; + + fn build_node_client(access: &NodeAccess) -> Result { + let port = access.testing_port().unwrap_or(access.api_port()); + RedisStreamsClient::new(format!("redis://{}:{port}", access.host())) + } +} + +impl StaticNodeConfigProvider for RedisStreamsEnv { + type Error = std::convert::Infallible; + + fn build_node_config( + _deployment: &Self::Deployment, + _node_index: usize, + ) -> Result { + Ok("appendonly yes\nprotected-mode no\n".to_owned()) + } + + fn serialize_node_config(config: &Self::NodeConfig) -> Result { + serialize_plain_text_config(config) + } +} diff --git a/examples/redis_streams/testing/integration/src/compose_env.rs b/examples/redis_streams/testing/integration/src/compose_env.rs new file mode 100644 index 0000000..8079591 --- /dev/null +++ b/examples/redis_streams/testing/integration/src/compose_env.rs @@ -0,0 +1,105 @@ +use std::{env, fs, io::Error, path::Path}; + +use testing_framework_core::{ + cfgsync::StaticArtifactRenderer, scenario::DynError, topology::DeploymentDescriptor, +}; +use testing_framework_runner_compose::{ + ComposeDeployEnv, ComposeNodeConfigFileName, ComposeReadinessProbe, EnvEntry, + LoopbackNodeRuntimeSpec, infrastructure::ports::NodeHostPorts, +}; + +use crate::{RedisStreamsClient, RedisStreamsEnv}; + +const NODE_CONFIG_PATH: &str = "/usr/local/etc/redis/redis.conf"; +const REDIS_PORT: u16 = 6379; + +impl ComposeDeployEnv for RedisStreamsEnv { + fn prepare_compose_configs( + path: &Path, + topology: &Self::Deployment, + _cfgsync_port: u16, + _metrics_otlp_ingest_url: Option<&reqwest::Url>, + ) -> Result<(), DynError> { + let hostnames = Self::cfgsync_hostnames(topology); + let configs_dir = path + .parent() + .ok_or_else(|| Error::other("cfgsync path has no parent"))? + .join("configs"); + fs::create_dir_all(&configs_dir)?; + + for index in 0..topology.node_count() { + let mut config = ::build_node_config(topology, index)?; + ::rewrite_for_hostnames( + topology, + index, + &hostnames, + &mut config, + )?; + let rendered = ::serialize_node_config(&config)?; + fs::write( + configs_dir.join(Self::static_node_config_file_name(index)), + rendered, + )?; + } + + Ok(()) + } + + fn static_node_config_file_name(index: usize) -> String { + ComposeNodeConfigFileName::FixedExtension("conf").resolve(index) + } + + fn loopback_node_runtime_spec( + _topology: &Self::Deployment, + index: usize, + ) -> Result, DynError> { + Ok(Some(build_redis_runtime(index))) + } + + fn node_client_from_ports( + ports: &NodeHostPorts, + host: &str, + ) -> Result { + redis_client_from_ports(ports, host) + } + + fn readiness_probe() -> ComposeReadinessProbe { + ComposeReadinessProbe::Tcp + } +} + +fn build_redis_runtime(index: usize) -> LoopbackNodeRuntimeSpec { + let image = env::var("REDIS_STREAMS_IMAGE").unwrap_or_else(|_| "redis:7".to_owned()); + let platform = env::var("REDIS_STREAMS_PLATFORM").ok(); + LoopbackNodeRuntimeSpec { + image, + entrypoint: build_redis_entrypoint(), + volumes: vec![format!( + "./stack/configs/node-{index}.conf:{NODE_CONFIG_PATH}:ro" + )], + extra_hosts: vec![], + container_ports: vec![REDIS_PORT], + environment: vec![EnvEntry::new("RUST_LOG", "info")], + platform, + } +} + +fn redis_client_from_ports( + ports: &NodeHostPorts, + host: &str, +) -> Result { + RedisStreamsClient::new(format!("redis://{host}:{}", ports.api)) +} + +fn build_redis_entrypoint() -> Vec { + vec![ + "redis-server".to_owned(), + NODE_CONFIG_PATH.to_owned(), + "--bind".to_owned(), + "0.0.0.0".to_owned(), + "--port".to_owned(), + REDIS_PORT.to_string(), + "--protected-mode".to_owned(), + "no".to_owned(), + ] +} diff --git a/examples/redis_streams/testing/integration/src/lib.rs b/examples/redis_streams/testing/integration/src/lib.rs new file mode 100644 index 0000000..9a97e0f --- /dev/null +++ b/examples/redis_streams/testing/integration/src/lib.rs @@ -0,0 +1,9 @@ +mod app; +mod compose_env; +pub mod scenario; + +pub use app::*; +pub use scenario::{RedisStreamsBuilderExt, RedisStreamsScenarioBuilder}; + +pub type RedisStreamsComposeDeployer = + testing_framework_runner_compose::ComposeDeployer; diff --git a/examples/redis_streams/testing/integration/src/scenario.rs b/examples/redis_streams/testing/integration/src/scenario.rs new file mode 100644 index 0000000..c6cc680 --- /dev/null +++ b/examples/redis_streams/testing/integration/src/scenario.rs @@ -0,0 +1,15 @@ +use testing_framework_core::scenario::ScenarioBuilder; + +use crate::{RedisStreamsEnv, RedisStreamsTopology}; + +pub type RedisStreamsScenarioBuilder = ScenarioBuilder; + +pub trait RedisStreamsBuilderExt: Sized { + fn deployment_with(f: impl FnOnce(RedisStreamsTopology) -> RedisStreamsTopology) -> Self; +} + +impl RedisStreamsBuilderExt for RedisStreamsScenarioBuilder { + fn deployment_with(f: impl FnOnce(RedisStreamsTopology) -> RedisStreamsTopology) -> Self { + RedisStreamsScenarioBuilder::with_deployment(f(RedisStreamsTopology::new(3))) + } +} diff --git a/examples/redis_streams/testing/workloads/Cargo.toml b/examples/redis_streams/testing/workloads/Cargo.toml new file mode 100644 index 0000000..5cf411d --- /dev/null +++ b/examples/redis_streams/testing/workloads/Cargo.toml @@ -0,0 +1,13 @@ +[package] +edition.workspace = true +license.workspace = true +name = "redis-streams-runtime-workloads" +version.workspace = true + +[dependencies] +redis-streams-runtime-ext = { path = "../integration" } +testing-framework-core = { workspace = true } + +async-trait = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } diff --git a/examples/redis_streams/testing/workloads/src/health.rs b/examples/redis_streams/testing/workloads/src/health.rs new file mode 100644 index 0000000..b409c98 --- /dev/null +++ b/examples/redis_streams/testing/workloads/src/health.rs @@ -0,0 +1,74 @@ +use std::time::Duration; + +use async_trait::async_trait; +use redis_streams_runtime_ext::{RedisStreamsClient, RedisStreamsEnv}; +use testing_framework_core::scenario::{DynError, Expectation, RunContext}; +use tokio::time::Instant; +use tracing::info; + +#[derive(Clone)] +pub struct RedisStreamsClusterHealthy { + timeout: Duration, + poll_interval: Duration, +} + +impl RedisStreamsClusterHealthy { + #[must_use] + pub const fn new() -> Self { + Self { + timeout: Duration::from_secs(20), + poll_interval: Duration::from_millis(500), + } + } + + #[must_use] + pub const fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +impl Default for RedisStreamsClusterHealthy { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Expectation for RedisStreamsClusterHealthy { + fn name(&self) -> &str { + "redis_streams_cluster_healthy" + } + + async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.is_empty() { + return Err("no redis streams node clients available".into()); + } + + let deadline = Instant::now() + self.timeout; + while Instant::now() < deadline { + if all_nodes_healthy(&clients).await? { + info!(nodes = clients.len(), "redis streams cluster healthy"); + return Ok(()); + } + + tokio::time::sleep(self.poll_interval).await; + } + + Err(format!( + "redis streams cluster not healthy within {:?}", + self.timeout + ) + .into()) + } +} + +async fn all_nodes_healthy(clients: &[RedisStreamsClient]) -> Result { + for client in clients { + if client.ping().await.is_err() { + return Ok(false); + } + } + Ok(true) +} diff --git a/examples/redis_streams/testing/workloads/src/lib.rs b/examples/redis_streams/testing/workloads/src/lib.rs new file mode 100644 index 0000000..35d17bc --- /dev/null +++ b/examples/redis_streams/testing/workloads/src/lib.rs @@ -0,0 +1,10 @@ +mod health; +mod reclaim_failover; +mod roundtrip; + +pub use health::RedisStreamsClusterHealthy; +pub use reclaim_failover::RedisStreamsReclaimFailoverWorkload; +pub use redis_streams_runtime_ext::{ + RedisStreamsBuilderExt, RedisStreamsEnv, RedisStreamsScenarioBuilder, RedisStreamsTopology, +}; +pub use roundtrip::RedisStreamsRoundTripWorkload; diff --git a/examples/redis_streams/testing/workloads/src/reclaim_failover.rs b/examples/redis_streams/testing/workloads/src/reclaim_failover.rs new file mode 100644 index 0000000..bb1e1c8 --- /dev/null +++ b/examples/redis_streams/testing/workloads/src/reclaim_failover.rs @@ -0,0 +1,202 @@ +use std::time::Duration; + +use async_trait::async_trait; +use redis_streams_runtime_ext::{RedisStreamsClient, RedisStreamsEnv}; +use testing_framework_core::scenario::{DynError, RunContext, Workload}; +use tokio::time::Instant; +use tracing::info; + +#[derive(Clone)] +pub struct RedisStreamsReclaimFailoverWorkload { + stream: String, + group: String, + producer_consumer: String, + failover_consumer: String, + messages: usize, + batch: usize, + timeout: Duration, +} + +impl RedisStreamsReclaimFailoverWorkload { + #[must_use] + pub fn new(stream: impl Into, group: impl Into) -> Self { + Self { + stream: stream.into(), + group: group.into(), + producer_consumer: "worker-a".to_owned(), + failover_consumer: "worker-b".to_owned(), + messages: 200, + batch: 64, + timeout: Duration::from_secs(20), + } + } + + #[must_use] + pub const fn messages(mut self, value: usize) -> Self { + self.messages = value; + self + } + + #[must_use] + pub const fn batch(mut self, value: usize) -> Self { + self.batch = value; + self + } + + #[must_use] + pub fn producer_consumer(mut self, value: impl Into) -> Self { + self.producer_consumer = value.into(); + self + } + + #[must_use] + pub fn failover_consumer(mut self, value: impl Into) -> Self { + self.failover_consumer = value.into(); + self + } + + #[must_use] + pub const fn timeout(mut self, value: Duration) -> Self { + self.timeout = value; + self + } +} + +#[async_trait] +impl Workload for RedisStreamsReclaimFailoverWorkload { + fn name(&self) -> &str { + "redis_streams_reclaim_failover_workload" + } + + async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.is_empty() { + return Err("redis streams failover workload requires at least 1 node client".into()); + } + + let driver = &clients[0]; + driver.ensure_group(&self.stream, &self.group).await?; + + info!(messages = self.messages, stream = %self.stream, group = %self.group, "redis streams failover: produce phase"); + produce_messages(driver, &self.stream, self.messages).await?; + + info!(messages = self.messages, consumer = %self.producer_consumer, "redis streams failover: create pending phase"); + let pending_count = create_pending_messages( + driver, + &self.stream, + &self.group, + &self.producer_consumer, + self.messages, + self.batch, + self.timeout, + ) + .await?; + + info!(pending = pending_count, from = %self.producer_consumer, to = %self.failover_consumer, "redis streams failover: reclaim+ack phase"); + reclaim_and_ack_pending( + driver, + &self.stream, + &self.group, + &self.failover_consumer, + pending_count, + self.batch, + self.timeout, + ) + .await?; + + let pending = driver.pending_count(&self.stream, &self.group).await?; + if pending != 0 { + return Err( + format!("redis streams pending entries remain after reclaim: {pending}").into(), + ); + } + + info!( + pending = pending_count, + "redis streams failover reclaim complete" + ); + Ok(()) + } +} + +async fn produce_messages( + client: &RedisStreamsClient, + stream: &str, + messages: usize, +) -> Result<(), DynError> { + for idx in 0..messages { + let payload = format!("msg-{idx}"); + client.append_message(stream, &payload).await?; + } + + Ok(()) +} + +async fn create_pending_messages( + client: &RedisStreamsClient, + stream: &str, + group: &str, + consumer: &str, + expected: usize, + batch: usize, + timeout: Duration, +) -> Result { + let mut claimed = 0usize; + let deadline = Instant::now() + timeout; + + while claimed < expected && Instant::now() < deadline { + let ids = client + .read_group_batch(stream, group, consumer, batch, 500) + .await?; + + if ids.is_empty() { + continue; + } + + claimed += ids.len(); + } + + if claimed == expected { + return Ok(claimed); + } + + Err(format!("redis streams pending creation timed out: claimed {claimed}/{expected}").into()) +} + +async fn reclaim_and_ack_pending( + client: &RedisStreamsClient, + stream: &str, + group: &str, + failover_consumer: &str, + expected: usize, + batch: usize, + timeout: Duration, +) -> Result<(), DynError> { + let mut reclaimed = 0usize; + let mut cursor = "0-0".to_owned(); + let deadline = Instant::now() + timeout; + + while reclaimed < expected && Instant::now() < deadline { + let (next_cursor, ids) = client + .autoclaim_batch(stream, group, failover_consumer, 0, &cursor, batch) + .await?; + + if ids.is_empty() { + if next_cursor == "0-0" { + break; + } + cursor = next_cursor; + continue; + } + + let acked = client.ack_messages(stream, group, &ids).await? as usize; + reclaimed += acked; + cursor = next_cursor; + } + + if reclaimed == expected { + return Ok(()); + } + + Err(format!("redis streams reclaim timed out: acked {reclaimed}/{expected}").into()) +} diff --git a/examples/redis_streams/testing/workloads/src/roundtrip.rs b/examples/redis_streams/testing/workloads/src/roundtrip.rs new file mode 100644 index 0000000..4b3cfd0 --- /dev/null +++ b/examples/redis_streams/testing/workloads/src/roundtrip.rs @@ -0,0 +1,131 @@ +use std::time::Duration; + +use async_trait::async_trait; +use redis_streams_runtime_ext::RedisStreamsEnv; +use testing_framework_core::scenario::{DynError, RunContext, Workload}; +use tokio::time::Instant; +use tracing::info; + +#[derive(Clone)] +pub struct RedisStreamsRoundTripWorkload { + stream: String, + group: String, + consumer: String, + messages: usize, + read_batch: usize, + timeout: Duration, +} + +impl RedisStreamsRoundTripWorkload { + #[must_use] + pub fn new(stream: impl Into, group: impl Into) -> Self { + Self { + stream: stream.into(), + group: group.into(), + consumer: "worker-1".to_owned(), + messages: 200, + read_batch: 32, + timeout: Duration::from_secs(20), + } + } + + #[must_use] + pub const fn messages(mut self, value: usize) -> Self { + self.messages = value; + self + } + + #[must_use] + pub const fn read_batch(mut self, value: usize) -> Self { + self.read_batch = value; + self + } + + #[must_use] + pub fn consumer(mut self, value: impl Into) -> Self { + self.consumer = value.into(); + self + } + + #[must_use] + pub const fn timeout(mut self, value: Duration) -> Self { + self.timeout = value; + self + } +} + +#[async_trait] +impl Workload for RedisStreamsRoundTripWorkload { + fn name(&self) -> &str { + "redis_streams_roundtrip_workload" + } + + async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.is_empty() { + return Err("redis streams workload requires at least 1 node".into()); + } + + let driver = &clients[0]; + + driver.ensure_group(&self.stream, &self.group).await?; + + info!(messages = self.messages, stream = %self.stream, group = %self.group, "redis streams produce phase"); + for idx in 0..self.messages { + let payload = format!("msg-{idx}"); + driver.append_message(&self.stream, &payload).await?; + } + + info!(messages = self.messages, stream = %self.stream, group = %self.group, "redis streams consume+ack phase"); + consume_and_ack( + driver, + &self.stream, + &self.group, + &self.consumer, + self.messages, + self.read_batch, + self.timeout, + ) + .await?; + + let pending = driver.pending_count(&self.stream, &self.group).await?; + if pending != 0 { + return Err(format!("redis streams pending entries remain: {pending}").into()); + } + + info!(messages = self.messages, stream = %self.stream, group = %self.group, "redis streams roundtrip finished"); + Ok(()) + } +} + +async fn consume_and_ack( + client: &redis_streams_runtime_ext::RedisStreamsClient, + stream: &str, + group: &str, + consumer: &str, + expected: usize, + batch: usize, + timeout: Duration, +) -> Result<(), DynError> { + let mut acked_total = 0usize; + let deadline = Instant::now() + timeout; + + while acked_total < expected && Instant::now() < deadline { + let ids = client + .read_group_batch(stream, group, consumer, batch, 500) + .await?; + + if ids.is_empty() { + continue; + } + + let acked = client.ack_messages(stream, group, &ids).await? as usize; + acked_total += acked; + } + + if acked_total == expected { + return Ok(()); + } + + Err(format!("redis streams timed out: acked {acked_total}/{expected} messages").into()) +}