mirror of
https://github.com/logos-blockchain/logos-blockchain-testing.git
synced 2026-04-11 13:43:08 +00:00
feat(examples): add specialized backend demo apps
This commit is contained in:
parent
7439f4799a
commit
d663ff4766
298
Cargo.lock
generated
298
Cargo.lock
generated
@ -95,6 +95,15 @@ version = "1.0.101"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea"
|
||||
|
||||
[[package]]
|
||||
name = "arc-swap"
|
||||
version = "1.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a3a1fd6f75306b68087b831f025c712524bcb19aad54e557b1129cfa0a2b207"
|
||||
dependencies = [
|
||||
"rustversion",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-broadcast"
|
||||
version = "0.7.2"
|
||||
@ -177,11 +186,13 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_path_to_error",
|
||||
"serde_urlencoded",
|
||||
"sync_wrapper",
|
||||
"tokio",
|
||||
"tower",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -202,6 +213,7 @@ dependencies = [
|
||||
"sync_wrapper",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -405,6 +417,20 @@ version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
|
||||
|
||||
[[package]]
|
||||
name = "combine"
|
||||
version = "4.6.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"memchr",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.5.0"
|
||||
@ -1026,7 +1052,7 @@ dependencies = [
|
||||
"libc",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"socket2",
|
||||
"socket2 0.6.2",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
@ -1436,6 +1462,15 @@ version = "0.4.29"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
|
||||
|
||||
[[package]]
|
||||
name = "matchers"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
|
||||
dependencies = [
|
||||
"regex-automata",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matchit"
|
||||
version = "0.7.3"
|
||||
@ -1448,6 +1483,71 @@ version = "2.7.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
|
||||
|
||||
[[package]]
|
||||
name = "metrics-counter-examples"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"metrics-counter-node",
|
||||
"metrics-counter-runtime-ext",
|
||||
"metrics-counter-runtime-workloads",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"testing-framework-core",
|
||||
"testing-framework-runner-compose",
|
||||
"testing-framework-runner-k8s",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "metrics-counter-node"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"axum",
|
||||
"clap",
|
||||
"prometheus",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_yaml",
|
||||
"tokio",
|
||||
"tower-http",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "metrics-counter-runtime-ext"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"metrics-counter-node",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_yaml",
|
||||
"testing-framework-core",
|
||||
"testing-framework-runner-compose",
|
||||
"testing-framework-runner-k8s",
|
||||
"testing-framework-runner-local",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "metrics-counter-runtime-workloads"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"metrics-counter-runtime-ext",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"testing-framework-core",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mime"
|
||||
version = "0.3.17"
|
||||
@ -1482,12 +1582,40 @@ dependencies = [
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nu-ansi-term"
|
||||
version = "0.50.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
|
||||
dependencies = [
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-bigint"
|
||||
version = "0.4.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
|
||||
dependencies = [
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-conv"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"
|
||||
|
||||
[[package]]
|
||||
name = "num-integer"
|
||||
version = "0.1.46"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f"
|
||||
dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-traits"
|
||||
version = "0.2.19"
|
||||
@ -1789,6 +1917,21 @@ dependencies = [
|
||||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prometheus"
|
||||
version = "0.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"fnv",
|
||||
"lazy_static",
|
||||
"memchr",
|
||||
"parking_lot",
|
||||
"protobuf",
|
||||
"thiserror 2.0.18",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "prometheus-http-query"
|
||||
version = "0.8.3"
|
||||
@ -1803,6 +1946,26 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf"
|
||||
version = "3.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"protobuf-support",
|
||||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "protobuf-support"
|
||||
version = "3.7.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
|
||||
dependencies = [
|
||||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.44"
|
||||
@ -1848,6 +2011,66 @@ dependencies = [
|
||||
"getrandom 0.2.17",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redis"
|
||||
version = "0.29.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1bc42f3a12fd4408ce64d8efef67048a924e543bd35c6591c0447fda9054695f"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"backon",
|
||||
"bytes",
|
||||
"combine",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"itoa",
|
||||
"num-bigint",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"ryu",
|
||||
"sha1_smol",
|
||||
"socket2 0.5.10",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redis-streams-examples"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"redis-streams-runtime-ext",
|
||||
"redis-streams-runtime-workloads",
|
||||
"testing-framework-core",
|
||||
"testing-framework-runner-compose",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redis-streams-runtime-ext"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"redis",
|
||||
"reqwest",
|
||||
"testing-framework-core",
|
||||
"testing-framework-runner-compose",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redis-streams-runtime-workloads"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"redis-streams-runtime-ext",
|
||||
"testing-framework-core",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.5.18"
|
||||
@ -2194,6 +2417,12 @@ dependencies = [
|
||||
"unsafe-libyaml",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha1_smol"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
|
||||
|
||||
[[package]]
|
||||
name = "sha2"
|
||||
version = "0.10.9"
|
||||
@ -2205,6 +2434,15 @@ dependencies = [
|
||||
"digest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sharded-slab"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "shlex"
|
||||
version = "1.3.0"
|
||||
@ -2249,6 +2487,16 @@ version = "1.15.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.5.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.6.2"
|
||||
@ -2465,6 +2713,15 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thread_local"
|
||||
version = "1.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "time"
|
||||
version = "0.3.47"
|
||||
@ -2514,9 +2771,10 @@ dependencies = [
|
||||
"bytes",
|
||||
"libc",
|
||||
"mio",
|
||||
"parking_lot",
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2",
|
||||
"socket2 0.6.2",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.61.2",
|
||||
]
|
||||
@ -2657,6 +2915,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
|
||||
dependencies = [
|
||||
"once_cell",
|
||||
"valuable",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-log"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
|
||||
dependencies = [
|
||||
"log",
|
||||
"once_cell",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-subscriber"
|
||||
version = "0.3.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319"
|
||||
dependencies = [
|
||||
"matchers",
|
||||
"nu-ansi-term",
|
||||
"once_cell",
|
||||
"regex-automata",
|
||||
"sharded-slab",
|
||||
"smallvec",
|
||||
"thread_local",
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"tracing-log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -2737,6 +3025,12 @@ dependencies = [
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "valuable"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
|
||||
|
||||
[[package]]
|
||||
name = "vcpkg"
|
||||
version = "0.2.15"
|
||||
|
||||
@ -4,6 +4,13 @@ members = [
|
||||
"cfgsync/artifacts",
|
||||
"cfgsync/core",
|
||||
"cfgsync/runtime",
|
||||
"examples/metrics_counter/examples",
|
||||
"examples/metrics_counter/metrics-counter-node",
|
||||
"examples/metrics_counter/testing/integration",
|
||||
"examples/metrics_counter/testing/workloads",
|
||||
"examples/redis_streams/examples",
|
||||
"examples/redis_streams/testing/integration",
|
||||
"examples/redis_streams/testing/workloads",
|
||||
"testing-framework/core",
|
||||
"testing-framework/deployers/compose",
|
||||
"testing-framework/deployers/k8s",
|
||||
|
||||
24
examples/metrics_counter/Dockerfile
Normal file
24
examples/metrics_counter/Dockerfile
Normal file
@ -0,0 +1,24 @@
|
||||
FROM rustlang/rust:nightly-bookworm AS builder
|
||||
|
||||
WORKDIR /build
|
||||
|
||||
COPY Cargo.toml Cargo.lock ./
|
||||
COPY cfgsync/ ./cfgsync/
|
||||
COPY examples/ ./examples/
|
||||
COPY testing-framework/ ./testing-framework/
|
||||
|
||||
RUN cargo build --release -p metrics-counter-node
|
||||
|
||||
FROM debian:bookworm-slim
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -y ca-certificates && \
|
||||
rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY --from=builder /build/target/release/metrics-counter-node /usr/local/bin/metrics-counter-node
|
||||
|
||||
RUN mkdir -p /etc/metrics-counter
|
||||
WORKDIR /app
|
||||
|
||||
ENTRYPOINT ["/usr/local/bin/metrics-counter-node"]
|
||||
CMD ["--config", "/etc/metrics-counter/config.yaml"]
|
||||
55
examples/metrics_counter/README.md
Normal file
55
examples/metrics_counter/README.md
Normal file
@ -0,0 +1,55 @@
|
||||
# Metrics Counter Example
|
||||
|
||||
This example runs a tiny counter service together with a Prometheus scraper.
|
||||
|
||||
The scenarios increment counters through the node API and then query Prometheus
|
||||
to check the aggregate result.
|
||||
|
||||
## How TF runs this
|
||||
|
||||
Each example follows the same pattern:
|
||||
|
||||
- TF starts the counter nodes and a Prometheus scraper
|
||||
- a workload drives counter increments through the node API
|
||||
- an expectation, or the manual scenario, checks the Prometheus query result
|
||||
|
||||
## API
|
||||
|
||||
Each node exposes:
|
||||
|
||||
- `POST /counter/inc` to increment the local counter
|
||||
- `GET /counter/value` to read the current counter value
|
||||
- `GET /metrics` for Prometheus scraping
|
||||
|
||||
## Scenarios
|
||||
|
||||
- `compose_prometheus_expectation` runs the app and Prometheus in Docker Compose, then checks the Prometheus query result
|
||||
- `k8s_prometheus_expectation` runs the same check on Kubernetes
|
||||
- `k8s_manual_prometheus` starts the nodes through the k8s manual cluster API, restarts one node, and checks the Prometheus aggregate again
|
||||
|
||||
## Run with Docker Compose
|
||||
|
||||
```bash
|
||||
LOGOS_BLOCKCHAIN_METRICS_QUERY_URL=http://127.0.0.1:19091 \
|
||||
cargo run -p metrics-counter-examples --bin compose_prometheus_expectation
|
||||
```
|
||||
|
||||
## Run with Kubernetes
|
||||
|
||||
```bash
|
||||
docker build -t metrics-counter-node:local -f examples/metrics_counter/Dockerfile .
|
||||
LOGOS_BLOCKCHAIN_METRICS_QUERY_URL=http://127.0.0.1:30991 \
|
||||
cargo run -p metrics-counter-examples --bin k8s_prometheus_expectation
|
||||
```
|
||||
|
||||
Overrides:
|
||||
- `METRICS_COUNTER_K8S_IMAGE` (falls back to `METRICS_COUNTER_IMAGE`, then `metrics-counter-node:local`)
|
||||
- `METRICS_COUNTER_K8S_PROMETHEUS_NODE_PORT` (defaults to `30991`)
|
||||
|
||||
## Run with Kubernetes manual cluster
|
||||
|
||||
```bash
|
||||
docker build -t metrics-counter-node:local -f examples/metrics_counter/Dockerfile .
|
||||
LOGOS_BLOCKCHAIN_METRICS_QUERY_URL=http://127.0.0.1:30991 \
|
||||
cargo run -p metrics-counter-examples --bin k8s_manual_prometheus
|
||||
```
|
||||
21
examples/metrics_counter/examples/Cargo.toml
Normal file
21
examples/metrics_counter/examples/Cargo.toml
Normal file
@ -0,0 +1,21 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "metrics-counter-examples"
|
||||
version.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
metrics-counter-node = { path = "../metrics-counter-node" }
|
||||
metrics-counter-runtime-ext = { path = "../testing/integration" }
|
||||
metrics-counter-runtime-workloads = { path = "../testing/workloads" }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
testing-framework-core = { workspace = true }
|
||||
testing-framework-runner-compose = { workspace = true }
|
||||
testing-framework-runner-k8s = { workspace = true }
|
||||
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
@ -0,0 +1,61 @@
|
||||
use std::{env, time::Duration};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use metrics_counter_runtime_workloads::{
|
||||
CounterIncrementWorkload, MetricsCounterBuilderExt, MetricsCounterScenarioBuilder,
|
||||
MetricsCounterTopology, PrometheusCounterAtLeast,
|
||||
};
|
||||
use testing_framework_core::scenario::{Deployer, ObservabilityBuilderExt};
|
||||
use testing_framework_runner_compose::ComposeRunnerError;
|
||||
use tracing::{info, warn};
|
||||
|
||||
const DEFAULT_PROM_URL: &str = "http://127.0.0.1:19091";
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let metrics_url = env::var("LOGOS_BLOCKCHAIN_METRICS_QUERY_URL")
|
||||
.ok()
|
||||
.filter(|value| !value.trim().is_empty())
|
||||
.unwrap_or_else(|| DEFAULT_PROM_URL.to_owned());
|
||||
|
||||
let mut scenario =
|
||||
MetricsCounterScenarioBuilder::deployment_with(|_| MetricsCounterTopology::new(3))
|
||||
.enable_observability()
|
||||
.with_metrics_query_url_str(&metrics_url)
|
||||
.with_run_duration(Duration::from_secs(20))
|
||||
.with_workload(
|
||||
CounterIncrementWorkload::new()
|
||||
.operations(300)
|
||||
.rate_per_sec(30),
|
||||
)
|
||||
.with_expectation(PrometheusCounterAtLeast::new(300.0))
|
||||
.build()?;
|
||||
|
||||
let deployer = metrics_counter_runtime_ext::MetricsCounterComposeDeployer::new();
|
||||
let runner = match deployer.deploy(&scenario).await {
|
||||
Ok(runner) => runner,
|
||||
Err(ComposeRunnerError::DockerUnavailable) => {
|
||||
warn!("docker unavailable; skipping compose metrics-counter run");
|
||||
return Ok(());
|
||||
}
|
||||
Err(error) => {
|
||||
return Err(anyhow::Error::new(error))
|
||||
.context("deploying metrics-counter compose stack");
|
||||
}
|
||||
};
|
||||
|
||||
info!(
|
||||
metrics_url,
|
||||
"running metrics-counter compose prometheus scenario"
|
||||
);
|
||||
runner
|
||||
.run(&mut scenario)
|
||||
.await
|
||||
.context("running metrics-counter compose scenario")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -0,0 +1,181 @@
|
||||
use std::{env, time::Duration};
|
||||
|
||||
use anyhow::{Context as _, Result, anyhow};
|
||||
use metrics_counter_node::MetricsCounterHttpClient;
|
||||
use metrics_counter_runtime_ext::{MetricsCounterK8sDeployer, MetricsCounterTopology};
|
||||
use reqwest::Url;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use testing_framework_runner_k8s::ManualClusterError;
|
||||
use tracing::{info, warn};
|
||||
|
||||
const DEFAULT_PROM_URL: &str = "http://127.0.0.1:30991";
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct IncrementRequest {}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct CounterView {
|
||||
value: u64,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct PrometheusQueryResponse {
|
||||
data: PrometheusData,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct PrometheusData {
|
||||
result: Vec<PrometheusSample>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct PrometheusSample {
|
||||
value: (serde_json::Value, String),
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let metrics_url = env::var("LOGOS_BLOCKCHAIN_METRICS_QUERY_URL")
|
||||
.ok()
|
||||
.filter(|value| !value.trim().is_empty())
|
||||
.unwrap_or_else(|| DEFAULT_PROM_URL.to_owned());
|
||||
|
||||
let deployer = MetricsCounterK8sDeployer::new();
|
||||
let cluster = match deployer
|
||||
.manual_cluster_from_descriptors(MetricsCounterTopology::new(3))
|
||||
.await
|
||||
{
|
||||
Ok(cluster) => cluster,
|
||||
Err(ManualClusterError::ClientInit { source }) => {
|
||||
warn!("k8s unavailable ({source}); skipping metrics-counter k8s manual run");
|
||||
return Ok(());
|
||||
}
|
||||
Err(ManualClusterError::InstallStack { source })
|
||||
if k8s_cluster_unavailable(&source.to_string()) =>
|
||||
{
|
||||
warn!("k8s unavailable ({source}); skipping metrics-counter k8s manual run");
|
||||
return Ok(());
|
||||
}
|
||||
Err(error) => {
|
||||
return Err(anyhow::Error::new(error))
|
||||
.context("creating metrics-counter k8s manual cluster");
|
||||
}
|
||||
};
|
||||
|
||||
let node0 = cluster.start_node("node-0").await?.client;
|
||||
let node1 = cluster.start_node("node-1").await?.client;
|
||||
let node2 = cluster.start_node("node-2").await?.client;
|
||||
|
||||
cluster.wait_network_ready().await?;
|
||||
|
||||
increment_many(&node0, 40).await?;
|
||||
increment_many(&node1, 30).await?;
|
||||
increment_many(&node2, 20).await?;
|
||||
|
||||
wait_for_counter_value(&node0, 40).await?;
|
||||
wait_for_counter_value(&node1, 30).await?;
|
||||
wait_for_counter_value(&node2, 20).await?;
|
||||
wait_for_prometheus_sum(&metrics_url, 90.0).await?;
|
||||
|
||||
info!("restarting node-1 in manual cluster");
|
||||
cluster.restart_node("node-1").await?;
|
||||
cluster.wait_network_ready().await?;
|
||||
|
||||
let restarted_node1 = cluster
|
||||
.node_client("node-1")
|
||||
.ok_or_else(|| anyhow!("node-1 client missing after restart"))?;
|
||||
|
||||
wait_for_counter_value(&restarted_node1, 0).await?;
|
||||
|
||||
increment_many(&node0, 10).await?;
|
||||
increment_many(&restarted_node1, 5).await?;
|
||||
|
||||
wait_for_counter_value(&node0, 50).await?;
|
||||
wait_for_counter_value(&restarted_node1, 5).await?;
|
||||
wait_for_counter_value(&node2, 20).await?;
|
||||
wait_for_prometheus_sum(&metrics_url, 75.0).await?;
|
||||
|
||||
cluster.stop_all();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn increment_many(client: &MetricsCounterHttpClient, operations: usize) -> Result<()> {
|
||||
for _ in 0..operations {
|
||||
let _: CounterView = client
|
||||
.post("/counter/inc", &IncrementRequest {})
|
||||
.await
|
||||
.map_err(|error| anyhow!(error.to_string()))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_counter_value(client: &MetricsCounterHttpClient, expected: u64) -> Result<()> {
|
||||
let deadline = tokio::time::Instant::now() + Duration::from_secs(20);
|
||||
|
||||
while tokio::time::Instant::now() < deadline {
|
||||
let view: CounterView = client
|
||||
.get("/counter/value")
|
||||
.await
|
||||
.map_err(|error| anyhow!(error.to_string()))?;
|
||||
if view.value == expected {
|
||||
return Ok(());
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(300)).await;
|
||||
}
|
||||
|
||||
Err(anyhow!("counter did not reach expected value {expected}"))
|
||||
}
|
||||
|
||||
async fn wait_for_prometheus_sum(metrics_url: &str, expected: f64) -> Result<()> {
|
||||
let base_url = Url::parse(metrics_url)?;
|
||||
let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
|
||||
|
||||
while tokio::time::Instant::now() < deadline {
|
||||
let total = query_prometheus_sum(&base_url).await?;
|
||||
if (total - expected).abs() < f64::EPSILON {
|
||||
info!(total, expected, "prometheus sum reached expected value");
|
||||
return Ok(());
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
|
||||
Err(anyhow!(
|
||||
"prometheus sum did not reach expected value {expected}"
|
||||
))
|
||||
}
|
||||
|
||||
async fn query_prometheus_sum(base_url: &Url) -> Result<f64> {
|
||||
let client = reqwest::Client::new();
|
||||
let mut url = base_url.join("/api/v1/query")?;
|
||||
url.query_pairs_mut()
|
||||
.append_pair("query", "sum(metrics_counter_value)");
|
||||
|
||||
let response: PrometheusQueryResponse = client
|
||||
.get(url)
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?
|
||||
.json()
|
||||
.await?;
|
||||
|
||||
let Some(sample) = response.data.result.first() else {
|
||||
return Ok(0.0);
|
||||
};
|
||||
|
||||
sample
|
||||
.value
|
||||
.1
|
||||
.parse()
|
||||
.map_err(|error| anyhow!("invalid prometheus value: {error}"))
|
||||
}
|
||||
|
||||
fn k8s_cluster_unavailable(message: &str) -> bool {
|
||||
message.contains("Unable to connect to the server")
|
||||
|| message.contains("TLS handshake timeout")
|
||||
|| message.contains("connection refused")
|
||||
}
|
||||
@ -0,0 +1,73 @@
|
||||
use std::{env, time::Duration};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use metrics_counter_runtime_ext::MetricsCounterK8sDeployer;
|
||||
use metrics_counter_runtime_workloads::{
|
||||
CounterIncrementWorkload, MetricsCounterBuilderExt, MetricsCounterScenarioBuilder,
|
||||
MetricsCounterTopology, PrometheusCounterAtLeast,
|
||||
};
|
||||
use testing_framework_core::scenario::{Deployer, ObservabilityBuilderExt};
|
||||
use testing_framework_runner_k8s::K8sRunnerError;
|
||||
use tracing::{info, warn};
|
||||
|
||||
const DEFAULT_PROM_URL: &str = "http://127.0.0.1:30991";
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let metrics_url = env::var("LOGOS_BLOCKCHAIN_METRICS_QUERY_URL")
|
||||
.ok()
|
||||
.filter(|value| !value.trim().is_empty())
|
||||
.unwrap_or_else(|| DEFAULT_PROM_URL.to_owned());
|
||||
|
||||
let mut scenario =
|
||||
MetricsCounterScenarioBuilder::deployment_with(|_| MetricsCounterTopology::new(3))
|
||||
.enable_observability()
|
||||
.with_metrics_query_url_str(&metrics_url)
|
||||
.with_run_duration(Duration::from_secs(25))
|
||||
.with_workload(
|
||||
CounterIncrementWorkload::new()
|
||||
.operations(240)
|
||||
.rate_per_sec(20),
|
||||
)
|
||||
.with_expectation(PrometheusCounterAtLeast::new(240.0))
|
||||
.build()?;
|
||||
|
||||
let deployer = MetricsCounterK8sDeployer::new();
|
||||
let runner = match deployer.deploy(&scenario).await {
|
||||
Ok(runner) => runner,
|
||||
Err(K8sRunnerError::ClientInit { source }) => {
|
||||
warn!("k8s unavailable ({source}); skipping metrics-counter k8s run");
|
||||
return Ok(());
|
||||
}
|
||||
Err(K8sRunnerError::InstallStack { source })
|
||||
if k8s_cluster_unavailable(&source.to_string()) =>
|
||||
{
|
||||
warn!("k8s unavailable ({source}); skipping metrics-counter k8s run");
|
||||
return Ok(());
|
||||
}
|
||||
Err(error) => {
|
||||
return Err(anyhow::Error::new(error)).context("deploying metrics-counter k8s stack");
|
||||
}
|
||||
};
|
||||
|
||||
info!(
|
||||
metrics_url,
|
||||
"running metrics-counter k8s prometheus scenario"
|
||||
);
|
||||
runner
|
||||
.run(&mut scenario)
|
||||
.await
|
||||
.context("running metrics-counter k8s scenario")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn k8s_cluster_unavailable(message: &str) -> bool {
|
||||
message.contains("Unable to connect to the server")
|
||||
|| message.contains("TLS handshake timeout")
|
||||
|| message.contains("connection refused")
|
||||
}
|
||||
22
examples/metrics_counter/metrics-counter-node/Cargo.toml
Normal file
22
examples/metrics_counter/metrics-counter-node/Cargo.toml
Normal file
@ -0,0 +1,22 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "metrics-counter-node"
|
||||
version.workspace = true
|
||||
|
||||
[[bin]]
|
||||
name = "metrics-counter-node"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
axum = "0.7"
|
||||
clap = { version = "4.0", features = ["derive"] }
|
||||
prometheus = "0.14"
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde = { workspace = true }
|
||||
serde_yaml = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
tower-http = { version = "0.6", features = ["trace"] }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
40
examples/metrics_counter/metrics-counter-node/src/client.rs
Normal file
40
examples/metrics_counter/metrics-counter-node/src/client.rs
Normal file
@ -0,0 +1,40 @@
|
||||
use reqwest::Url;
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MetricsCounterHttpClient {
|
||||
base_url: Url,
|
||||
client: reqwest::Client,
|
||||
}
|
||||
|
||||
impl MetricsCounterHttpClient {
|
||||
#[must_use]
|
||||
pub fn new(base_url: Url) -> Self {
|
||||
Self {
|
||||
base_url,
|
||||
client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> anyhow::Result<T> {
|
||||
let url = self.base_url.join(path)?;
|
||||
let response = self.client.get(url).send().await?.error_for_status()?;
|
||||
Ok(response.json().await?)
|
||||
}
|
||||
|
||||
pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
|
||||
&self,
|
||||
path: &str,
|
||||
body: &B,
|
||||
) -> anyhow::Result<T> {
|
||||
let url = self.base_url.join(path)?;
|
||||
let response = self
|
||||
.client
|
||||
.post(url)
|
||||
.json(body)
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?;
|
||||
Ok(response.json().await?)
|
||||
}
|
||||
}
|
||||
14
examples/metrics_counter/metrics-counter-node/src/config.rs
Normal file
14
examples/metrics_counter/metrics-counter-node/src/config.rs
Normal file
@ -0,0 +1,14 @@
|
||||
use clap::Parser;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct CounterConfig {
|
||||
pub node_id: u64,
|
||||
pub http_port: u16,
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
pub struct Cli {
|
||||
#[arg(long)]
|
||||
pub config: std::path::PathBuf,
|
||||
}
|
||||
3
examples/metrics_counter/metrics-counter-node/src/lib.rs
Normal file
3
examples/metrics_counter/metrics-counter-node/src/lib.rs
Normal file
@ -0,0 +1,3 @@
|
||||
pub mod client;
|
||||
|
||||
pub use client::MetricsCounterHttpClient;
|
||||
19
examples/metrics_counter/metrics-counter-node/src/main.rs
Normal file
19
examples/metrics_counter/metrics-counter-node/src/main.rs
Normal file
@ -0,0 +1,19 @@
|
||||
mod config;
|
||||
mod server;
|
||||
mod state;
|
||||
|
||||
use clap::Parser;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let cli = config::Cli::parse();
|
||||
let raw = std::fs::read_to_string(cli.config)?;
|
||||
let config: config::CounterConfig = serde_yaml::from_str(&raw)?;
|
||||
let state = state::CounterState::new();
|
||||
|
||||
server::start_server(config, state).await
|
||||
}
|
||||
83
examples/metrics_counter/metrics-counter-node/src/server.rs
Normal file
83
examples/metrics_counter/metrics-counter-node/src/server.rs
Normal file
@ -0,0 +1,83 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use axum::{
|
||||
Router,
|
||||
extract::State,
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Json, Response},
|
||||
routing::{get, post},
|
||||
};
|
||||
use prometheus::{Encoder, TextEncoder};
|
||||
use serde::Serialize;
|
||||
use tower_http::trace::TraceLayer;
|
||||
|
||||
use crate::{
|
||||
config::CounterConfig,
|
||||
state::{CounterState, CounterView},
|
||||
};
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct HealthResponse {
|
||||
status: &'static str,
|
||||
}
|
||||
|
||||
pub async fn start_server(config: CounterConfig, state: CounterState) -> anyhow::Result<()> {
|
||||
let app = Router::new()
|
||||
.route("/health/live", get(health_live))
|
||||
.route("/health/ready", get(health_ready))
|
||||
.route("/counter/inc", post(increment))
|
||||
.route("/counter/value", get(counter_value))
|
||||
.route("/metrics", get(metrics))
|
||||
.layer(TraceLayer::new_for_http())
|
||||
.with_state(state.clone());
|
||||
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], config.http_port));
|
||||
let listener = tokio::net::TcpListener::bind(addr).await?;
|
||||
|
||||
state.set_ready(true).await;
|
||||
tracing::info!(node_id = config.node_id, %addr, "metrics-counter node ready");
|
||||
|
||||
axum::serve(listener, app).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn health_live() -> (StatusCode, Json<HealthResponse>) {
|
||||
(StatusCode::OK, Json(HealthResponse { status: "alive" }))
|
||||
}
|
||||
|
||||
async fn health_ready(State(state): State<CounterState>) -> (StatusCode, Json<HealthResponse>) {
|
||||
if state.is_ready().await {
|
||||
(StatusCode::OK, Json(HealthResponse { status: "ready" }))
|
||||
} else {
|
||||
(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
Json(HealthResponse {
|
||||
status: "not-ready",
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async fn increment(State(state): State<CounterState>) -> Json<CounterView> {
|
||||
Json(state.increment())
|
||||
}
|
||||
|
||||
async fn counter_value(State(state): State<CounterState>) -> Json<CounterView> {
|
||||
Json(state.view())
|
||||
}
|
||||
|
||||
async fn metrics() -> Response {
|
||||
let metric_families = prometheus::gather();
|
||||
let mut buffer = Vec::new();
|
||||
let encoder = TextEncoder::new();
|
||||
if encoder.encode(&metric_families, &mut buffer).is_err() {
|
||||
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
|
||||
}
|
||||
|
||||
(
|
||||
StatusCode::OK,
|
||||
[("content-type", encoder.format_type().to_string())],
|
||||
buffer,
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
64
examples/metrics_counter/metrics-counter-node/src/state.rs
Normal file
64
examples/metrics_counter/metrics-counter-node/src/state.rs
Normal file
@ -0,0 +1,64 @@
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicU64, Ordering},
|
||||
};
|
||||
|
||||
use prometheus::{IntCounter, IntGauge};
|
||||
use serde::Serialize;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct CounterView {
|
||||
pub value: u64,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CounterState {
|
||||
ready: Arc<RwLock<bool>>,
|
||||
value: Arc<AtomicU64>,
|
||||
increments_total: IntCounter,
|
||||
counter_value: IntGauge,
|
||||
}
|
||||
|
||||
impl CounterState {
|
||||
pub fn new() -> Self {
|
||||
let increments_total = IntCounter::new(
|
||||
"metrics_counter_increments_total",
|
||||
"Total increment operations",
|
||||
)
|
||||
.expect("metrics_counter_increments_total");
|
||||
let counter_value =
|
||||
IntGauge::new("metrics_counter_value", "Current counter value").expect("counter gauge");
|
||||
|
||||
let _ = prometheus::register(Box::new(increments_total.clone()));
|
||||
let _ = prometheus::register(Box::new(counter_value.clone()));
|
||||
|
||||
Self {
|
||||
ready: Arc::new(RwLock::new(false)),
|
||||
value: Arc::new(AtomicU64::new(0)),
|
||||
increments_total,
|
||||
counter_value,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn set_ready(&self, value: bool) {
|
||||
*self.ready.write().await = value;
|
||||
}
|
||||
|
||||
pub async fn is_ready(&self) -> bool {
|
||||
*self.ready.read().await
|
||||
}
|
||||
|
||||
pub fn increment(&self) -> CounterView {
|
||||
let value = self.value.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
self.increments_total.inc();
|
||||
self.counter_value.set(value as i64);
|
||||
CounterView { value }
|
||||
}
|
||||
|
||||
pub fn view(&self) -> CounterView {
|
||||
CounterView {
|
||||
value: self.value.load(Ordering::SeqCst),
|
||||
}
|
||||
}
|
||||
}
|
||||
18
examples/metrics_counter/testing/integration/Cargo.toml
Normal file
18
examples/metrics_counter/testing/integration/Cargo.toml
Normal file
@ -0,0 +1,18 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "metrics-counter-runtime-ext"
|
||||
version.workspace = true
|
||||
|
||||
[dependencies]
|
||||
testing-framework-core = { workspace = true }
|
||||
testing-framework-runner-compose = { workspace = true }
|
||||
testing-framework-runner-k8s = { workspace = true }
|
||||
testing-framework-runner-local = { workspace = true }
|
||||
|
||||
anyhow = "1.0"
|
||||
async-trait = { workspace = true }
|
||||
metrics-counter-node = { path = "../../metrics-counter-node" }
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde = { workspace = true }
|
||||
serde_yaml = { workspace = true }
|
||||
49
examples/metrics_counter/testing/integration/src/app.rs
Normal file
49
examples/metrics_counter/testing/integration/src/app.rs
Normal file
@ -0,0 +1,49 @@
|
||||
use async_trait::async_trait;
|
||||
use metrics_counter_node::MetricsCounterHttpClient;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use testing_framework_core::{
|
||||
cfgsync::{StaticNodeConfigProvider, serialize_yaml_config},
|
||||
scenario::{Application, DynError, NodeAccess},
|
||||
};
|
||||
|
||||
pub type MetricsCounterTopology = testing_framework_core::topology::ClusterTopology;
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct MetricsCounterNodeConfig {
|
||||
pub node_id: u64,
|
||||
pub http_port: u16,
|
||||
}
|
||||
|
||||
pub struct MetricsCounterEnv;
|
||||
|
||||
#[async_trait]
|
||||
impl Application for MetricsCounterEnv {
|
||||
type Deployment = MetricsCounterTopology;
|
||||
type NodeClient = MetricsCounterHttpClient;
|
||||
type NodeConfig = MetricsCounterNodeConfig;
|
||||
fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
|
||||
Ok(MetricsCounterHttpClient::new(access.api_base_url()?))
|
||||
}
|
||||
|
||||
fn node_readiness_path() -> &'static str {
|
||||
"/health/ready"
|
||||
}
|
||||
}
|
||||
|
||||
impl StaticNodeConfigProvider for MetricsCounterEnv {
|
||||
type Error = serde_yaml::Error;
|
||||
|
||||
fn build_node_config(
|
||||
_deployment: &Self::Deployment,
|
||||
node_index: usize,
|
||||
) -> Result<Self::NodeConfig, Self::Error> {
|
||||
Ok(MetricsCounterNodeConfig {
|
||||
node_id: node_index as u64,
|
||||
http_port: 8080,
|
||||
})
|
||||
}
|
||||
|
||||
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error> {
|
||||
serialize_yaml_config(config)
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,94 @@
|
||||
use std::{env, fs, path::Path};
|
||||
|
||||
use reqwest::Url;
|
||||
use testing_framework_core::scenario::DynError;
|
||||
use testing_framework_runner_compose::{
|
||||
BinaryConfigNodeSpec, ComposeBinaryApp, EnvEntry, NodeDescriptor,
|
||||
};
|
||||
|
||||
use crate::MetricsCounterEnv;
|
||||
|
||||
const NODE_CONFIG_PATH: &str = "/etc/metrics-counter/config.yaml";
|
||||
const PROM_CONFIG_MOUNT_PATH: &str = "/etc/prometheus/prometheus.yml:ro";
|
||||
const PROMETHEUS_SERVICE_NAME: &str = "prometheus";
|
||||
const PROMETHEUS_CONTAINER_PORT: u16 = 9090;
|
||||
const DEFAULT_PROMETHEUS_QUERY_PORT: u16 = 19091;
|
||||
|
||||
impl ComposeBinaryApp for MetricsCounterEnv {
|
||||
fn compose_node_spec() -> BinaryConfigNodeSpec {
|
||||
BinaryConfigNodeSpec::conventional(
|
||||
"/usr/local/bin/metrics-counter-node",
|
||||
NODE_CONFIG_PATH,
|
||||
vec![8080, 8081],
|
||||
)
|
||||
}
|
||||
|
||||
fn prepare_compose_workspace(
|
||||
path: &Path,
|
||||
topology: &<Self as testing_framework_core::scenario::Application>::Deployment,
|
||||
_metrics_otlp_ingest_url: Option<&Url>,
|
||||
) -> Result<(), DynError> {
|
||||
if prometheus_query_port().is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let stack_dir = path
|
||||
.parent()
|
||||
.ok_or_else(|| anyhow::anyhow!("cfgsync path has no parent"))?;
|
||||
let configs_dir = stack_dir.join("configs");
|
||||
fs::create_dir_all(&configs_dir)?;
|
||||
fs::write(
|
||||
configs_dir.join("prometheus.yml"),
|
||||
render_prometheus_config(topology),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn compose_extra_services(
|
||||
_topology: &<Self as testing_framework_core::scenario::Application>::Deployment,
|
||||
) -> Result<Vec<NodeDescriptor>, DynError> {
|
||||
let Some(prom_port) = prometheus_query_port() else {
|
||||
return Ok(Vec::new());
|
||||
};
|
||||
|
||||
Ok(vec![NodeDescriptor::new(
|
||||
PROMETHEUS_SERVICE_NAME,
|
||||
"prom/prometheus:v2.54.1",
|
||||
vec![
|
||||
"/bin/prometheus".to_owned(),
|
||||
"--config.file=/etc/prometheus/prometheus.yml".to_owned(),
|
||||
"--web.listen-address=0.0.0.0:9090".to_owned(),
|
||||
],
|
||||
vec![prometheus_config_volume()],
|
||||
vec![],
|
||||
vec![format!("127.0.0.1:{prom_port}:{PROMETHEUS_CONTAINER_PORT}")],
|
||||
vec![PROMETHEUS_CONTAINER_PORT],
|
||||
vec![EnvEntry::new("TZ", "UTC")],
|
||||
None,
|
||||
)])
|
||||
}
|
||||
}
|
||||
|
||||
fn prometheus_config_volume() -> String {
|
||||
format!("./stack/configs/prometheus.yml:{PROM_CONFIG_MOUNT_PATH}")
|
||||
}
|
||||
|
||||
fn prometheus_query_port() -> Option<u16> {
|
||||
let raw = env::var("LOGOS_BLOCKCHAIN_METRICS_QUERY_URL")
|
||||
.unwrap_or_else(|_| format!("http://127.0.0.1:{DEFAULT_PROMETHEUS_QUERY_PORT}"));
|
||||
let parsed = Url::parse(raw.trim()).ok()?;
|
||||
parsed
|
||||
.port_or_known_default()
|
||||
.or(Some(DEFAULT_PROMETHEUS_QUERY_PORT))
|
||||
}
|
||||
|
||||
fn render_prometheus_config(topology: &crate::MetricsCounterTopology) -> String {
|
||||
let targets = (0..topology.node_count)
|
||||
.map(|index| format!("\"node-{index}:8080\""))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
format!(
|
||||
"global:\n scrape_interval: 1s\nscrape_configs:\n - job_name: metrics_counter\n metrics_path: /metrics\n static_configs:\n - targets: [{targets}]\n"
|
||||
)
|
||||
}
|
||||
183
examples/metrics_counter/testing/integration/src/k8s_env.rs
Normal file
183
examples/metrics_counter/testing/integration/src/k8s_env.rs
Normal file
@ -0,0 +1,183 @@
|
||||
use std::{collections::BTreeMap, env};
|
||||
|
||||
use testing_framework_core::scenario::DynError;
|
||||
use testing_framework_runner_k8s::{
|
||||
BinaryConfigK8sSpec, HelmManifest, K8sBinaryApp,
|
||||
k8s_openapi::{
|
||||
api::{
|
||||
apps::v1::{Deployment, DeploymentSpec},
|
||||
core::v1::{
|
||||
ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, PodSpec,
|
||||
PodTemplateSpec, Service, ServicePort, ServiceSpec, Volume, VolumeMount,
|
||||
},
|
||||
},
|
||||
apimachinery::pkg::{
|
||||
apis::meta::v1::{LabelSelector, ObjectMeta},
|
||||
util::intstr::IntOrString,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
use crate::MetricsCounterEnv;
|
||||
|
||||
const CHART_NAME: &str = "metrics-counter";
|
||||
const NODE_NAME_PREFIX: &str = "metrics-counter-node";
|
||||
const NODE_CONFIG_PATH: &str = "/etc/metrics-counter/config.yaml";
|
||||
const CONTAINER_HTTP_PORT: u16 = 8080;
|
||||
const SERVICE_TESTING_PORT: u16 = 8081;
|
||||
const PROMETHEUS_SERVICE_NAME: &str = "metrics-counter-prometheus";
|
||||
const PROMETHEUS_CONTAINER_PORT: u16 = 9090;
|
||||
const DEFAULT_PROMETHEUS_NODE_PORT: u16 = 30991;
|
||||
|
||||
impl K8sBinaryApp for MetricsCounterEnv {
|
||||
fn k8s_binary_spec() -> BinaryConfigK8sSpec {
|
||||
BinaryConfigK8sSpec::conventional(
|
||||
CHART_NAME,
|
||||
NODE_NAME_PREFIX,
|
||||
"/usr/local/bin/metrics-counter-node",
|
||||
NODE_CONFIG_PATH,
|
||||
CONTAINER_HTTP_PORT,
|
||||
SERVICE_TESTING_PORT,
|
||||
)
|
||||
}
|
||||
|
||||
fn extend_k8s_manifest(
|
||||
topology: &Self::Deployment,
|
||||
manifest: &mut HelmManifest,
|
||||
) -> Result<(), DynError> {
|
||||
manifest.extend(render_prometheus_assets(topology)?);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn render_prometheus_assets(
|
||||
topology: &crate::MetricsCounterTopology,
|
||||
) -> Result<HelmManifest, DynError> {
|
||||
let mut manifest = HelmManifest::new();
|
||||
manifest.push_yaml(&prometheus_config_map(topology))?;
|
||||
manifest.push_yaml(&prometheus_deployment())?;
|
||||
manifest.push_yaml(&prometheus_service())?;
|
||||
Ok(manifest)
|
||||
}
|
||||
|
||||
fn prometheus_config_map(topology: &crate::MetricsCounterTopology) -> ConfigMap {
|
||||
let mut data = BTreeMap::new();
|
||||
data.insert(
|
||||
"prometheus.yml".to_owned(),
|
||||
render_prometheus_config(topology),
|
||||
);
|
||||
|
||||
ConfigMap {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(prometheus_config_name()),
|
||||
..Default::default()
|
||||
},
|
||||
data: Some(data),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn prometheus_deployment() -> Deployment {
|
||||
let labels = prometheus_labels();
|
||||
|
||||
Deployment {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(PROMETHEUS_SERVICE_NAME.to_owned()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(DeploymentSpec {
|
||||
replicas: Some(1),
|
||||
selector: LabelSelector {
|
||||
match_labels: Some(labels.clone()),
|
||||
..Default::default()
|
||||
},
|
||||
template: PodTemplateSpec {
|
||||
metadata: Some(ObjectMeta {
|
||||
labels: Some(labels),
|
||||
..Default::default()
|
||||
}),
|
||||
spec: Some(PodSpec {
|
||||
containers: vec![Container {
|
||||
name: "prometheus".to_owned(),
|
||||
image: Some("prom/prometheus:v2.54.1".to_owned()),
|
||||
args: Some(vec![
|
||||
"--config.file=/etc/prometheus/prometheus.yml".to_owned(),
|
||||
format!("--web.listen-address=0.0.0.0:{PROMETHEUS_CONTAINER_PORT}"),
|
||||
]),
|
||||
ports: Some(vec![ContainerPort {
|
||||
container_port: i32::from(PROMETHEUS_CONTAINER_PORT),
|
||||
..Default::default()
|
||||
}]),
|
||||
volume_mounts: Some(vec![VolumeMount {
|
||||
name: "config".to_owned(),
|
||||
mount_path: "/etc/prometheus/prometheus.yml".to_owned(),
|
||||
sub_path: Some("prometheus.yml".to_owned()),
|
||||
..Default::default()
|
||||
}]),
|
||||
..Default::default()
|
||||
}],
|
||||
volumes: Some(vec![Volume {
|
||||
name: "config".to_owned(),
|
||||
config_map: Some(ConfigMapVolumeSource {
|
||||
name: prometheus_config_name(),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
}]),
|
||||
..Default::default()
|
||||
}),
|
||||
},
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn prometheus_service() -> Service {
|
||||
Service {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(PROMETHEUS_SERVICE_NAME.to_owned()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(ServiceSpec {
|
||||
selector: Some(prometheus_labels()),
|
||||
type_: Some("NodePort".to_owned()),
|
||||
ports: Some(vec![ServicePort {
|
||||
name: Some("http".to_owned()),
|
||||
port: i32::from(PROMETHEUS_CONTAINER_PORT),
|
||||
target_port: Some(IntOrString::Int(i32::from(PROMETHEUS_CONTAINER_PORT))),
|
||||
node_port: Some(i32::from(prometheus_query_port())),
|
||||
protocol: Some("TCP".to_owned()),
|
||||
..Default::default()
|
||||
}]),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn render_prometheus_config(topology: &crate::MetricsCounterTopology) -> String {
|
||||
let targets = (0..topology.node_count)
|
||||
.map(|index| format!("\"{NODE_NAME_PREFIX}-{index}:{CONTAINER_HTTP_PORT}\""))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
|
||||
format!(
|
||||
"global:\n scrape_interval: 1s\nscrape_configs:\n - job_name: metrics_counter\n metrics_path: /metrics\n static_configs:\n - targets: [{targets}]\n"
|
||||
)
|
||||
}
|
||||
|
||||
fn prometheus_query_port() -> u16 {
|
||||
env::var("METRICS_COUNTER_K8S_PROMETHEUS_NODE_PORT")
|
||||
.ok()
|
||||
.and_then(|value| value.trim().parse().ok())
|
||||
.unwrap_or(DEFAULT_PROMETHEUS_NODE_PORT)
|
||||
}
|
||||
|
||||
fn prometheus_labels() -> BTreeMap<String, String> {
|
||||
BTreeMap::from([("app".to_owned(), PROMETHEUS_SERVICE_NAME.to_owned())])
|
||||
}
|
||||
|
||||
fn prometheus_config_name() -> String {
|
||||
format!("{PROMETHEUS_SERVICE_NAME}-config")
|
||||
}
|
||||
12
examples/metrics_counter/testing/integration/src/lib.rs
Normal file
12
examples/metrics_counter/testing/integration/src/lib.rs
Normal file
@ -0,0 +1,12 @@
|
||||
mod app;
|
||||
mod compose_env;
|
||||
mod k8s_env;
|
||||
mod local_env;
|
||||
pub mod scenario;
|
||||
|
||||
pub use app::*;
|
||||
pub use scenario::{MetricsCounterBuilderExt, MetricsCounterScenarioBuilder};
|
||||
|
||||
pub type MetricsCounterComposeDeployer =
|
||||
testing_framework_runner_compose::ComposeDeployer<MetricsCounterEnv>;
|
||||
pub type MetricsCounterK8sDeployer = testing_framework_runner_k8s::K8sDeployer<MetricsCounterEnv>;
|
||||
@ -0,0 +1,44 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use testing_framework_core::scenario::{DynError, StartNodeOptions};
|
||||
use testing_framework_runner_local::{
|
||||
LocalBinaryApp, LocalNodePorts, LocalPeerNode, LocalProcessSpec, yaml_node_config,
|
||||
};
|
||||
|
||||
use crate::{MetricsCounterEnv, MetricsCounterNodeConfig};
|
||||
|
||||
impl LocalBinaryApp for MetricsCounterEnv {
|
||||
fn initial_node_name_prefix() -> &'static str {
|
||||
"metrics-counter-node"
|
||||
}
|
||||
|
||||
fn build_local_node_config_with_peers(
|
||||
_topology: &Self::Deployment,
|
||||
index: usize,
|
||||
ports: &LocalNodePorts,
|
||||
_peers: &[LocalPeerNode],
|
||||
_peer_ports_by_name: &HashMap<String, u16>,
|
||||
_options: &StartNodeOptions<Self>,
|
||||
_template_config: Option<
|
||||
&<Self as testing_framework_core::scenario::Application>::NodeConfig,
|
||||
>,
|
||||
) -> Result<<Self as testing_framework_core::scenario::Application>::NodeConfig, DynError> {
|
||||
Ok(MetricsCounterNodeConfig {
|
||||
node_id: index as u64,
|
||||
http_port: ports.network_port(),
|
||||
})
|
||||
}
|
||||
|
||||
fn local_process_spec() -> LocalProcessSpec {
|
||||
LocalProcessSpec::new("METRICS_COUNTER_NODE_BIN", "metrics-counter-node")
|
||||
.with_rust_log("metrics_counter_node=info")
|
||||
}
|
||||
|
||||
fn render_local_config(config: &MetricsCounterNodeConfig) -> Result<Vec<u8>, DynError> {
|
||||
yaml_node_config(config)
|
||||
}
|
||||
|
||||
fn http_api_port(config: &MetricsCounterNodeConfig) -> u16 {
|
||||
config.http_port
|
||||
}
|
||||
}
|
||||
15
examples/metrics_counter/testing/integration/src/scenario.rs
Normal file
15
examples/metrics_counter/testing/integration/src/scenario.rs
Normal file
@ -0,0 +1,15 @@
|
||||
use testing_framework_core::scenario::ScenarioBuilder;
|
||||
|
||||
use crate::{MetricsCounterEnv, MetricsCounterTopology};
|
||||
|
||||
pub type MetricsCounterScenarioBuilder = ScenarioBuilder<MetricsCounterEnv>;
|
||||
|
||||
pub trait MetricsCounterBuilderExt: Sized {
|
||||
fn deployment_with(f: impl FnOnce(MetricsCounterTopology) -> MetricsCounterTopology) -> Self;
|
||||
}
|
||||
|
||||
impl MetricsCounterBuilderExt for MetricsCounterScenarioBuilder {
|
||||
fn deployment_with(f: impl FnOnce(MetricsCounterTopology) -> MetricsCounterTopology) -> Self {
|
||||
MetricsCounterScenarioBuilder::with_deployment(f(MetricsCounterTopology::new(3)))
|
||||
}
|
||||
}
|
||||
15
examples/metrics_counter/testing/workloads/Cargo.toml
Normal file
15
examples/metrics_counter/testing/workloads/Cargo.toml
Normal file
@ -0,0 +1,15 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "metrics-counter-runtime-workloads"
|
||||
version.workspace = true
|
||||
|
||||
[dependencies]
|
||||
metrics-counter-runtime-ext = { path = "../integration" }
|
||||
testing-framework-core = { workspace = true }
|
||||
|
||||
async-trait = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
tracing = { workspace = true }
|
||||
@ -0,0 +1,50 @@
|
||||
use async_trait::async_trait;
|
||||
use metrics_counter_runtime_ext::MetricsCounterEnv;
|
||||
use testing_framework_core::scenario::{DynError, Expectation, RunContext};
|
||||
use tracing::info;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PrometheusCounterAtLeast {
|
||||
min_total: f64,
|
||||
}
|
||||
|
||||
impl PrometheusCounterAtLeast {
|
||||
#[must_use]
|
||||
pub const fn new(min_total: f64) -> Self {
|
||||
Self { min_total }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Expectation<MetricsCounterEnv> for PrometheusCounterAtLeast {
|
||||
fn name(&self) -> &str {
|
||||
"prometheus_counter_at_least"
|
||||
}
|
||||
|
||||
async fn evaluate(&mut self, ctx: &RunContext<MetricsCounterEnv>) -> Result<(), DynError> {
|
||||
if !ctx.telemetry().is_configured() {
|
||||
return Err(
|
||||
"prometheus endpoint unavailable; set LOGOS_BLOCKCHAIN_METRICS_QUERY_URL".into(),
|
||||
);
|
||||
}
|
||||
|
||||
let total = ctx
|
||||
.telemetry()
|
||||
.counter_value("sum(metrics_counter_increments_total)")?;
|
||||
|
||||
if total < self.min_total {
|
||||
return Err(format!(
|
||||
"metrics_counter_increments_total below threshold: total={total}, min={}",
|
||||
self.min_total
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
info!(
|
||||
total,
|
||||
min = self.min_total,
|
||||
"prometheus counter threshold met"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
83
examples/metrics_counter/testing/workloads/src/increment.rs
Normal file
83
examples/metrics_counter/testing/workloads/src/increment.rs
Normal file
@ -0,0 +1,83 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use metrics_counter_runtime_ext::MetricsCounterEnv;
|
||||
use serde::Serialize;
|
||||
use testing_framework_core::scenario::{DynError, RunContext, Workload};
|
||||
use tracing::info;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CounterIncrementWorkload {
|
||||
operations: usize,
|
||||
rate_per_sec: Option<usize>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct IncrementRequest {}
|
||||
|
||||
impl CounterIncrementWorkload {
|
||||
#[must_use]
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
operations: 200,
|
||||
rate_per_sec: Some(25),
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub const fn operations(mut self, value: usize) -> Self {
|
||||
self.operations = value;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub const fn rate_per_sec(mut self, value: usize) -> Self {
|
||||
self.rate_per_sec = Some(value);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CounterIncrementWorkload {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Workload<MetricsCounterEnv> for CounterIncrementWorkload {
|
||||
fn name(&self) -> &str {
|
||||
"counter_increment_workload"
|
||||
}
|
||||
|
||||
async fn start(&self, ctx: &RunContext<MetricsCounterEnv>) -> Result<(), DynError> {
|
||||
let clients = ctx.node_clients().snapshot();
|
||||
if clients.is_empty() {
|
||||
return Err("no metrics-counter node clients available".into());
|
||||
}
|
||||
|
||||
let interval = self.rate_per_sec.and_then(compute_interval);
|
||||
info!(operations = self.operations, rate_per_sec = ?self.rate_per_sec, "starting counter increment workload");
|
||||
|
||||
for idx in 0..self.operations {
|
||||
let client = &clients[idx % clients.len()];
|
||||
let _: serde_json::Value = client.post("/counter/inc", &IncrementRequest {}).await?;
|
||||
|
||||
if (idx + 1) % 50 == 0 {
|
||||
info!(completed = idx + 1, "counter increment progress");
|
||||
}
|
||||
|
||||
if let Some(delay) = interval {
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn compute_interval(rate_per_sec: usize) -> Option<Duration> {
|
||||
if rate_per_sec == 0 {
|
||||
return None;
|
||||
}
|
||||
Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1)))
|
||||
}
|
||||
9
examples/metrics_counter/testing/workloads/src/lib.rs
Normal file
9
examples/metrics_counter/testing/workloads/src/lib.rs
Normal file
@ -0,0 +1,9 @@
|
||||
mod expectations;
|
||||
mod increment;
|
||||
|
||||
pub use expectations::PrometheusCounterAtLeast;
|
||||
pub use increment::CounterIncrementWorkload;
|
||||
pub use metrics_counter_runtime_ext::{
|
||||
MetricsCounterBuilderExt, MetricsCounterEnv, MetricsCounterScenarioBuilder,
|
||||
MetricsCounterTopology,
|
||||
};
|
||||
35
examples/redis_streams/README.md
Normal file
35
examples/redis_streams/README.md
Normal file
@ -0,0 +1,35 @@
|
||||
# Redis Streams Example
|
||||
|
||||
This example uses Redis Streams consumer groups.
|
||||
|
||||
There are two scenarios. One produces messages, consumes them, acknowledges
|
||||
them, and checks that the pending queue drains to zero. The other leaves
|
||||
messages pending on one consumer and has a second consumer reclaim them with
|
||||
`XAUTOCLAIM`.
|
||||
|
||||
This example is compose-only.
|
||||
|
||||
## How TF runs this
|
||||
|
||||
Each example follows the same pattern:
|
||||
|
||||
- TF starts Redis in Docker Compose
|
||||
- a workload drives stream and consumer-group operations through the Redis client
|
||||
- an expectation checks health and pending-message state
|
||||
|
||||
## Scenarios
|
||||
|
||||
- `compose_roundtrip` covers stream setup, production, consumer-group reads, acknowledgements, and pending drain
|
||||
- `compose_failover` simulates pending-message reclaim after one consumer stops acknowledging
|
||||
|
||||
## Run with Docker Compose
|
||||
|
||||
```bash
|
||||
cargo run -p redis-streams-examples --bin compose_roundtrip
|
||||
```
|
||||
|
||||
## Run the reclaim scenario
|
||||
|
||||
```bash
|
||||
cargo run -p redis-streams-examples --bin compose_failover
|
||||
```
|
||||
15
examples/redis_streams/examples/Cargo.toml
Normal file
15
examples/redis_streams/examples/Cargo.toml
Normal file
@ -0,0 +1,15 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "redis-streams-examples"
|
||||
version.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
redis-streams-runtime-ext = { path = "../testing/integration" }
|
||||
redis-streams-runtime-workloads = { path = "../testing/workloads" }
|
||||
testing-framework-core = { workspace = true }
|
||||
testing-framework-runner-compose = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
tracing = { workspace = true }
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||
49
examples/redis_streams/examples/src/bin/compose_failover.rs
Normal file
49
examples/redis_streams/examples/src/bin/compose_failover.rs
Normal file
@ -0,0 +1,49 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use redis_streams_runtime_ext::RedisStreamsComposeDeployer;
|
||||
use redis_streams_runtime_workloads::{
|
||||
RedisStreamsBuilderExt, RedisStreamsClusterHealthy, RedisStreamsReclaimFailoverWorkload,
|
||||
RedisStreamsScenarioBuilder,
|
||||
};
|
||||
use testing_framework_core::scenario::Deployer;
|
||||
use testing_framework_runner_compose::ComposeRunnerError;
|
||||
use tracing::{info, warn};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let mut scenario = RedisStreamsScenarioBuilder::deployment_with(|topology| topology)
|
||||
.with_run_duration(Duration::from_secs(30))
|
||||
.with_workload(
|
||||
RedisStreamsReclaimFailoverWorkload::new("tf-stream", "tf-group")
|
||||
.messages(300)
|
||||
.batch(64),
|
||||
)
|
||||
.with_expectation(RedisStreamsClusterHealthy::new())
|
||||
.build()?;
|
||||
|
||||
let deployer = RedisStreamsComposeDeployer::new();
|
||||
let runner = match deployer.deploy(&scenario).await {
|
||||
Ok(runner) => runner,
|
||||
Err(ComposeRunnerError::DockerUnavailable) => {
|
||||
warn!("docker unavailable; skipping redis streams compose failover run");
|
||||
return Ok(());
|
||||
}
|
||||
Err(error) => {
|
||||
return Err(anyhow::Error::new(error))
|
||||
.context("deploying redis streams compose failover stack");
|
||||
}
|
||||
};
|
||||
|
||||
info!("running redis streams compose failover scenario");
|
||||
runner
|
||||
.run(&mut scenario)
|
||||
.await
|
||||
.context("running redis streams compose failover scenario")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
48
examples/redis_streams/examples/src/bin/compose_roundtrip.rs
Normal file
48
examples/redis_streams/examples/src/bin/compose_roundtrip.rs
Normal file
@ -0,0 +1,48 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use redis_streams_runtime_ext::RedisStreamsComposeDeployer;
|
||||
use redis_streams_runtime_workloads::{
|
||||
RedisStreamsBuilderExt, RedisStreamsClusterHealthy, RedisStreamsRoundTripWorkload,
|
||||
RedisStreamsScenarioBuilder,
|
||||
};
|
||||
use testing_framework_core::scenario::Deployer;
|
||||
use testing_framework_runner_compose::ComposeRunnerError;
|
||||
use tracing::{info, warn};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
|
||||
.init();
|
||||
|
||||
let mut scenario = RedisStreamsScenarioBuilder::deployment_with(|topology| topology)
|
||||
.with_run_duration(Duration::from_secs(30))
|
||||
.with_workload(
|
||||
RedisStreamsRoundTripWorkload::new("tf-stream", "tf-group")
|
||||
.messages(300)
|
||||
.read_batch(50),
|
||||
)
|
||||
.with_expectation(RedisStreamsClusterHealthy::new())
|
||||
.build()?;
|
||||
|
||||
let deployer = RedisStreamsComposeDeployer::new();
|
||||
let runner = match deployer.deploy(&scenario).await {
|
||||
Ok(runner) => runner,
|
||||
Err(ComposeRunnerError::DockerUnavailable) => {
|
||||
warn!("docker unavailable; skipping redis streams compose run");
|
||||
return Ok(());
|
||||
}
|
||||
Err(error) => {
|
||||
return Err(anyhow::Error::new(error)).context("deploying redis streams compose stack");
|
||||
}
|
||||
};
|
||||
|
||||
info!("running redis streams compose roundtrip scenario");
|
||||
runner
|
||||
.run(&mut scenario)
|
||||
.await
|
||||
.context("running redis streams compose scenario")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
13
examples/redis_streams/testing/integration/Cargo.toml
Normal file
13
examples/redis_streams/testing/integration/Cargo.toml
Normal file
@ -0,0 +1,13 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "redis-streams-runtime-ext"
|
||||
version.workspace = true
|
||||
|
||||
[dependencies]
|
||||
testing-framework-core = { workspace = true }
|
||||
testing-framework-runner-compose = { workspace = true }
|
||||
|
||||
async-trait = { workspace = true }
|
||||
redis = { version = "0.29", features = ["aio", "connection-manager", "streams", "tokio-comp"] }
|
||||
reqwest = { workspace = true }
|
||||
172
examples/redis_streams/testing/integration/src/app.rs
Normal file
172
examples/redis_streams/testing/integration/src/app.rs
Normal file
@ -0,0 +1,172 @@
|
||||
use async_trait::async_trait;
|
||||
use redis::{
|
||||
AsyncCommands, Client,
|
||||
aio::ConnectionManager,
|
||||
streams::{StreamAutoClaimOptions, StreamAutoClaimReply, StreamPendingReply, StreamReadReply},
|
||||
};
|
||||
use testing_framework_core::{
|
||||
cfgsync::{StaticNodeConfigProvider, serialize_plain_text_config},
|
||||
scenario::{Application, DynError, NodeAccess},
|
||||
};
|
||||
|
||||
pub type RedisStreamsTopology = testing_framework_core::topology::ClusterTopology;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RedisStreamsClient {
|
||||
url: String,
|
||||
client: Client,
|
||||
}
|
||||
|
||||
impl RedisStreamsClient {
|
||||
pub fn new(url: String) -> Result<Self, DynError> {
|
||||
let client = Client::open(url.clone())?;
|
||||
Ok(Self { url, client })
|
||||
}
|
||||
|
||||
pub fn url(&self) -> &str {
|
||||
&self.url
|
||||
}
|
||||
|
||||
async fn connection(&self) -> Result<ConnectionManager, DynError> {
|
||||
Ok(self.client.get_connection_manager().await?)
|
||||
}
|
||||
|
||||
pub async fn ping(&self) -> Result<(), DynError> {
|
||||
let mut conn = self.connection().await?;
|
||||
redis::cmd("PING").query_async::<String>(&mut conn).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn ensure_group(&self, stream: &str, group: &str) -> Result<(), DynError> {
|
||||
let mut conn = self.connection().await?;
|
||||
let result = redis::cmd("XGROUP")
|
||||
.arg("CREATE")
|
||||
.arg(stream)
|
||||
.arg(group)
|
||||
.arg("$")
|
||||
.arg("MKSTREAM")
|
||||
.query_async::<()>(&mut conn)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Ok(()) => Ok(()),
|
||||
Err(error) if error.to_string().contains("BUSYGROUP") => Ok(()),
|
||||
Err(error) => Err(error.into()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn append_message(&self, stream: &str, payload: &str) -> Result<String, DynError> {
|
||||
let mut conn = self.connection().await?;
|
||||
let id = redis::cmd("XADD")
|
||||
.arg(stream)
|
||||
.arg("*")
|
||||
.arg("payload")
|
||||
.arg(payload)
|
||||
.query_async::<String>(&mut conn)
|
||||
.await?;
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
pub async fn read_group_batch(
|
||||
&self,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
consumer: &str,
|
||||
count: usize,
|
||||
block_ms: usize,
|
||||
) -> Result<Vec<String>, DynError> {
|
||||
let mut conn = self.connection().await?;
|
||||
|
||||
let options = redis::streams::StreamReadOptions::default()
|
||||
.group(group, consumer)
|
||||
.count(count)
|
||||
.block(block_ms);
|
||||
|
||||
let reply: StreamReadReply = conn.xread_options(&[stream], &[">"], &options).await?;
|
||||
|
||||
let mut ids = Vec::new();
|
||||
for key in reply.keys {
|
||||
for entry in key.ids {
|
||||
ids.push(entry.id);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ids)
|
||||
}
|
||||
|
||||
pub async fn ack_messages(
|
||||
&self,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
ids: &[String],
|
||||
) -> Result<u64, DynError> {
|
||||
if ids.is_empty() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let mut conn = self.connection().await?;
|
||||
let acked = redis::cmd("XACK")
|
||||
.arg(stream)
|
||||
.arg(group)
|
||||
.arg(ids)
|
||||
.query_async::<u64>(&mut conn)
|
||||
.await?;
|
||||
Ok(acked)
|
||||
}
|
||||
|
||||
pub async fn autoclaim_batch(
|
||||
&self,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
consumer: &str,
|
||||
min_idle_ms: u64,
|
||||
start: &str,
|
||||
count: usize,
|
||||
) -> Result<(String, Vec<String>), DynError> {
|
||||
let mut conn = self.connection().await?;
|
||||
let options = StreamAutoClaimOptions::default().count(count);
|
||||
let reply: StreamAutoClaimReply = conn
|
||||
.xautoclaim_options(stream, group, consumer, min_idle_ms, start, options)
|
||||
.await?;
|
||||
|
||||
Ok((
|
||||
reply.next_stream_id,
|
||||
reply.claimed.into_iter().map(|entry| entry.id).collect(),
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn pending_count(&self, stream: &str, group: &str) -> Result<u64, DynError> {
|
||||
let mut conn = self.connection().await?;
|
||||
let reply: StreamPendingReply = conn.xpending(stream, group).await?;
|
||||
Ok(reply.count() as u64)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RedisStreamsEnv;
|
||||
|
||||
#[async_trait]
|
||||
impl Application for RedisStreamsEnv {
|
||||
type Deployment = RedisStreamsTopology;
|
||||
type NodeClient = RedisStreamsClient;
|
||||
type NodeConfig = String;
|
||||
|
||||
fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
|
||||
let port = access.testing_port().unwrap_or(access.api_port());
|
||||
RedisStreamsClient::new(format!("redis://{}:{port}", access.host()))
|
||||
}
|
||||
}
|
||||
|
||||
impl StaticNodeConfigProvider for RedisStreamsEnv {
|
||||
type Error = std::convert::Infallible;
|
||||
|
||||
fn build_node_config(
|
||||
_deployment: &Self::Deployment,
|
||||
_node_index: usize,
|
||||
) -> Result<Self::NodeConfig, Self::Error> {
|
||||
Ok("appendonly yes\nprotected-mode no\n".to_owned())
|
||||
}
|
||||
|
||||
fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error> {
|
||||
serialize_plain_text_config(config)
|
||||
}
|
||||
}
|
||||
105
examples/redis_streams/testing/integration/src/compose_env.rs
Normal file
105
examples/redis_streams/testing/integration/src/compose_env.rs
Normal file
@ -0,0 +1,105 @@
|
||||
use std::{env, fs, io::Error, path::Path};
|
||||
|
||||
use testing_framework_core::{
|
||||
cfgsync::StaticArtifactRenderer, scenario::DynError, topology::DeploymentDescriptor,
|
||||
};
|
||||
use testing_framework_runner_compose::{
|
||||
ComposeDeployEnv, ComposeNodeConfigFileName, ComposeReadinessProbe, EnvEntry,
|
||||
LoopbackNodeRuntimeSpec, infrastructure::ports::NodeHostPorts,
|
||||
};
|
||||
|
||||
use crate::{RedisStreamsClient, RedisStreamsEnv};
|
||||
|
||||
const NODE_CONFIG_PATH: &str = "/usr/local/etc/redis/redis.conf";
|
||||
const REDIS_PORT: u16 = 6379;
|
||||
|
||||
impl ComposeDeployEnv for RedisStreamsEnv {
|
||||
fn prepare_compose_configs(
|
||||
path: &Path,
|
||||
topology: &Self::Deployment,
|
||||
_cfgsync_port: u16,
|
||||
_metrics_otlp_ingest_url: Option<&reqwest::Url>,
|
||||
) -> Result<(), DynError> {
|
||||
let hostnames = Self::cfgsync_hostnames(topology);
|
||||
let configs_dir = path
|
||||
.parent()
|
||||
.ok_or_else(|| Error::other("cfgsync path has no parent"))?
|
||||
.join("configs");
|
||||
fs::create_dir_all(&configs_dir)?;
|
||||
|
||||
for index in 0..topology.node_count() {
|
||||
let mut config = <Self as StaticArtifactRenderer>::build_node_config(topology, index)?;
|
||||
<Self as StaticArtifactRenderer>::rewrite_for_hostnames(
|
||||
topology,
|
||||
index,
|
||||
&hostnames,
|
||||
&mut config,
|
||||
)?;
|
||||
let rendered = <Self as StaticArtifactRenderer>::serialize_node_config(&config)?;
|
||||
fs::write(
|
||||
configs_dir.join(Self::static_node_config_file_name(index)),
|
||||
rendered,
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn static_node_config_file_name(index: usize) -> String {
|
||||
ComposeNodeConfigFileName::FixedExtension("conf").resolve(index)
|
||||
}
|
||||
|
||||
fn loopback_node_runtime_spec(
|
||||
_topology: &Self::Deployment,
|
||||
index: usize,
|
||||
) -> Result<Option<LoopbackNodeRuntimeSpec>, DynError> {
|
||||
Ok(Some(build_redis_runtime(index)))
|
||||
}
|
||||
|
||||
fn node_client_from_ports(
|
||||
ports: &NodeHostPorts,
|
||||
host: &str,
|
||||
) -> Result<Self::NodeClient, DynError> {
|
||||
redis_client_from_ports(ports, host)
|
||||
}
|
||||
|
||||
fn readiness_probe() -> ComposeReadinessProbe {
|
||||
ComposeReadinessProbe::Tcp
|
||||
}
|
||||
}
|
||||
|
||||
fn build_redis_runtime(index: usize) -> LoopbackNodeRuntimeSpec {
|
||||
let image = env::var("REDIS_STREAMS_IMAGE").unwrap_or_else(|_| "redis:7".to_owned());
|
||||
let platform = env::var("REDIS_STREAMS_PLATFORM").ok();
|
||||
LoopbackNodeRuntimeSpec {
|
||||
image,
|
||||
entrypoint: build_redis_entrypoint(),
|
||||
volumes: vec![format!(
|
||||
"./stack/configs/node-{index}.conf:{NODE_CONFIG_PATH}:ro"
|
||||
)],
|
||||
extra_hosts: vec![],
|
||||
container_ports: vec![REDIS_PORT],
|
||||
environment: vec![EnvEntry::new("RUST_LOG", "info")],
|
||||
platform,
|
||||
}
|
||||
}
|
||||
|
||||
fn redis_client_from_ports(
|
||||
ports: &NodeHostPorts,
|
||||
host: &str,
|
||||
) -> Result<RedisStreamsClient, DynError> {
|
||||
RedisStreamsClient::new(format!("redis://{host}:{}", ports.api))
|
||||
}
|
||||
|
||||
fn build_redis_entrypoint() -> Vec<String> {
|
||||
vec![
|
||||
"redis-server".to_owned(),
|
||||
NODE_CONFIG_PATH.to_owned(),
|
||||
"--bind".to_owned(),
|
||||
"0.0.0.0".to_owned(),
|
||||
"--port".to_owned(),
|
||||
REDIS_PORT.to_string(),
|
||||
"--protected-mode".to_owned(),
|
||||
"no".to_owned(),
|
||||
]
|
||||
}
|
||||
9
examples/redis_streams/testing/integration/src/lib.rs
Normal file
9
examples/redis_streams/testing/integration/src/lib.rs
Normal file
@ -0,0 +1,9 @@
|
||||
mod app;
|
||||
mod compose_env;
|
||||
pub mod scenario;
|
||||
|
||||
pub use app::*;
|
||||
pub use scenario::{RedisStreamsBuilderExt, RedisStreamsScenarioBuilder};
|
||||
|
||||
pub type RedisStreamsComposeDeployer =
|
||||
testing_framework_runner_compose::ComposeDeployer<RedisStreamsEnv>;
|
||||
15
examples/redis_streams/testing/integration/src/scenario.rs
Normal file
15
examples/redis_streams/testing/integration/src/scenario.rs
Normal file
@ -0,0 +1,15 @@
|
||||
use testing_framework_core::scenario::ScenarioBuilder;
|
||||
|
||||
use crate::{RedisStreamsEnv, RedisStreamsTopology};
|
||||
|
||||
pub type RedisStreamsScenarioBuilder = ScenarioBuilder<RedisStreamsEnv>;
|
||||
|
||||
pub trait RedisStreamsBuilderExt: Sized {
|
||||
fn deployment_with(f: impl FnOnce(RedisStreamsTopology) -> RedisStreamsTopology) -> Self;
|
||||
}
|
||||
|
||||
impl RedisStreamsBuilderExt for RedisStreamsScenarioBuilder {
|
||||
fn deployment_with(f: impl FnOnce(RedisStreamsTopology) -> RedisStreamsTopology) -> Self {
|
||||
RedisStreamsScenarioBuilder::with_deployment(f(RedisStreamsTopology::new(3)))
|
||||
}
|
||||
}
|
||||
13
examples/redis_streams/testing/workloads/Cargo.toml
Normal file
13
examples/redis_streams/testing/workloads/Cargo.toml
Normal file
@ -0,0 +1,13 @@
|
||||
[package]
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
name = "redis-streams-runtime-workloads"
|
||||
version.workspace = true
|
||||
|
||||
[dependencies]
|
||||
redis-streams-runtime-ext = { path = "../integration" }
|
||||
testing-framework-core = { workspace = true }
|
||||
|
||||
async-trait = { workspace = true }
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
tracing = { workspace = true }
|
||||
74
examples/redis_streams/testing/workloads/src/health.rs
Normal file
74
examples/redis_streams/testing/workloads/src/health.rs
Normal file
@ -0,0 +1,74 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use redis_streams_runtime_ext::{RedisStreamsClient, RedisStreamsEnv};
|
||||
use testing_framework_core::scenario::{DynError, Expectation, RunContext};
|
||||
use tokio::time::Instant;
|
||||
use tracing::info;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RedisStreamsClusterHealthy {
|
||||
timeout: Duration,
|
||||
poll_interval: Duration,
|
||||
}
|
||||
|
||||
impl RedisStreamsClusterHealthy {
|
||||
#[must_use]
|
||||
pub const fn new() -> Self {
|
||||
Self {
|
||||
timeout: Duration::from_secs(20),
|
||||
poll_interval: Duration::from_millis(500),
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub const fn timeout(mut self, timeout: Duration) -> Self {
|
||||
self.timeout = timeout;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for RedisStreamsClusterHealthy {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Expectation<RedisStreamsEnv> for RedisStreamsClusterHealthy {
|
||||
fn name(&self) -> &str {
|
||||
"redis_streams_cluster_healthy"
|
||||
}
|
||||
|
||||
async fn evaluate(&mut self, ctx: &RunContext<RedisStreamsEnv>) -> Result<(), DynError> {
|
||||
let clients = ctx.node_clients().snapshot();
|
||||
if clients.is_empty() {
|
||||
return Err("no redis streams node clients available".into());
|
||||
}
|
||||
|
||||
let deadline = Instant::now() + self.timeout;
|
||||
while Instant::now() < deadline {
|
||||
if all_nodes_healthy(&clients).await? {
|
||||
info!(nodes = clients.len(), "redis streams cluster healthy");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
tokio::time::sleep(self.poll_interval).await;
|
||||
}
|
||||
|
||||
Err(format!(
|
||||
"redis streams cluster not healthy within {:?}",
|
||||
self.timeout
|
||||
)
|
||||
.into())
|
||||
}
|
||||
}
|
||||
|
||||
async fn all_nodes_healthy(clients: &[RedisStreamsClient]) -> Result<bool, DynError> {
|
||||
for client in clients {
|
||||
if client.ping().await.is_err() {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
10
examples/redis_streams/testing/workloads/src/lib.rs
Normal file
10
examples/redis_streams/testing/workloads/src/lib.rs
Normal file
@ -0,0 +1,10 @@
|
||||
mod health;
|
||||
mod reclaim_failover;
|
||||
mod roundtrip;
|
||||
|
||||
pub use health::RedisStreamsClusterHealthy;
|
||||
pub use reclaim_failover::RedisStreamsReclaimFailoverWorkload;
|
||||
pub use redis_streams_runtime_ext::{
|
||||
RedisStreamsBuilderExt, RedisStreamsEnv, RedisStreamsScenarioBuilder, RedisStreamsTopology,
|
||||
};
|
||||
pub use roundtrip::RedisStreamsRoundTripWorkload;
|
||||
202
examples/redis_streams/testing/workloads/src/reclaim_failover.rs
Normal file
202
examples/redis_streams/testing/workloads/src/reclaim_failover.rs
Normal file
@ -0,0 +1,202 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use redis_streams_runtime_ext::{RedisStreamsClient, RedisStreamsEnv};
|
||||
use testing_framework_core::scenario::{DynError, RunContext, Workload};
|
||||
use tokio::time::Instant;
|
||||
use tracing::info;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RedisStreamsReclaimFailoverWorkload {
|
||||
stream: String,
|
||||
group: String,
|
||||
producer_consumer: String,
|
||||
failover_consumer: String,
|
||||
messages: usize,
|
||||
batch: usize,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl RedisStreamsReclaimFailoverWorkload {
|
||||
#[must_use]
|
||||
pub fn new(stream: impl Into<String>, group: impl Into<String>) -> Self {
|
||||
Self {
|
||||
stream: stream.into(),
|
||||
group: group.into(),
|
||||
producer_consumer: "worker-a".to_owned(),
|
||||
failover_consumer: "worker-b".to_owned(),
|
||||
messages: 200,
|
||||
batch: 64,
|
||||
timeout: Duration::from_secs(20),
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub const fn messages(mut self, value: usize) -> Self {
|
||||
self.messages = value;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub const fn batch(mut self, value: usize) -> Self {
|
||||
self.batch = value;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn producer_consumer(mut self, value: impl Into<String>) -> Self {
|
||||
self.producer_consumer = value.into();
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn failover_consumer(mut self, value: impl Into<String>) -> Self {
|
||||
self.failover_consumer = value.into();
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub const fn timeout(mut self, value: Duration) -> Self {
|
||||
self.timeout = value;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Workload<RedisStreamsEnv> for RedisStreamsReclaimFailoverWorkload {
|
||||
fn name(&self) -> &str {
|
||||
"redis_streams_reclaim_failover_workload"
|
||||
}
|
||||
|
||||
async fn start(&self, ctx: &RunContext<RedisStreamsEnv>) -> Result<(), DynError> {
|
||||
let clients = ctx.node_clients().snapshot();
|
||||
if clients.is_empty() {
|
||||
return Err("redis streams failover workload requires at least 1 node client".into());
|
||||
}
|
||||
|
||||
let driver = &clients[0];
|
||||
driver.ensure_group(&self.stream, &self.group).await?;
|
||||
|
||||
info!(messages = self.messages, stream = %self.stream, group = %self.group, "redis streams failover: produce phase");
|
||||
produce_messages(driver, &self.stream, self.messages).await?;
|
||||
|
||||
info!(messages = self.messages, consumer = %self.producer_consumer, "redis streams failover: create pending phase");
|
||||
let pending_count = create_pending_messages(
|
||||
driver,
|
||||
&self.stream,
|
||||
&self.group,
|
||||
&self.producer_consumer,
|
||||
self.messages,
|
||||
self.batch,
|
||||
self.timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
info!(pending = pending_count, from = %self.producer_consumer, to = %self.failover_consumer, "redis streams failover: reclaim+ack phase");
|
||||
reclaim_and_ack_pending(
|
||||
driver,
|
||||
&self.stream,
|
||||
&self.group,
|
||||
&self.failover_consumer,
|
||||
pending_count,
|
||||
self.batch,
|
||||
self.timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let pending = driver.pending_count(&self.stream, &self.group).await?;
|
||||
if pending != 0 {
|
||||
return Err(
|
||||
format!("redis streams pending entries remain after reclaim: {pending}").into(),
|
||||
);
|
||||
}
|
||||
|
||||
info!(
|
||||
pending = pending_count,
|
||||
"redis streams failover reclaim complete"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn produce_messages(
|
||||
client: &RedisStreamsClient,
|
||||
stream: &str,
|
||||
messages: usize,
|
||||
) -> Result<(), DynError> {
|
||||
for idx in 0..messages {
|
||||
let payload = format!("msg-{idx}");
|
||||
client.append_message(stream, &payload).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_pending_messages(
|
||||
client: &RedisStreamsClient,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
consumer: &str,
|
||||
expected: usize,
|
||||
batch: usize,
|
||||
timeout: Duration,
|
||||
) -> Result<usize, DynError> {
|
||||
let mut claimed = 0usize;
|
||||
let deadline = Instant::now() + timeout;
|
||||
|
||||
while claimed < expected && Instant::now() < deadline {
|
||||
let ids = client
|
||||
.read_group_batch(stream, group, consumer, batch, 500)
|
||||
.await?;
|
||||
|
||||
if ids.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
claimed += ids.len();
|
||||
}
|
||||
|
||||
if claimed == expected {
|
||||
return Ok(claimed);
|
||||
}
|
||||
|
||||
Err(format!("redis streams pending creation timed out: claimed {claimed}/{expected}").into())
|
||||
}
|
||||
|
||||
async fn reclaim_and_ack_pending(
|
||||
client: &RedisStreamsClient,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
failover_consumer: &str,
|
||||
expected: usize,
|
||||
batch: usize,
|
||||
timeout: Duration,
|
||||
) -> Result<(), DynError> {
|
||||
let mut reclaimed = 0usize;
|
||||
let mut cursor = "0-0".to_owned();
|
||||
let deadline = Instant::now() + timeout;
|
||||
|
||||
while reclaimed < expected && Instant::now() < deadline {
|
||||
let (next_cursor, ids) = client
|
||||
.autoclaim_batch(stream, group, failover_consumer, 0, &cursor, batch)
|
||||
.await?;
|
||||
|
||||
if ids.is_empty() {
|
||||
if next_cursor == "0-0" {
|
||||
break;
|
||||
}
|
||||
cursor = next_cursor;
|
||||
continue;
|
||||
}
|
||||
|
||||
let acked = client.ack_messages(stream, group, &ids).await? as usize;
|
||||
reclaimed += acked;
|
||||
cursor = next_cursor;
|
||||
}
|
||||
|
||||
if reclaimed == expected {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Err(format!("redis streams reclaim timed out: acked {reclaimed}/{expected}").into())
|
||||
}
|
||||
131
examples/redis_streams/testing/workloads/src/roundtrip.rs
Normal file
131
examples/redis_streams/testing/workloads/src/roundtrip.rs
Normal file
@ -0,0 +1,131 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use redis_streams_runtime_ext::RedisStreamsEnv;
|
||||
use testing_framework_core::scenario::{DynError, RunContext, Workload};
|
||||
use tokio::time::Instant;
|
||||
use tracing::info;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RedisStreamsRoundTripWorkload {
|
||||
stream: String,
|
||||
group: String,
|
||||
consumer: String,
|
||||
messages: usize,
|
||||
read_batch: usize,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl RedisStreamsRoundTripWorkload {
|
||||
#[must_use]
|
||||
pub fn new(stream: impl Into<String>, group: impl Into<String>) -> Self {
|
||||
Self {
|
||||
stream: stream.into(),
|
||||
group: group.into(),
|
||||
consumer: "worker-1".to_owned(),
|
||||
messages: 200,
|
||||
read_batch: 32,
|
||||
timeout: Duration::from_secs(20),
|
||||
}
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub const fn messages(mut self, value: usize) -> Self {
|
||||
self.messages = value;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub const fn read_batch(mut self, value: usize) -> Self {
|
||||
self.read_batch = value;
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub fn consumer(mut self, value: impl Into<String>) -> Self {
|
||||
self.consumer = value.into();
|
||||
self
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
pub const fn timeout(mut self, value: Duration) -> Self {
|
||||
self.timeout = value;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Workload<RedisStreamsEnv> for RedisStreamsRoundTripWorkload {
|
||||
fn name(&self) -> &str {
|
||||
"redis_streams_roundtrip_workload"
|
||||
}
|
||||
|
||||
async fn start(&self, ctx: &RunContext<RedisStreamsEnv>) -> Result<(), DynError> {
|
||||
let clients = ctx.node_clients().snapshot();
|
||||
if clients.is_empty() {
|
||||
return Err("redis streams workload requires at least 1 node".into());
|
||||
}
|
||||
|
||||
let driver = &clients[0];
|
||||
|
||||
driver.ensure_group(&self.stream, &self.group).await?;
|
||||
|
||||
info!(messages = self.messages, stream = %self.stream, group = %self.group, "redis streams produce phase");
|
||||
for idx in 0..self.messages {
|
||||
let payload = format!("msg-{idx}");
|
||||
driver.append_message(&self.stream, &payload).await?;
|
||||
}
|
||||
|
||||
info!(messages = self.messages, stream = %self.stream, group = %self.group, "redis streams consume+ack phase");
|
||||
consume_and_ack(
|
||||
driver,
|
||||
&self.stream,
|
||||
&self.group,
|
||||
&self.consumer,
|
||||
self.messages,
|
||||
self.read_batch,
|
||||
self.timeout,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let pending = driver.pending_count(&self.stream, &self.group).await?;
|
||||
if pending != 0 {
|
||||
return Err(format!("redis streams pending entries remain: {pending}").into());
|
||||
}
|
||||
|
||||
info!(messages = self.messages, stream = %self.stream, group = %self.group, "redis streams roundtrip finished");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn consume_and_ack(
|
||||
client: &redis_streams_runtime_ext::RedisStreamsClient,
|
||||
stream: &str,
|
||||
group: &str,
|
||||
consumer: &str,
|
||||
expected: usize,
|
||||
batch: usize,
|
||||
timeout: Duration,
|
||||
) -> Result<(), DynError> {
|
||||
let mut acked_total = 0usize;
|
||||
let deadline = Instant::now() + timeout;
|
||||
|
||||
while acked_total < expected && Instant::now() < deadline {
|
||||
let ids = client
|
||||
.read_group_batch(stream, group, consumer, batch, 500)
|
||||
.await?;
|
||||
|
||||
if ids.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let acked = client.ack_messages(stream, group, &ids).await? as usize;
|
||||
acked_total += acked;
|
||||
}
|
||||
|
||||
if acked_total == expected {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Err(format!("redis streams timed out: acked {acked_total}/{expected} messages").into())
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user