Merge 633c048be6113d687429574021308ce2f3f39f47 into a5dcc977631d668785ec679a94110b0b7bca6132

This commit is contained in:
Andrus Salumets 2026-04-11 05:09:14 +00:00 committed by GitHub
commit f7719eb56f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
38 changed files with 2418 additions and 2 deletions

295
Cargo.lock generated
View File

@ -95,6 +95,15 @@ version = "1.0.101"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea"
[[package]]
name = "arc-swap"
version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a3a1fd6f75306b68087b831f025c712524bcb19aad54e557b1129cfa0a2b207"
dependencies = [
"rustversion",
]
[[package]]
name = "async-broadcast"
version = "0.7.2"
@ -177,11 +186,13 @@ dependencies = [
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@ -202,6 +213,7 @@ dependencies = [
"sync_wrapper",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@ -405,6 +417,20 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "combine"
version = "4.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
dependencies = [
"bytes",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "concurrent-queue"
version = "2.5.0"
@ -1026,7 +1052,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2",
"socket2 0.6.2",
"tokio",
"tower-service",
"tracing",
@ -1436,6 +1462,15 @@ version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "matchers"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
"regex-automata",
]
[[package]]
name = "matchit"
version = "0.7.3"
@ -1448,6 +1483,68 @@ version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]]
name = "metrics-counter-examples"
version = "0.1.0"
dependencies = [
"anyhow",
"metrics-counter-runtime-ext",
"metrics-counter-runtime-workloads",
"reqwest",
"serde",
"serde_json",
"testing-framework-core",
"testing-framework-runner-compose",
"testing-framework-runner-k8s",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "metrics-counter-node"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"clap",
"prometheus",
"serde",
"serde_yaml",
"tokio",
"tower-http",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "metrics-counter-runtime-ext"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"reqwest",
"serde",
"serde_yaml",
"testing-framework-core",
"testing-framework-runner-compose",
"testing-framework-runner-k8s",
"testing-framework-runner-local",
]
[[package]]
name = "metrics-counter-runtime-workloads"
version = "0.1.0"
dependencies = [
"async-trait",
"metrics-counter-runtime-ext",
"serde",
"serde_json",
"testing-framework-core",
"tokio",
"tracing",
]
[[package]]
name = "mime"
version = "0.3.17"
@ -1482,12 +1579,40 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "num-bigint"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9"
dependencies = [
"num-integer",
"num-traits",
]
[[package]]
name = "num-conv"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050"
[[package]]
name = "num-integer"
version = "0.1.46"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f"
dependencies = [
"num-traits",
]
[[package]]
name = "num-traits"
version = "0.2.19"
@ -1789,6 +1914,21 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "prometheus"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ca5326d8d0b950a9acd87e6a3f94745394f62e4dae1b1ee22b2bc0c394af43a"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"protobuf",
"thiserror 2.0.18",
]
[[package]]
name = "prometheus-http-query"
version = "0.8.3"
@ -1803,6 +1943,26 @@ dependencies = [
"url",
]
[[package]]
name = "protobuf"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
dependencies = [
"once_cell",
"protobuf-support",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf-support"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "quote"
version = "1.0.44"
@ -1848,6 +2008,66 @@ dependencies = [
"getrandom 0.2.17",
]
[[package]]
name = "redis"
version = "0.29.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bc42f3a12fd4408ce64d8efef67048a924e543bd35c6591c0447fda9054695f"
dependencies = [
"arc-swap",
"backon",
"bytes",
"combine",
"futures-channel",
"futures-util",
"itoa",
"num-bigint",
"percent-encoding",
"pin-project-lite",
"ryu",
"sha1_smol",
"socket2 0.5.10",
"tokio",
"tokio-util",
"url",
]
[[package]]
name = "redis-streams-examples"
version = "0.1.0"
dependencies = [
"anyhow",
"redis-streams-runtime-ext",
"redis-streams-runtime-workloads",
"testing-framework-core",
"testing-framework-runner-compose",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "redis-streams-runtime-ext"
version = "0.1.0"
dependencies = [
"async-trait",
"redis",
"reqwest",
"testing-framework-core",
"testing-framework-runner-compose",
]
[[package]]
name = "redis-streams-runtime-workloads"
version = "0.1.0"
dependencies = [
"async-trait",
"redis-streams-runtime-ext",
"testing-framework-core",
"tokio",
"tracing",
]
[[package]]
name = "redox_syscall"
version = "0.5.18"
@ -2194,6 +2414,12 @@ dependencies = [
"unsafe-libyaml",
]
[[package]]
name = "sha1_smol"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
[[package]]
name = "sha2"
version = "0.10.9"
@ -2205,6 +2431,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
@ -2249,6 +2484,16 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "socket2"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "socket2"
version = "0.6.2"
@ -2465,6 +2710,15 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185"
dependencies = [
"cfg-if",
]
[[package]]
name = "time"
version = "0.3.47"
@ -2514,9 +2768,10 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.6.2",
"tokio-macros",
"windows-sys 0.61.2",
]
@ -2657,6 +2912,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex-automata",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
@ -2737,6 +3022,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vcpkg"
version = "0.2.15"

View File

@ -4,6 +4,13 @@ members = [
"cfgsync/artifacts",
"cfgsync/core",
"cfgsync/runtime",
"examples/metrics_counter/examples",
"examples/metrics_counter/metrics-counter-node",
"examples/metrics_counter/testing/integration",
"examples/metrics_counter/testing/workloads",
"examples/redis_streams/examples",
"examples/redis_streams/testing/integration",
"examples/redis_streams/testing/workloads",
"testing-framework/core",
"testing-framework/deployers/compose",
"testing-framework/deployers/k8s",

View File

@ -0,0 +1,24 @@
# Build stage: compile the node binary with the workspace's nightly toolchain.
FROM rustlang/rust:nightly-bookworm AS builder
WORKDIR /build
# Copy the workspace pieces needed to resolve and build -p metrics-counter-node.
COPY Cargo.toml Cargo.lock ./
COPY cfgsync/ ./cfgsync/
COPY examples/ ./examples/
COPY testing-framework/ ./testing-framework/
RUN cargo build --release -p metrics-counter-node
# Runtime stage: minimal Debian, CA certificates only (for outbound TLS).
FROM debian:bookworm-slim
RUN apt-get update && \
apt-get install -y ca-certificates && \
rm -rf /var/lib/apt/lists/*
COPY --from=builder /build/target/release/metrics-counter-node /usr/local/bin/metrics-counter-node
# The node reads its config from this directory (see default CMD below).
RUN mkdir -p /etc/metrics-counter
WORKDIR /app
ENTRYPOINT ["/usr/local/bin/metrics-counter-node"]
CMD ["--config", "/etc/metrics-counter/config.yaml"]

View File

@ -0,0 +1,55 @@
# Metrics Counter Example
This example runs a tiny counter service together with a Prometheus scraper.
The scenarios increment counters through the node API and then query Prometheus
to check the aggregate result.
## How TF runs this
Each example follows the same pattern:
- TF starts the counter nodes and a Prometheus scraper
- a workload drives counter increments through the node API
- an expectation (or, in the manual scenario, explicit in-code checks) verifies the Prometheus query result
## API
Each node exposes:
- `POST /counter/inc` to increment the local counter
- `GET /counter/value` to read the current counter value
- `GET /metrics` for Prometheus scraping
## Scenarios
- `compose_prometheus_expectation` runs the app and Prometheus in Docker Compose, then checks the Prometheus query result
- `k8s_prometheus_expectation` runs the same check on Kubernetes
- `k8s_manual_prometheus` starts the nodes through the k8s manual cluster API, restarts one node, and checks the Prometheus aggregate again
## Run with Docker Compose
```bash
LOGOS_BLOCKCHAIN_METRICS_QUERY_URL=http://127.0.0.1:19091 \
cargo run -p metrics-counter-examples --bin compose_prometheus_expectation
```
## Run with Kubernetes
```bash
docker build -t metrics-counter-node:local -f examples/metrics_counter/Dockerfile .
LOGOS_BLOCKCHAIN_METRICS_QUERY_URL=http://127.0.0.1:30991 \
cargo run -p metrics-counter-examples --bin k8s_prometheus_expectation
```
Overrides:
- `METRICS_COUNTER_K8S_IMAGE` (falls back to `METRICS_COUNTER_IMAGE`, then `metrics-counter-node:local`)
- `METRICS_COUNTER_K8S_PROMETHEUS_NODE_PORT` (defaults to `30991`)
## Run with Kubernetes manual cluster
```bash
docker build -t metrics-counter-node:local -f examples/metrics_counter/Dockerfile .
LOGOS_BLOCKCHAIN_METRICS_QUERY_URL=http://127.0.0.1:30991 \
cargo run -p metrics-counter-examples --bin k8s_manual_prometheus
```

View File

@ -0,0 +1,20 @@
[package]
edition.workspace = true
license.workspace = true
name = "metrics-counter-examples"
version.workspace = true
[dependencies]
anyhow = "1.0"
metrics-counter-runtime-ext = { path = "../testing/integration" }
metrics-counter-runtime-workloads = { path = "../testing/workloads" }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true }
serde_json = { workspace = true }
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
testing-framework-runner-k8s = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@ -0,0 +1,61 @@
use std::{env, time::Duration};
use anyhow::{Context as _, Result};
use metrics_counter_runtime_workloads::{
CounterIncrementWorkload, MetricsCounterBuilderExt, MetricsCounterScenarioBuilder,
MetricsCounterTopology, PrometheusCounterAtLeast,
};
use testing_framework_core::scenario::{Deployer, ObservabilityBuilderExt};
use testing_framework_runner_compose::ComposeRunnerError;
use tracing::{info, warn};
const DEFAULT_PROM_URL: &str = "http://127.0.0.1:19091";
/// Entry point: runs the metrics-counter example against a local Docker
/// Compose stack and validates the aggregate counter through Prometheus.
#[tokio::main]
async fn main() -> Result<()> {
    // Honour RUST_LOG-style filtering for all tracing output.
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();
    // Prometheus query endpoint: env override wins when set to a non-blank
    // value, otherwise the compose default (DEFAULT_PROM_URL).
    let metrics_url = env::var("LOGOS_BLOCKCHAIN_METRICS_QUERY_URL")
        .ok()
        .filter(|value| !value.trim().is_empty())
        .unwrap_or_else(|| DEFAULT_PROM_URL.to_owned());
    // Three-node topology driven by 300 increments at 30 ops/s; the
    // expectation requires the Prometheus counter sum to reach at least 300.
    let mut scenario =
        MetricsCounterScenarioBuilder::deployment_with(|_| MetricsCounterTopology::new(3))
            .enable_observability()
            .with_metrics_query_url_str(&metrics_url)
            .with_run_duration(Duration::from_secs(20))
            .with_workload(
                CounterIncrementWorkload::new()
                    .operations(300)
                    .rate_per_sec(30),
            )
            .with_expectation(PrometheusCounterAtLeast::new(300.0))
            .build()?;
    let deployer = metrics_counter_runtime_ext::MetricsCounterComposeDeployer::new();
    let runner = match deployer.deploy(&scenario).await {
        Ok(runner) => runner,
        // A missing Docker daemon is a skip, not a failure, so the example
        // can run (and pass) in environments without Docker.
        Err(ComposeRunnerError::DockerUnavailable) => {
            warn!("docker unavailable; skipping compose metrics-counter run");
            return Ok(());
        }
        Err(error) => {
            return Err(anyhow::Error::new(error))
                .context("deploying metrics-counter compose stack");
        }
    };
    info!(
        metrics_url,
        "running metrics-counter compose prometheus scenario"
    );
    runner
        .run(&mut scenario)
        .await
        .context("running metrics-counter compose scenario")?;
    Ok(())
}

View File

@ -0,0 +1,182 @@
use std::{env, time::Duration};
use anyhow::{Context as _, Result, anyhow};
use metrics_counter_runtime_ext::{
MetricsCounterHttpClient, MetricsCounterK8sDeployer, MetricsCounterTopology,
};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use testing_framework_runner_k8s::ManualClusterError;
use tracing::{info, warn};
const DEFAULT_PROM_URL: &str = "http://127.0.0.1:30991";
/// Empty JSON body sent with `POST /counter/inc`.
#[derive(Serialize)]
struct IncrementRequest {}

/// Response shape of `GET /counter/value` on a node.
#[derive(Deserialize)]
struct CounterView {
    value: u64,
}

/// Minimal subset of the Prometheus `/api/v1/query` response this example
/// needs; unknown fields are ignored by serde.
#[derive(Deserialize)]
struct PrometheusQueryResponse {
    data: PrometheusData,
}

#[derive(Deserialize)]
struct PrometheusData {
    result: Vec<PrometheusSample>,
}

/// One instant-vector sample: Prometheus encodes it as
/// `[ <timestamp>, "<value as string>" ]`.
#[derive(Deserialize)]
struct PrometheusSample {
    value: (serde_json::Value, String),
}
/// Entry point: exercises the metrics-counter example through the Kubernetes
/// *manual cluster* API — start nodes individually, drive increments, restart
/// one node, and re-check the Prometheus aggregate after the restart.
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();
    // Prometheus query endpoint; env override wins when non-blank.
    let metrics_url = env::var("LOGOS_BLOCKCHAIN_METRICS_QUERY_URL")
        .ok()
        .filter(|value| !value.trim().is_empty())
        .unwrap_or_else(|| DEFAULT_PROM_URL.to_owned());
    let deployer = MetricsCounterK8sDeployer::new();
    // Treat an unreachable cluster as a skip so the example is safe to run
    // in environments without Kubernetes.
    let cluster = match deployer
        .manual_cluster_from_descriptors(MetricsCounterTopology::new(3))
        .await
    {
        Ok(cluster) => cluster,
        Err(ManualClusterError::ClientInit { source }) => {
            warn!("k8s unavailable ({source}); skipping metrics-counter k8s manual run");
            return Ok(());
        }
        Err(ManualClusterError::InstallStack { source })
            if k8s_cluster_unavailable(&source.to_string()) =>
        {
            warn!("k8s unavailable ({source}); skipping metrics-counter k8s manual run");
            return Ok(());
        }
        Err(error) => {
            return Err(anyhow::Error::new(error))
                .context("creating metrics-counter k8s manual cluster");
        }
    };
    // Phase 1: start three nodes, spread 90 increments across them
    // (40 + 30 + 20), then confirm both the per-node values and the
    // Prometheus-side sum.
    let node0 = cluster.start_node("node-0").await?.client;
    let node1 = cluster.start_node("node-1").await?.client;
    let node2 = cluster.start_node("node-2").await?.client;
    cluster.wait_network_ready().await?;
    increment_many(&node0, 40).await?;
    increment_many(&node1, 30).await?;
    increment_many(&node2, 20).await?;
    wait_for_counter_value(&node0, 40).await?;
    wait_for_counter_value(&node1, 30).await?;
    wait_for_counter_value(&node2, 20).await?;
    wait_for_prometheus_sum(&metrics_url, 90.0).await?;
    // Phase 2: restart node-1 (its in-memory counter resets to 0), fetch a
    // fresh client for it, and verify the aggregate drops accordingly.
    // NOTE(review): node0/node2 clients are reused across the restart —
    // assumes their endpoints are stable through a sibling restart; confirm.
    info!("restarting node-1 in manual cluster");
    cluster.restart_node("node-1").await?;
    cluster.wait_network_ready().await?;
    let restarted_node1 = cluster
        .node_client("node-1")
        .ok_or_else(|| anyhow!("node-1 client missing after restart"))?;
    wait_for_counter_value(&restarted_node1, 0).await?;
    increment_many(&node0, 10).await?;
    increment_many(&restarted_node1, 5).await?;
    // Final totals: node0 = 50, node1 = 5, node2 = 20 → expected sum 75.
    wait_for_counter_value(&node0, 50).await?;
    wait_for_counter_value(&restarted_node1, 5).await?;
    wait_for_counter_value(&node2, 20).await?;
    wait_for_prometheus_sum(&metrics_url, 75.0).await?;
    cluster.stop_all();
    Ok(())
}
async fn increment_many(client: &MetricsCounterHttpClient, operations: usize) -> Result<()> {
for _ in 0..operations {
let _: CounterView = client
.post("/counter/inc", &IncrementRequest {})
.await
.map_err(|error| anyhow!(error.to_string()))?;
}
Ok(())
}
/// Polls `GET /counter/value` every 300 ms until the node reports exactly
/// `expected`, failing once a 20-second deadline elapses.
async fn wait_for_counter_value(client: &MetricsCounterHttpClient, expected: u64) -> Result<()> {
    let started = tokio::time::Instant::now();
    loop {
        if started.elapsed() >= Duration::from_secs(20) {
            return Err(anyhow!("counter did not reach expected value {expected}"));
        }
        let view: CounterView = client
            .get("/counter/value")
            .await
            .map_err(|error| anyhow!(error.to_string()))?;
        if view.value == expected {
            return Ok(());
        }
        tokio::time::sleep(Duration::from_millis(300)).await;
    }
}
/// Polls Prometheus (1 s interval, 30 s deadline) until the counter sum
/// equals `expected` within a small absolute tolerance.
///
/// Fix: the original compared with `f64::EPSILON`, which is the ULP at 1.0 —
/// *smaller* than the spacing of representable floats near 75–90 — so the
/// check degenerated to bit-exact equality and only passed because counter
/// sums happen to parse to exact integers. A fixed absolute tolerance keeps
/// every previously-passing case passing and is robust to benign rounding.
async fn wait_for_prometheus_sum(metrics_url: &str, expected: f64) -> Result<()> {
    // Far below 1 (counters are integral) yet far above rounding noise.
    const TOLERANCE: f64 = 1e-6;
    let base_url = Url::parse(metrics_url)?;
    let deadline = tokio::time::Instant::now() + Duration::from_secs(30);
    while tokio::time::Instant::now() < deadline {
        let total = query_prometheus_sum(&base_url).await?;
        if (total - expected).abs() <= TOLERANCE {
            info!(total, expected, "prometheus sum reached expected value");
            return Ok(());
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    Err(anyhow!(
        "prometheus sum did not reach expected value {expected}"
    ))
}
/// Executes `sum(metrics_counter_value)` as a Prometheus instant query and
/// parses the scalar result. An empty result vector maps to 0.0 — the normal
/// state before the first scrape — so callers can poll from the start.
async fn query_prometheus_sum(base_url: &Url) -> Result<f64> {
    // A fresh client per call is acceptable at this polling cadence (~1/s).
    let client = reqwest::Client::new();
    let mut url = base_url.join("/api/v1/query")?;
    url.query_pairs_mut()
        .append_pair("query", "sum(metrics_counter_value)");
    let response: PrometheusQueryResponse = client
        .get(url)
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;
    let Some(sample) = response.data.result.first() else {
        return Ok(0.0);
    };
    // Prometheus encodes sample values as strings; parse the second tuple
    // element into f64.
    sample
        .value
        .1
        .parse()
        .map_err(|error| anyhow!("invalid prometheus value: {error}"))
}
/// Heuristic: does this error text indicate the Kubernetes cluster itself is
/// unreachable (as opposed to a genuine deployment failure)?
fn k8s_cluster_unavailable(message: &str) -> bool {
    const UNREACHABLE_MARKERS: [&str; 3] = [
        "Unable to connect to the server",
        "TLS handshake timeout",
        "connection refused",
    ];
    UNREACHABLE_MARKERS
        .iter()
        .any(|marker| message.contains(marker))
}

View File

@ -0,0 +1,73 @@
use std::{env, time::Duration};
use anyhow::{Context as _, Result};
use metrics_counter_runtime_ext::MetricsCounterK8sDeployer;
use metrics_counter_runtime_workloads::{
CounterIncrementWorkload, MetricsCounterBuilderExt, MetricsCounterScenarioBuilder,
MetricsCounterTopology, PrometheusCounterAtLeast,
};
use testing_framework_core::scenario::{Deployer, ObservabilityBuilderExt};
use testing_framework_runner_k8s::K8sRunnerError;
use tracing::{info, warn};
const DEFAULT_PROM_URL: &str = "http://127.0.0.1:30991";
/// Entry point: runs the metrics-counter scenario on Kubernetes and checks
/// the Prometheus aggregate through the framework's expectation machinery.
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();
    // Prometheus query endpoint: env override wins when set to a non-blank
    // value, otherwise the k8s NodePort default (DEFAULT_PROM_URL).
    let metrics_url = env::var("LOGOS_BLOCKCHAIN_METRICS_QUERY_URL")
        .ok()
        .filter(|value| !value.trim().is_empty())
        .unwrap_or_else(|| DEFAULT_PROM_URL.to_owned());
    // Three-node topology, 240 increments at 20 ops/s; the expectation
    // requires the Prometheus counter sum to reach at least 240.
    let mut scenario =
        MetricsCounterScenarioBuilder::deployment_with(|_| MetricsCounterTopology::new(3))
            .enable_observability()
            .with_metrics_query_url_str(&metrics_url)
            .with_run_duration(Duration::from_secs(25))
            .with_workload(
                CounterIncrementWorkload::new()
                    .operations(240)
                    .rate_per_sec(20),
            )
            .with_expectation(PrometheusCounterAtLeast::new(240.0))
            .build()?;
    let deployer = MetricsCounterK8sDeployer::new();
    // An unreachable cluster is a skip, not a failure, so the example can
    // run in environments without Kubernetes.
    let runner = match deployer.deploy(&scenario).await {
        Ok(runner) => runner,
        Err(K8sRunnerError::ClientInit { source }) => {
            warn!("k8s unavailable ({source}); skipping metrics-counter k8s run");
            return Ok(());
        }
        Err(K8sRunnerError::InstallStack { source })
            if k8s_cluster_unavailable(&source.to_string()) =>
        {
            warn!("k8s unavailable ({source}); skipping metrics-counter k8s run");
            return Ok(());
        }
        Err(error) => {
            return Err(anyhow::Error::new(error)).context("deploying metrics-counter k8s stack");
        }
    };
    info!(
        metrics_url,
        "running metrics-counter k8s prometheus scenario"
    );
    runner
        .run(&mut scenario)
        .await
        .context("running metrics-counter k8s scenario")?;
    Ok(())
}
/// Returns true when the error text looks like "the cluster is unreachable"
/// rather than a real deployment failure, so callers can skip gracefully.
fn k8s_cluster_unavailable(message: &str) -> bool {
    [
        "Unable to connect to the server",
        "TLS handshake timeout",
        "connection refused",
    ]
    .into_iter()
    .any(|needle| message.contains(needle))
}

View File

@ -0,0 +1,21 @@
[package]
edition.workspace = true
license.workspace = true
name = "metrics-counter-node"
version.workspace = true
[[bin]]
name = "metrics-counter-node"
path = "src/main.rs"
[dependencies]
anyhow = "1.0"
axum = "0.7"
clap = { version = "4.0", features = ["derive"] }
prometheus = "0.14"
serde = { workspace = true }
serde_yaml = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tower-http = { version = "0.6", features = ["trace"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@ -0,0 +1,14 @@
use clap::Parser;
use serde::{Deserialize, Serialize};
/// YAML configuration consumed by one metrics-counter node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CounterConfig {
    // Stable identifier of this node within the cluster (appears in logs).
    pub node_id: u64,
    // Port the node's HTTP API binds on (all interfaces).
    pub http_port: u16,
}

/// Command line: `--config <path>` pointing at a CounterConfig YAML file.
#[derive(Debug, Parser)]
pub struct Cli {
    #[arg(long)]
    pub config: std::path::PathBuf,
}

View File

@ -0,0 +1,19 @@
mod config;
mod server;
mod state;
use clap::Parser;
/// Boots one metrics-counter node: parse the CLI, load the YAML config from
/// `--config`, then serve the HTTP API until the process is terminated.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();
    let cli = config::Cli::parse();
    let raw = std::fs::read_to_string(cli.config)?;
    let config: config::CounterConfig = serde_yaml::from_str(&raw)?;
    let state = state::CounterState::new();
    // Runs until the server errors or the process is killed.
    server::start_server(config, state).await
}

View File

@ -0,0 +1,83 @@
use std::net::SocketAddr;
use axum::{
Router,
extract::State,
http::StatusCode,
response::{IntoResponse, Json, Response},
routing::{get, post},
};
use prometheus::{Encoder, TextEncoder};
use serde::Serialize;
use tower_http::trace::TraceLayer;
use crate::{
config::CounterConfig,
state::{CounterState, CounterView},
};
/// JSON payload returned by both health endpoints.
#[derive(Serialize)]
struct HealthResponse {
    status: &'static str,
}
/// Binds the HTTP API on 0.0.0.0:<http_port>, flips the readiness flag, and
/// serves until the process ends (this only returns on a bind/serve error).
pub async fn start_server(config: CounterConfig, state: CounterState) -> anyhow::Result<()> {
    let app = Router::new()
        .route("/health/live", get(health_live))
        .route("/health/ready", get(health_ready))
        .route("/counter/inc", post(increment))
        .route("/counter/value", get(counter_value))
        .route("/metrics", get(metrics))
        .layer(TraceLayer::new_for_http())
        .with_state(state.clone());
    let addr = SocketAddr::from(([0, 0, 0, 0], config.http_port));
    let listener = tokio::net::TcpListener::bind(addr).await?;
    // Readiness is set only after the listener is bound, so /health/ready
    // cannot report ready before the socket accepts connections.
    state.set_ready(true).await;
    tracing::info!(node_id = config.node_id, %addr, "metrics-counter node ready");
    axum::serve(listener, app).await?;
    Ok(())
}
/// Liveness probe: always 200 once the process is running.
async fn health_live() -> (StatusCode, Json<HealthResponse>) {
    (StatusCode::OK, Json(HealthResponse { status: "alive" }))
}

/// Readiness probe: 200 after `start_server` binds its listener, 503 before.
async fn health_ready(State(state): State<CounterState>) -> (StatusCode, Json<HealthResponse>) {
    if state.is_ready().await {
        (StatusCode::OK, Json(HealthResponse { status: "ready" }))
    } else {
        (
            StatusCode::SERVICE_UNAVAILABLE,
            Json(HealthResponse {
                status: "not-ready",
            }),
        )
    }
}

/// POST /counter/inc — bumps the counter and returns the new value.
async fn increment(State(state): State<CounterState>) -> Json<CounterView> {
    Json(state.increment())
}

/// GET /counter/value — returns the current value without modifying it.
async fn counter_value(State(state): State<CounterState>) -> Json<CounterView> {
    Json(state.view())
}
async fn metrics() -> Response {
let metric_families = prometheus::gather();
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
if encoder.encode(&metric_families, &mut buffer).is_err() {
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
}
(
StatusCode::OK,
[("content-type", encoder.format_type().to_string())],
buffer,
)
.into_response()
}

View File

@ -0,0 +1,64 @@
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use prometheus::{IntCounter, IntGauge};
use serde::Serialize;
use tokio::sync::RwLock;
/// JSON view of the counter returned by the HTTP handlers.
#[derive(Clone, Debug, Serialize)]
pub struct CounterView {
    pub value: u64,
}

/// Shared, cheaply clonable node state: readiness flag, the authoritative
/// atomic counter, and the Prometheus series that mirror it.
#[derive(Clone)]
pub struct CounterState {
    // Flipped to true by the server once its listener is bound.
    ready: Arc<RwLock<bool>>,
    // Authoritative counter value; shared across handler clones.
    value: Arc<AtomicU64>,
    // Monotonic count of increment operations (Prometheus counter).
    increments_total: IntCounter,
    // Mirror of `value` exposed as a Prometheus gauge.
    counter_value: IntGauge,
}
impl CounterState {
    /// Creates a fresh state and registers the two Prometheus series with
    /// the process-global default registry.
    pub fn new() -> Self {
        let increments_total = IntCounter::new(
            "metrics_counter_increments_total",
            "Total increment operations",
        )
        .expect("metrics_counter_increments_total");
        let counter_value =
            IntGauge::new("metrics_counter_value", "Current counter value").expect("counter gauge");
        // Registration results are deliberately discarded: a second
        // CounterState would hit duplicate-registration errors, and keeping
        // this best-effort leaves `new()` infallible.
        let _ = prometheus::register(Box::new(increments_total.clone()));
        let _ = prometheus::register(Box::new(counter_value.clone()));
        Self {
            ready: Arc::new(RwLock::new(false)),
            value: Arc::new(AtomicU64::new(0)),
            increments_total,
            counter_value,
        }
    }

    /// Sets the readiness flag reported via `is_ready`.
    pub async fn set_ready(&self, value: bool) {
        *self.ready.write().await = value;
    }

    /// Reads the readiness flag.
    pub async fn is_ready(&self) -> bool {
        *self.ready.read().await
    }

    /// Atomically bumps the counter, mirrors the new value into the
    /// Prometheus series, and returns the post-increment value.
    pub fn increment(&self) -> CounterView {
        let value = self.value.fetch_add(1, Ordering::SeqCst) + 1;
        self.increments_total.inc();
        // Fix: saturate instead of `as`-casting so a (theoretical) value
        // above i64::MAX cannot wrap to a negative gauge reading.
        // NOTE(review): under concurrent increments the gauge writes may land
        // out of order; it converges once increments quiesce.
        self.counter_value
            .set(i64::try_from(value).unwrap_or(i64::MAX));
        CounterView { value }
    }

    /// Snapshot of the current counter value.
    pub fn view(&self) -> CounterView {
        CounterView {
            value: self.value.load(Ordering::SeqCst),
        }
    }
}

View File

@ -0,0 +1,17 @@
[package]
edition.workspace = true
license.workspace = true
name = "metrics-counter-runtime-ext"
version.workspace = true
[dependencies]
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
testing-framework-runner-k8s = { workspace = true }
testing-framework-runner-local = { workspace = true }
anyhow = "1.0"
async-trait = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
serde = { workspace = true }
serde_yaml = { workspace = true }

View File

@ -0,0 +1,88 @@
use async_trait::async_trait;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use testing_framework_core::{
cfgsync::{StaticNodeConfigProvider, serialize_yaml_config},
scenario::{Application, DefaultFeedRuntime, DynError, NodeAccess},
};
/// The example reuses the framework's generic cluster topology; only the
/// node count is relevant for metrics-counter.
pub type MetricsCounterTopology = testing_framework_core::topology::ClusterTopology;

/// Per-node YAML config consumed by the metrics-counter-node binary.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MetricsCounterNodeConfig {
    // Stable index of the node within the topology.
    pub node_id: u64,
    // Port the node's HTTP API listens on inside its container.
    pub http_port: u16,
}

/// Thin JSON-over-HTTP client for a single metrics-counter node.
#[derive(Clone)]
pub struct MetricsCounterHttpClient {
    base_url: Url,
    client: reqwest::Client,
}
impl MetricsCounterHttpClient {
    /// Creates a client rooted at `base_url` using a default reqwest client.
    #[must_use]
    pub fn new(base_url: Url) -> Self {
        Self {
            base_url,
            client: reqwest::Client::new(),
        }
    }

    /// GETs `path` (relative to the base URL), requires a success status,
    /// and deserializes the JSON body into `T`.
    pub async fn get<T: serde::de::DeserializeOwned>(&self, path: &str) -> Result<T, DynError> {
        let endpoint = self.base_url.join(path)?;
        let response = self.client.get(endpoint).send().await?;
        let checked = response.error_for_status()?;
        Ok(checked.json().await?)
    }

    /// POSTs `body` as JSON to `path`, requires a success status, and
    /// deserializes the JSON response into `T`.
    pub async fn post<B: Serialize, T: serde::de::DeserializeOwned>(
        &self,
        path: &str,
        body: &B,
    ) -> Result<T, DynError> {
        let endpoint = self.base_url.join(path)?;
        let response = self.client.post(endpoint).json(body).send().await?;
        let checked = response.error_for_status()?;
        Ok(checked.json().await?)
    }
}
/// Marker type wiring the metrics-counter app into the testing framework.
pub struct MetricsCounterEnv;

#[async_trait]
impl Application for MetricsCounterEnv {
    type Deployment = MetricsCounterTopology;
    type NodeClient = MetricsCounterHttpClient;
    type NodeConfig = MetricsCounterNodeConfig;
    type FeedRuntime = DefaultFeedRuntime;

    /// Builds an HTTP client from the node's externally reachable base URL.
    fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
        Ok(MetricsCounterHttpClient::new(access.api_base_url()?))
    }

    /// Path the framework polls to decide a node is ready to serve.
    fn node_readiness_path() -> &'static str {
        "/health/ready"
    }
}
impl StaticNodeConfigProvider for MetricsCounterEnv {
    type Error = serde_yaml::Error;

    /// Generates the config for node `node_index`. Every node listens on
    /// 8080 inside its own container, so the port is fixed.
    fn build_node_config(
        _deployment: &Self::Deployment,
        node_index: usize,
    ) -> Result<Self::NodeConfig, Self::Error> {
        Ok(MetricsCounterNodeConfig {
            node_id: node_index as u64, // lossless: usize is at most 64 bits here
            http_port: 8080,
        })
    }

    /// Serializes a node config to the YAML document the binary expects.
    fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error> {
        serialize_yaml_config(config)
    }
}

View File

@ -0,0 +1,94 @@
use std::{env, fs, path::Path};
use reqwest::Url;
use testing_framework_core::scenario::DynError;
use testing_framework_runner_compose::{
BinaryConfigNodeSpec, ComposeBinaryApp, EnvEntry, NodeDescriptor,
};
use crate::MetricsCounterEnv;
const NODE_CONFIG_PATH: &str = "/etc/metrics-counter/config.yaml";
const PROM_CONFIG_MOUNT_PATH: &str = "/etc/prometheus/prometheus.yml:ro";
const PROMETHEUS_SERVICE_NAME: &str = "prometheus";
const PROMETHEUS_CONTAINER_PORT: u16 = 9090;
const DEFAULT_PROMETHEUS_QUERY_PORT: u16 = 19091;
impl ComposeBinaryApp for MetricsCounterEnv {
    /// Node container spec: binary path, config mount point, and the two
    /// exposed container ports (8080 HTTP API, 8081 testing side-channel).
    fn compose_node_spec() -> BinaryConfigNodeSpec {
        BinaryConfigNodeSpec::conventional(
            "/usr/local/bin/metrics-counter-node",
            NODE_CONFIG_PATH,
            vec![8080, 8081],
        )
    }

    /// Writes the generated prometheus.yml into `<stack>/configs/` so the
    /// scraper container can mount it. Skipped when no usable Prometheus
    /// query URL is configured (see `prometheus_query_port`).
    fn prepare_compose_workspace(
        path: &Path,
        topology: &<Self as testing_framework_core::scenario::Application>::Deployment,
        _metrics_otlp_ingest_url: Option<&Url>,
    ) -> Result<(), DynError> {
        if prometheus_query_port().is_none() {
            return Ok(());
        }
        // `path` is the cfgsync file inside the stack directory; the configs
        // directory lives alongside it.
        let stack_dir = path
            .parent()
            .ok_or_else(|| anyhow::anyhow!("cfgsync path has no parent"))?;
        let configs_dir = stack_dir.join("configs");
        fs::create_dir_all(&configs_dir)?;
        fs::write(
            configs_dir.join("prometheus.yml"),
            render_prometheus_config(topology),
        )?;
        Ok(())
    }

    /// Adds the Prometheus scraper container to the compose stack, or no
    /// extra services when the query URL is unusable.
    fn compose_extra_services(
        _topology: &<Self as testing_framework_core::scenario::Application>::Deployment,
    ) -> Result<Vec<NodeDescriptor>, DynError> {
        let Some(prom_port) = prometheus_query_port() else {
            return Ok(Vec::new());
        };
        Ok(vec![NodeDescriptor::new(
            PROMETHEUS_SERVICE_NAME,
            "prom/prometheus:v2.54.1",
            vec![
                "/bin/prometheus".to_owned(),
                "--config.file=/etc/prometheus/prometheus.yml".to_owned(),
                "--web.listen-address=0.0.0.0:9090".to_owned(),
            ],
            vec![prometheus_config_volume()],
            vec![],
            // Publish on loopback only so the query port is not reachable
            // from outside the host.
            vec![format!("127.0.0.1:{prom_port}:{PROMETHEUS_CONTAINER_PORT}")],
            vec![PROMETHEUS_CONTAINER_PORT],
            vec![EnvEntry::new("TZ", "UTC")],
            None,
        )])
    }
}
/// Compose bind-mount entry mapping the generated prometheus.yml into the
/// scraper container (read-only; the `:ro` suffix lives in the mount const).
fn prometheus_config_volume() -> String {
    ["./stack/configs/prometheus.yml", PROM_CONFIG_MOUNT_PATH].join(":")
}
/// Host port on which Prometheus' query API should be published, taken from
/// the port of `LOGOS_BLOCKCHAIN_METRICS_QUERY_URL` (default 19091).
/// Returns `None` only when the configured URL fails to parse, which callers
/// treat as "Prometheus disabled".
fn prometheus_query_port() -> Option<u16> {
    let raw = env::var("LOGOS_BLOCKCHAIN_METRICS_QUERY_URL")
        .unwrap_or_else(|_| format!("http://127.0.0.1:{DEFAULT_PROMETHEUS_QUERY_PORT}"));
    let parsed = Url::parse(raw.trim()).ok()?;
    // No explicit port → scheme default (80/443); failing even that, fall
    // back to the example's default query port.
    parsed
        .port_or_known_default()
        .or(Some(DEFAULT_PROMETHEUS_QUERY_PORT))
}
/// Renders a minimal prometheus.yml that scrapes every node's `/metrics`
/// endpoint once per second; compose service names follow `node-<index>`.
// NOTE(review): the indentation embedded in the template string must remain
// valid YAML — verify the emitted file with a prometheus config check.
fn render_prometheus_config(topology: &crate::MetricsCounterTopology) -> String {
    let targets = (0..topology.node_count)
        .map(|index| format!("\"node-{index}:8080\""))
        .collect::<Vec<_>>()
        .join(", ");
    format!(
        "global:\n scrape_interval: 1s\nscrape_configs:\n - job_name: metrics_counter\n metrics_path: /metrics\n static_configs:\n - targets: [{targets}]\n"
    )
}

View File

@ -0,0 +1,183 @@
use std::{collections::BTreeMap, env};
use testing_framework_core::scenario::DynError;
use testing_framework_runner_k8s::{
BinaryConfigK8sSpec, HelmManifest, K8sBinaryApp,
k8s_openapi::{
api::{
apps::v1::{Deployment, DeploymentSpec},
core::v1::{
ConfigMap, ConfigMapVolumeSource, Container, ContainerPort, PodSpec,
PodTemplateSpec, Service, ServicePort, ServiceSpec, Volume, VolumeMount,
},
},
apimachinery::pkg::{
apis::meta::v1::{LabelSelector, ObjectMeta},
util::intstr::IntOrString,
},
},
};
use crate::MetricsCounterEnv;
// Helm chart / naming conventions for the metrics-counter k8s deployment.
const CHART_NAME: &str = "metrics-counter";
const NODE_NAME_PREFIX: &str = "metrics-counter-node";
// Path inside the node container where the rendered node config is mounted.
const NODE_CONFIG_PATH: &str = "/etc/metrics-counter/config.yaml";
// Node container ports: HTTP API and the framework's testing port.
const CONTAINER_HTTP_PORT: u16 = 8080;
const SERVICE_TESTING_PORT: u16 = 8081;
// Prometheus sidecar: service name, in-cluster port, and the default
// NodePort used to reach it from the host.
const PROMETHEUS_SERVICE_NAME: &str = "metrics-counter-prometheus";
const PROMETHEUS_CONTAINER_PORT: u16 = 9090;
const DEFAULT_PROMETHEUS_NODE_PORT: u16 = 30991;
impl K8sBinaryApp for MetricsCounterEnv {
    /// Conventional k8s spec for the metrics-counter node binary: chart name,
    /// node name prefix, binary path, config mount path, and the two ports.
    fn k8s_binary_spec() -> BinaryConfigK8sSpec {
        BinaryConfigK8sSpec::conventional(
            CHART_NAME,
            NODE_NAME_PREFIX,
            "/usr/local/bin/metrics-counter-node",
            NODE_CONFIG_PATH,
            CONTAINER_HTTP_PORT,
            SERVICE_TESTING_PORT,
        )
    }
    /// Appends the Prometheus sidecar manifests (ConfigMap, Deployment,
    /// Service) to the generated Helm manifest.
    fn extend_k8s_manifest(
        topology: &Self::Deployment,
        manifest: &mut HelmManifest,
    ) -> Result<(), DynError> {
        manifest.extend(render_prometheus_assets(topology)?);
        Ok(())
    }
}
/// Builds the three Prometheus k8s objects (ConfigMap, Deployment, Service)
/// as a Helm manifest fragment.
fn render_prometheus_assets(
    topology: &crate::MetricsCounterTopology,
) -> Result<HelmManifest, DynError> {
    let mut assets = HelmManifest::new();
    let config_map = prometheus_config_map(topology);
    assets.push_yaml(&config_map)?;
    let deployment = prometheus_deployment();
    assets.push_yaml(&deployment)?;
    let service = prometheus_service();
    assets.push_yaml(&service)?;
    Ok(assets)
}
/// ConfigMap holding the rendered `prometheus.yml` for the sidecar.
fn prometheus_config_map(topology: &crate::MetricsCounterTopology) -> ConfigMap {
    let data = BTreeMap::from([(
        "prometheus.yml".to_owned(),
        render_prometheus_config(topology),
    )]);
    ConfigMap {
        metadata: ObjectMeta {
            name: Some(prometheus_config_name()),
            ..Default::default()
        },
        data: Some(data),
        ..Default::default()
    }
}
/// Single-replica Prometheus Deployment mounting the ConfigMap-rendered
/// `prometheus.yml` as a single file (subPath mount).
fn prometheus_deployment() -> Deployment {
    let labels = prometheus_labels();
    Deployment {
        metadata: ObjectMeta {
            name: Some(PROMETHEUS_SERVICE_NAME.to_owned()),
            ..Default::default()
        },
        spec: Some(DeploymentSpec {
            replicas: Some(1),
            // Selector must match the pod template labels below.
            selector: LabelSelector {
                match_labels: Some(labels.clone()),
                ..Default::default()
            },
            template: PodTemplateSpec {
                metadata: Some(ObjectMeta {
                    labels: Some(labels),
                    ..Default::default()
                }),
                spec: Some(PodSpec {
                    containers: vec![Container {
                        name: "prometheus".to_owned(),
                        image: Some("prom/prometheus:v2.54.1".to_owned()),
                        args: Some(vec![
                            "--config.file=/etc/prometheus/prometheus.yml".to_owned(),
                            format!("--web.listen-address=0.0.0.0:{PROMETHEUS_CONTAINER_PORT}"),
                        ]),
                        ports: Some(vec![ContainerPort {
                            container_port: i32::from(PROMETHEUS_CONTAINER_PORT),
                            ..Default::default()
                        }]),
                        // subPath mount: expose only prometheus.yml, not the
                        // whole ConfigMap as a directory.
                        volume_mounts: Some(vec![VolumeMount {
                            name: "config".to_owned(),
                            mount_path: "/etc/prometheus/prometheus.yml".to_owned(),
                            sub_path: Some("prometheus.yml".to_owned()),
                            ..Default::default()
                        }]),
                        ..Default::default()
                    }],
                    volumes: Some(vec![Volume {
                        name: "config".to_owned(),
                        config_map: Some(ConfigMapVolumeSource {
                            name: prometheus_config_name(),
                            ..Default::default()
                        }),
                        ..Default::default()
                    }]),
                    ..Default::default()
                }),
            },
            ..Default::default()
        }),
        ..Default::default()
    }
}
/// NodePort Service exposing Prometheus on a fixed host port so the test
/// harness can query it from outside the cluster.
fn prometheus_service() -> Service {
    Service {
        metadata: ObjectMeta {
            name: Some(PROMETHEUS_SERVICE_NAME.to_owned()),
            ..Default::default()
        },
        spec: Some(ServiceSpec {
            selector: Some(prometheus_labels()),
            type_: Some("NodePort".to_owned()),
            ports: Some(vec![ServicePort {
                name: Some("http".to_owned()),
                port: i32::from(PROMETHEUS_CONTAINER_PORT),
                target_port: Some(IntOrString::Int(i32::from(PROMETHEUS_CONTAINER_PORT))),
                // Fixed (env-overridable) NodePort; must fall inside the
                // cluster's NodePort range (default 30000-32767).
                node_port: Some(i32::from(prometheus_query_port())),
                protocol: Some("TCP".to_owned()),
                ..Default::default()
            }]),
            ..Default::default()
        }),
        ..Default::default()
    }
}
/// Renders the Prometheus scrape config targeting every node's `/metrics`
/// endpoint via its in-cluster DNS name.
fn render_prometheus_config(topology: &crate::MetricsCounterTopology) -> String {
    let target_list: Vec<String> = (0..topology.node_count)
        .map(|node| format!("\"{NODE_NAME_PREFIX}-{node}:{CONTAINER_HTTP_PORT}\""))
        .collect();
    let targets = target_list.join(", ");
    format!(
        "global:\n scrape_interval: 1s\nscrape_configs:\n - job_name: metrics_counter\n metrics_path: /metrics\n static_configs:\n - targets: [{targets}]\n"
    )
}
fn prometheus_query_port() -> u16 {
env::var("METRICS_COUNTER_K8S_PROMETHEUS_NODE_PORT")
.ok()
.and_then(|value| value.trim().parse().ok())
.unwrap_or(DEFAULT_PROMETHEUS_NODE_PORT)
}
/// Selector labels shared by the Prometheus Deployment and Service.
fn prometheus_labels() -> BTreeMap<String, String> {
    let mut labels = BTreeMap::new();
    labels.insert("app".to_owned(), PROMETHEUS_SERVICE_NAME.to_owned());
    labels
}
/// Name of the ConfigMap carrying the Prometheus configuration.
fn prometheus_config_name() -> String {
    let mut name = PROMETHEUS_SERVICE_NAME.to_owned();
    name.push_str("-config");
    name
}

View File

@ -0,0 +1,12 @@
//! Metrics-counter example crate: environment, deployers, and scenario
//! builder re-exports.
mod app;
mod compose_env;
mod k8s_env;
mod local_env;
pub mod scenario;
pub use app::*;
pub use scenario::{MetricsCounterBuilderExt, MetricsCounterScenarioBuilder};
/// Compose deployer specialized for the metrics-counter environment.
pub type MetricsCounterComposeDeployer =
    testing_framework_runner_compose::ComposeDeployer<MetricsCounterEnv>;
/// Kubernetes deployer specialized for the metrics-counter environment.
pub type MetricsCounterK8sDeployer = testing_framework_runner_k8s::K8sDeployer<MetricsCounterEnv>;

View File

@ -0,0 +1,44 @@
use std::collections::HashMap;
use testing_framework_core::scenario::{DynError, StartNodeOptions};
use testing_framework_runner_local::{
LocalBinaryApp, LocalNodePorts, LocalPeerNode, LocalProcessSpec, yaml_node_config,
};
use crate::{MetricsCounterEnv, MetricsCounterNodeConfig};
impl LocalBinaryApp for MetricsCounterEnv {
    /// Name prefix for locally spawned node processes.
    fn initial_node_name_prefix() -> &'static str {
        "metrics-counter-node"
    }
    /// Builds one node's config from its index and allocated ports; peer
    /// information is unused by this application's config.
    fn build_local_node_config_with_peers(
        _topology: &Self::Deployment,
        index: usize,
        ports: &LocalNodePorts,
        _peers: &[LocalPeerNode],
        _peer_ports_by_name: &HashMap<String, u16>,
        _options: &StartNodeOptions<Self>,
        _template_config: Option<
            &<Self as testing_framework_core::scenario::Application>::NodeConfig,
        >,
    ) -> Result<<Self as testing_framework_core::scenario::Application>::NodeConfig, DynError> {
        Ok(MetricsCounterNodeConfig {
            node_id: index as u64,
            http_port: ports.network_port(),
        })
    }
    /// Binary location: `METRICS_COUNTER_NODE_BIN` env var plus the default
    /// binary name; resolution details live in `LocalProcessSpec`.
    fn local_process_spec() -> LocalProcessSpec {
        LocalProcessSpec::new("METRICS_COUNTER_NODE_BIN", "metrics-counter-node")
            .with_rust_log("metrics_counter_node=info")
    }
    /// Node configs are serialized as YAML.
    fn render_local_config(config: &MetricsCounterNodeConfig) -> Result<Vec<u8>, DynError> {
        yaml_node_config(config)
    }
    /// The node's HTTP API listens on the configured `http_port`.
    fn http_api_port(config: &MetricsCounterNodeConfig) -> u16 {
        config.http_port
    }
}

View File

@ -0,0 +1,15 @@
use testing_framework_core::scenario::ScenarioBuilder;
use crate::{MetricsCounterEnv, MetricsCounterTopology};
/// Scenario builder specialized for the metrics-counter environment.
pub type MetricsCounterScenarioBuilder = ScenarioBuilder<MetricsCounterEnv>;
/// Convenience constructor: start from a default topology and let the caller
/// customize it before building the scenario.
pub trait MetricsCounterBuilderExt: Sized {
    fn deployment_with(f: impl FnOnce(MetricsCounterTopology) -> MetricsCounterTopology) -> Self;
}
impl MetricsCounterBuilderExt for MetricsCounterScenarioBuilder {
    fn deployment_with(f: impl FnOnce(MetricsCounterTopology) -> MetricsCounterTopology) -> Self {
        // Default topology is 3 nodes; `f` may adjust it freely.
        MetricsCounterScenarioBuilder::with_deployment(f(MetricsCounterTopology::new(3)))
    }
}

View File

@ -0,0 +1,15 @@
[package]
edition.workspace = true
license.workspace = true
name = "metrics-counter-runtime-workloads"
version.workspace = true
[dependencies]
metrics-counter-runtime-ext = { path = "../integration" }
testing-framework-core = { workspace = true }
async-trait = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }

View File

@ -0,0 +1,50 @@
use async_trait::async_trait;
use metrics_counter_runtime_ext::MetricsCounterEnv;
use testing_framework_core::scenario::{DynError, Expectation, RunContext};
use tracing::info;
/// Expectation input: the minimum total the Prometheus-aggregated counter
/// must reach for the scenario to pass.
///
/// Derives `Debug` (in addition to the original `Clone`) so failures and
/// logs can print the configured threshold.
#[derive(Clone, Debug)]
pub struct PrometheusCounterAtLeast {
    // Inclusive lower bound checked against the summed counter value.
    min_total: f64,
}
impl PrometheusCounterAtLeast {
    /// Creates the expectation with the given inclusive minimum total.
    #[must_use]
    pub const fn new(min_total: f64) -> Self {
        Self { min_total }
    }
}
#[async_trait]
impl Expectation<MetricsCounterEnv> for PrometheusCounterAtLeast {
    fn name(&self) -> &str {
        "prometheus_counter_at_least"
    }
    /// Sums the increment counter across nodes via PromQL and fails when the
    /// total is below `min_total`, or when no Prometheus endpoint is
    /// configured at all.
    async fn evaluate(&mut self, ctx: &RunContext<MetricsCounterEnv>) -> Result<(), DynError> {
        if !ctx.telemetry().is_configured() {
            return Err(
                "prometheus endpoint unavailable; set LOGOS_BLOCKCHAIN_METRICS_QUERY_URL".into(),
            );
        }
        // PromQL: aggregate the per-node counters into one total.
        let total = ctx
            .telemetry()
            .counter_value("sum(metrics_counter_increments_total)")?;
        if total < self.min_total {
            return Err(format!(
                "metrics_counter_increments_total below threshold: total={total}, min={}",
                self.min_total
            )
            .into());
        }
        info!(
            total,
            min = self.min_total,
            "prometheus counter threshold met"
        );
        Ok(())
    }
}

View File

@ -0,0 +1,83 @@
use std::time::Duration;
use async_trait::async_trait;
use metrics_counter_runtime_ext::MetricsCounterEnv;
use serde::Serialize;
use testing_framework_core::scenario::{DynError, RunContext, Workload};
use tracing::info;
/// Workload settings: how many `/counter/inc` calls to issue and at what
/// approximate rate.
///
/// Derives `Debug` (in addition to the original `Clone`) so configured
/// workloads can be printed in logs and assertions.
#[derive(Clone, Debug)]
pub struct CounterIncrementWorkload {
    // Total number of increment requests to send.
    operations: usize,
    // Requests per second; `None` means no pacing (as fast as possible).
    rate_per_sec: Option<usize>,
}
/// Empty JSON body posted to `/counter/inc` (serializes to `{}`).
#[derive(Serialize)]
struct IncrementRequest {}
impl CounterIncrementWorkload {
    /// Default workload: 200 operations paced at 25 requests/second.
    ///
    /// Made `const` for consistency with the crate's other constructors
    /// (e.g. `PrometheusCounterAtLeast::new`) and the `const` builder
    /// methods below; the body is trivially const-evaluable.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            operations: 200,
            rate_per_sec: Some(25),
        }
    }
    /// Sets the total number of increment operations.
    #[must_use]
    pub const fn operations(mut self, value: usize) -> Self {
        self.operations = value;
        self
    }
    /// Sets the pacing rate in requests per second.
    #[must_use]
    pub const fn rate_per_sec(mut self, value: usize) -> Self {
        self.rate_per_sec = Some(value);
        self
    }
}
impl Default for CounterIncrementWorkload {
    /// Same defaults as [`CounterIncrementWorkload::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait]
impl Workload<MetricsCounterEnv> for CounterIncrementWorkload {
    fn name(&self) -> &str {
        "counter_increment_workload"
    }
    /// Round-robins POST `/counter/inc` across all node clients, optionally
    /// sleeping between requests to approximate `rate_per_sec`.
    async fn start(&self, ctx: &RunContext<MetricsCounterEnv>) -> Result<(), DynError> {
        let clients = ctx.node_clients().snapshot();
        if clients.is_empty() {
            return Err("no metrics-counter node clients available".into());
        }
        // `None` means either no rate configured or a rate of 0 (no pacing).
        let interval = self.rate_per_sec.and_then(compute_interval);
        info!(operations = self.operations, rate_per_sec = ?self.rate_per_sec, "starting counter increment workload");
        for idx in 0..self.operations {
            // Round-robin target selection across the cluster.
            let client = &clients[idx % clients.len()];
            let _: serde_json::Value = client.post("/counter/inc", &IncrementRequest {}).await?;
            // Progress log every 50 completed requests.
            if (idx + 1) % 50 == 0 {
                info!(completed = idx + 1, "counter increment progress");
            }
            if let Some(delay) = interval {
                tokio::time::sleep(delay).await;
            }
        }
        Ok(())
    }
}
/// Converts a requests-per-second rate into a per-request sleep interval.
///
/// Returns `None` for a rate of 0 (no pacing). The original divided in whole
/// milliseconds, so any rate above 1000/s collapsed to a 1 ms interval and
/// low rates were coarse (3/s became exactly 333 ms); dividing in
/// nanoseconds keeps the pacing accurate across the whole range. A minimum
/// of 1 ns is kept so extremely high rates still yield a nonzero interval.
fn compute_interval(rate_per_sec: usize) -> Option<Duration> {
    if rate_per_sec == 0 {
        return None;
    }
    Some(Duration::from_nanos(
        (1_000_000_000 / rate_per_sec as u64).max(1),
    ))
}

View File

@ -0,0 +1,9 @@
//! Workloads and expectations for the metrics-counter example.
mod expectations;
mod increment;
pub use expectations::PrometheusCounterAtLeast;
pub use increment::CounterIncrementWorkload;
// Re-export the environment types so examples depend on a single crate.
pub use metrics_counter_runtime_ext::{
    MetricsCounterBuilderExt, MetricsCounterEnv, MetricsCounterScenarioBuilder,
    MetricsCounterTopology,
};

View File

@ -0,0 +1,35 @@
# Redis Streams Example
This example uses Redis Streams consumer groups.
There are two scenarios. One produces messages, consumes them, acknowledges
them, and checks that the pending queue drains to zero. The other leaves
messages pending on one consumer and has a second consumer reclaim them with
`XAUTOCLAIM`.
This example is compose-only.
## How TF runs this
Both scenarios follow the same pattern:
- TF starts Redis in Docker Compose
- a workload drives stream and consumer-group operations through the Redis client
- an expectation checks health and pending-message state
## Scenarios
- `compose_roundtrip` covers stream setup, production, consumer-group reads, acknowledgements, and pending drain
- `compose_failover` simulates pending-message reclaim after one consumer stops acknowledging
## Run with Docker Compose
```bash
cargo run -p redis-streams-examples --bin compose_roundtrip
```
## Run the reclaim scenario
```bash
cargo run -p redis-streams-examples --bin compose_failover
```

View File

@ -0,0 +1,15 @@
[package]
edition.workspace = true
license.workspace = true
name = "redis-streams-examples"
version.workspace = true
[dependencies]
anyhow = "1.0"
redis-streams-runtime-ext = { path = "../testing/integration" }
redis-streams-runtime-workloads = { path = "../testing/workloads" }
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

View File

@ -0,0 +1,49 @@
use std::time::Duration;
use anyhow::{Context as _, Result};
use redis_streams_runtime_ext::RedisStreamsComposeDeployer;
use redis_streams_runtime_workloads::{
RedisStreamsBuilderExt, RedisStreamsClusterHealthy, RedisStreamsReclaimFailoverWorkload,
RedisStreamsScenarioBuilder,
};
use testing_framework_core::scenario::Deployer;
use testing_framework_runner_compose::ComposeRunnerError;
use tracing::{info, warn};
#[tokio::main]
async fn main() -> Result<()> {
    // Log level is controlled by RUST_LOG via the default env filter.
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();
    // Scenario: default topology, 30s budget, reclaim-failover workload plus
    // a cluster-health expectation.
    let mut scenario = RedisStreamsScenarioBuilder::deployment_with(|topology| topology)
        .with_run_duration(Duration::from_secs(30))
        .with_workload(
            RedisStreamsReclaimFailoverWorkload::new("tf-stream", "tf-group")
                .messages(300)
                .batch(64),
        )
        .with_expectation(RedisStreamsClusterHealthy::new())
        .build()?;
    let deployer = RedisStreamsComposeDeployer::new();
    let runner = match deployer.deploy(&scenario).await {
        Ok(runner) => runner,
        // No Docker on this machine: skip gracefully instead of failing CI.
        Err(ComposeRunnerError::DockerUnavailable) => {
            warn!("docker unavailable; skipping redis streams compose failover run");
            return Ok(());
        }
        Err(error) => {
            return Err(anyhow::Error::new(error))
                .context("deploying redis streams compose failover stack");
        }
    };
    info!("running redis streams compose failover scenario");
    runner
        .run(&mut scenario)
        .await
        .context("running redis streams compose failover scenario")?;
    Ok(())
}

View File

@ -0,0 +1,48 @@
use std::time::Duration;
use anyhow::{Context as _, Result};
use redis_streams_runtime_ext::RedisStreamsComposeDeployer;
use redis_streams_runtime_workloads::{
RedisStreamsBuilderExt, RedisStreamsClusterHealthy, RedisStreamsRoundTripWorkload,
RedisStreamsScenarioBuilder,
};
use testing_framework_core::scenario::Deployer;
use testing_framework_runner_compose::ComposeRunnerError;
use tracing::{info, warn};
#[tokio::main]
async fn main() -> Result<()> {
    // Log level is controlled by RUST_LOG via the default env filter.
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();
    // Scenario: default topology, 30s budget, produce/consume/ack roundtrip
    // workload plus a cluster-health expectation.
    let mut scenario = RedisStreamsScenarioBuilder::deployment_with(|topology| topology)
        .with_run_duration(Duration::from_secs(30))
        .with_workload(
            RedisStreamsRoundTripWorkload::new("tf-stream", "tf-group")
                .messages(300)
                .read_batch(50),
        )
        .with_expectation(RedisStreamsClusterHealthy::new())
        .build()?;
    let deployer = RedisStreamsComposeDeployer::new();
    let runner = match deployer.deploy(&scenario).await {
        Ok(runner) => runner,
        // No Docker on this machine: skip gracefully instead of failing CI.
        Err(ComposeRunnerError::DockerUnavailable) => {
            warn!("docker unavailable; skipping redis streams compose run");
            return Ok(());
        }
        Err(error) => {
            return Err(anyhow::Error::new(error)).context("deploying redis streams compose stack");
        }
    };
    info!("running redis streams compose roundtrip scenario");
    runner
        .run(&mut scenario)
        .await
        .context("running redis streams compose scenario")?;
    Ok(())
}

View File

@ -0,0 +1,13 @@
[package]
edition.workspace = true
license.workspace = true
name = "redis-streams-runtime-ext"
version.workspace = true
[dependencies]
testing-framework-core = { workspace = true }
testing-framework-runner-compose = { workspace = true }
async-trait = { workspace = true }
redis = { version = "0.29", features = ["aio", "connection-manager", "streams", "tokio-comp"] }
reqwest = { workspace = true }

View File

@ -0,0 +1,173 @@
use async_trait::async_trait;
use redis::{
AsyncCommands, Client,
aio::ConnectionManager,
streams::{StreamAutoClaimOptions, StreamAutoClaimReply, StreamPendingReply, StreamReadReply},
};
use testing_framework_core::{
cfgsync::{StaticNodeConfigProvider, serialize_plain_text_config},
scenario::{Application, DefaultFeedRuntime, DynError, NodeAccess},
};
/// Redis-streams scenarios reuse the framework's generic cluster topology.
pub type RedisStreamsTopology = testing_framework_core::topology::ClusterTopology;
/// Thin async Redis client wrapper used as the per-node client.
#[derive(Clone)]
pub struct RedisStreamsClient {
    // Connection URL the client was opened with, kept for diagnostics.
    url: String,
    client: Client,
}
impl RedisStreamsClient {
    /// Creates a client for the given `redis://` URL; the actual connection
    /// is established lazily per operation.
    pub fn new(url: String) -> Result<Self, DynError> {
        let client = Client::open(url.clone())?;
        Ok(Self { url, client })
    }
    /// Connection URL this client was created with.
    pub fn url(&self) -> &str {
        &self.url
    }
    // Obtains a connection manager for one operation. NOTE(review): this is
    // requested per call rather than cached — acceptable for test volumes.
    async fn connection(&self) -> Result<ConnectionManager, DynError> {
        Ok(self.client.get_connection_manager().await?)
    }
    /// Health probe: issues PING and discards the reply.
    pub async fn ping(&self) -> Result<(), DynError> {
        let mut conn = self.connection().await?;
        redis::cmd("PING").query_async::<String>(&mut conn).await?;
        Ok(())
    }
    /// Creates the consumer group at `$` (new messages only), creating the
    /// stream too (MKSTREAM); an already-existing group (BUSYGROUP error)
    /// is treated as success so the call is idempotent.
    pub async fn ensure_group(&self, stream: &str, group: &str) -> Result<(), DynError> {
        let mut conn = self.connection().await?;
        let result = redis::cmd("XGROUP")
            .arg("CREATE")
            .arg(stream)
            .arg(group)
            .arg("$")
            .arg("MKSTREAM")
            .query_async::<()>(&mut conn)
            .await;
        match result {
            Ok(()) => Ok(()),
            Err(error) if error.to_string().contains("BUSYGROUP") => Ok(()),
            Err(error) => Err(error.into()),
        }
    }
    /// XADD with an auto-generated id; returns the id assigned by Redis.
    pub async fn append_message(&self, stream: &str, payload: &str) -> Result<String, DynError> {
        let mut conn = self.connection().await?;
        let id = redis::cmd("XADD")
            .arg(stream)
            .arg("*")
            .arg("payload")
            .arg(payload)
            .query_async::<String>(&mut conn)
            .await?;
        Ok(id)
    }
    /// XREADGROUP of up to `count` new entries (`>`) for `consumer`,
    /// blocking up to `block_ms`; returns the entry ids (now pending).
    pub async fn read_group_batch(
        &self,
        stream: &str,
        group: &str,
        consumer: &str,
        count: usize,
        block_ms: usize,
    ) -> Result<Vec<String>, DynError> {
        let mut conn = self.connection().await?;
        let options = redis::streams::StreamReadOptions::default()
            .group(group, consumer)
            .count(count)
            .block(block_ms);
        let reply: StreamReadReply = conn.xread_options(&[stream], &[">"], &options).await?;
        let mut ids = Vec::new();
        for key in reply.keys {
            for entry in key.ids {
                ids.push(entry.id);
            }
        }
        Ok(ids)
    }
    /// XACK of the given ids; returns how many Redis actually acknowledged.
    /// An empty id list short-circuits to 0 without touching Redis.
    pub async fn ack_messages(
        &self,
        stream: &str,
        group: &str,
        ids: &[String],
    ) -> Result<u64, DynError> {
        if ids.is_empty() {
            return Ok(0);
        }
        let mut conn = self.connection().await?;
        let acked = redis::cmd("XACK")
            .arg(stream)
            .arg(group)
            .arg(ids)
            .query_async::<u64>(&mut conn)
            .await?;
        Ok(acked)
    }
    /// XAUTOCLAIM of up to `count` pending entries idle at least
    /// `min_idle_ms`, scanning from `start`; returns the next scan cursor
    /// and the claimed entry ids.
    pub async fn autoclaim_batch(
        &self,
        stream: &str,
        group: &str,
        consumer: &str,
        min_idle_ms: u64,
        start: &str,
        count: usize,
    ) -> Result<(String, Vec<String>), DynError> {
        let mut conn = self.connection().await?;
        let options = StreamAutoClaimOptions::default().count(count);
        let reply: StreamAutoClaimReply = conn
            .xautoclaim_options(stream, group, consumer, min_idle_ms, start, options)
            .await?;
        Ok((
            reply.next_stream_id,
            reply.claimed.into_iter().map(|entry| entry.id).collect(),
        ))
    }
    /// XPENDING summary form: total count of pending entries for the group.
    pub async fn pending_count(&self, stream: &str, group: &str) -> Result<u64, DynError> {
        let mut conn = self.connection().await?;
        let reply: StreamPendingReply = conn.xpending(stream, group).await?;
        Ok(reply.count() as u64)
    }
}
/// Marker type implementing the framework traits for the Redis example.
pub struct RedisStreamsEnv;
#[async_trait]
impl Application for RedisStreamsEnv {
    type Deployment = RedisStreamsTopology;
    type NodeClient = RedisStreamsClient;
    // Node config is raw redis.conf text, not a structured type.
    type NodeConfig = String;
    type FeedRuntime = DefaultFeedRuntime;
    /// Builds the per-node Redis client, preferring the dedicated testing
    /// port and falling back to the API port.
    fn build_node_client(access: &NodeAccess) -> Result<Self::NodeClient, DynError> {
        let port = access.testing_port().unwrap_or(access.api_port());
        RedisStreamsClient::new(format!("redis://{}:{port}", access.host()))
    }
}
impl StaticNodeConfigProvider for RedisStreamsEnv {
    // Config generation cannot fail.
    type Error = std::convert::Infallible;
    /// Same minimal redis.conf for every node: AOF persistence on,
    /// protected mode off (containers are reached from the host).
    fn build_node_config(
        _deployment: &Self::Deployment,
        _node_index: usize,
    ) -> Result<Self::NodeConfig, Self::Error> {
        Ok("appendonly yes\nprotected-mode no\n".to_owned())
    }
    /// Node configs are written verbatim as plain text (not YAML).
    fn serialize_node_config(config: &Self::NodeConfig) -> Result<String, Self::Error> {
        serialize_plain_text_config(config)
    }
}

View File

@ -0,0 +1,105 @@
use std::{env, fs, io::Error, path::Path};
use testing_framework_core::{
cfgsync::StaticArtifactRenderer, scenario::DynError, topology::DeploymentDescriptor,
};
use testing_framework_runner_compose::{
ComposeDeployEnv, ComposeNodeConfigFileName, ComposeReadinessProbe, EnvEntry,
LoopbackNodeRuntimeSpec, infrastructure::ports::NodeHostPorts,
};
use crate::{RedisStreamsClient, RedisStreamsEnv};
// Path inside the container where the per-node redis.conf is mounted.
const NODE_CONFIG_PATH: &str = "/usr/local/etc/redis/redis.conf";
// Standard Redis server port exposed by each node container.
const REDIS_PORT: u16 = 6379;
impl ComposeDeployEnv for RedisStreamsEnv {
    /// Renders one static redis.conf per node into a `configs/` directory
    /// next to the cfgsync file, applying hostname rewrites first.
    fn prepare_compose_configs(
        path: &Path,
        topology: &Self::Deployment,
        _cfgsync_port: u16,
        _metrics_otlp_ingest_url: Option<&reqwest::Url>,
    ) -> Result<(), DynError> {
        let hostnames = Self::cfgsync_hostnames(topology);
        let configs_dir = path
            .parent()
            .ok_or_else(|| Error::other("cfgsync path has no parent"))?
            .join("configs");
        fs::create_dir_all(&configs_dir)?;
        for index in 0..topology.node_count() {
            // Build, rewrite for container hostnames, then serialize.
            let mut config = <Self as StaticArtifactRenderer>::build_node_config(topology, index)?;
            <Self as StaticArtifactRenderer>::rewrite_for_hostnames(
                topology,
                index,
                &hostnames,
                &mut config,
            )?;
            let rendered = <Self as StaticArtifactRenderer>::serialize_node_config(&config)?;
            fs::write(
                configs_dir.join(Self::static_node_config_file_name(index)),
                rendered,
            )?;
        }
        Ok(())
    }
    /// Node configs use a `.conf` extension (matches the bind mount in
    /// `build_redis_runtime`).
    fn static_node_config_file_name(index: usize) -> String {
        ComposeNodeConfigFileName::FixedExtension("conf").resolve(index)
    }
    /// Every node runs the stock Redis image instead of a project binary.
    fn loopback_node_runtime_spec(
        _topology: &Self::Deployment,
        index: usize,
    ) -> Result<Option<LoopbackNodeRuntimeSpec>, DynError> {
        Ok(Some(build_redis_runtime(index)))
    }
    fn node_client_from_ports(
        ports: &NodeHostPorts,
        host: &str,
    ) -> Result<Self::NodeClient, DynError> {
        redis_client_from_ports(ports, host)
    }
    /// Redis has no HTTP health endpoint; a TCP connect check suffices.
    fn readiness_probe() -> ComposeReadinessProbe {
        ComposeReadinessProbe::Tcp
    }
}
/// Runtime spec for one Redis node container; image and platform are
/// overridable via `REDIS_STREAMS_IMAGE` / `REDIS_STREAMS_PLATFORM`.
fn build_redis_runtime(index: usize) -> LoopbackNodeRuntimeSpec {
    let image = match env::var("REDIS_STREAMS_IMAGE") {
        Ok(value) => value,
        Err(_) => "redis:7".to_owned(),
    };
    let config_volume = format!("./stack/configs/node-{index}.conf:{NODE_CONFIG_PATH}:ro");
    LoopbackNodeRuntimeSpec {
        image,
        entrypoint: build_redis_entrypoint(),
        volumes: vec![config_volume],
        extra_hosts: Vec::new(),
        container_ports: vec![REDIS_PORT],
        environment: vec![EnvEntry::new("RUST_LOG", "info")],
        platform: env::var("REDIS_STREAMS_PLATFORM").ok(),
    }
}
/// Builds a client pointing at the host-mapped API port of one node.
fn redis_client_from_ports(
    ports: &NodeHostPorts,
    host: &str,
) -> Result<RedisStreamsClient, DynError> {
    let api_port = ports.api;
    let url = format!("redis://{host}:{api_port}");
    RedisStreamsClient::new(url)
}
/// Command line for the Redis container: load the mounted config, then force
/// an externally reachable bind/port with protected mode disabled.
fn build_redis_entrypoint() -> Vec<String> {
    let mut command = vec!["redis-server".to_owned(), NODE_CONFIG_PATH.to_owned()];
    command.push("--bind".to_owned());
    command.push("0.0.0.0".to_owned());
    command.push("--port".to_owned());
    command.push(REDIS_PORT.to_string());
    command.push("--protected-mode".to_owned());
    command.push("no".to_owned());
    command
}

View File

@ -0,0 +1,9 @@
//! Redis Streams example crate: environment, compose deployer, and scenario
//! builder re-exports.
mod app;
mod compose_env;
pub mod scenario;
pub use app::*;
pub use scenario::{RedisStreamsBuilderExt, RedisStreamsScenarioBuilder};
/// Compose deployer specialized for the Redis Streams environment.
pub type RedisStreamsComposeDeployer =
    testing_framework_runner_compose::ComposeDeployer<RedisStreamsEnv>;

View File

@ -0,0 +1,15 @@
use testing_framework_core::scenario::ScenarioBuilder;
use crate::{RedisStreamsEnv, RedisStreamsTopology};
/// Scenario builder specialized for the Redis Streams environment.
pub type RedisStreamsScenarioBuilder = ScenarioBuilder<RedisStreamsEnv>;
/// Convenience constructor: start from a default topology and let the caller
/// customize it before building the scenario.
pub trait RedisStreamsBuilderExt: Sized {
    fn deployment_with(f: impl FnOnce(RedisStreamsTopology) -> RedisStreamsTopology) -> Self;
}
impl RedisStreamsBuilderExt for RedisStreamsScenarioBuilder {
    fn deployment_with(f: impl FnOnce(RedisStreamsTopology) -> RedisStreamsTopology) -> Self {
        // Default topology is 3 nodes; `f` may adjust it freely.
        RedisStreamsScenarioBuilder::with_deployment(f(RedisStreamsTopology::new(3)))
    }
}

View File

@ -0,0 +1,13 @@
[package]
edition.workspace = true
license.workspace = true
name = "redis-streams-runtime-workloads"
version.workspace = true
[dependencies]
redis-streams-runtime-ext = { path = "../integration" }
testing-framework-core = { workspace = true }
async-trait = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }

View File

@ -0,0 +1,74 @@
use std::time::Duration;
use async_trait::async_trait;
use redis_streams_runtime_ext::{RedisStreamsClient, RedisStreamsEnv};
use testing_framework_core::scenario::{DynError, Expectation, RunContext};
use tokio::time::Instant;
use tracing::info;
/// Expectation that every node answers PING before a deadline.
///
/// Derives `Debug` (in addition to the original `Clone`) so configured
/// expectations can be printed in logs and assertions.
#[derive(Clone, Debug)]
pub struct RedisStreamsClusterHealthy {
    // Overall deadline for the whole cluster to become healthy.
    timeout: Duration,
    // Delay between successive health sweeps.
    poll_interval: Duration,
}
impl RedisStreamsClusterHealthy {
    /// Default health check: 20s total timeout, polling every 500ms.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            timeout: Duration::from_secs(20),
            poll_interval: Duration::from_millis(500),
        }
    }
    /// Overrides the overall timeout.
    #[must_use]
    pub const fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }
}
impl Default for RedisStreamsClusterHealthy {
    /// Same defaults as [`RedisStreamsClusterHealthy::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait]
impl Expectation<RedisStreamsEnv> for RedisStreamsClusterHealthy {
    fn name(&self) -> &str {
        "redis_streams_cluster_healthy"
    }
    /// Polls every node with PING until all respond or `timeout` elapses.
    async fn evaluate(&mut self, ctx: &RunContext<RedisStreamsEnv>) -> Result<(), DynError> {
        let clients = ctx.node_clients().snapshot();
        if clients.is_empty() {
            return Err("no redis streams node clients available".into());
        }
        let deadline = Instant::now() + self.timeout;
        while Instant::now() < deadline {
            if all_nodes_healthy(&clients).await? {
                info!(nodes = clients.len(), "redis streams cluster healthy");
                return Ok(());
            }
            // Not healthy yet; back off before the next sweep.
            tokio::time::sleep(self.poll_interval).await;
        }
        Err(format!(
            "redis streams cluster not healthy within {:?}",
            self.timeout
        )
        .into())
    }
}
/// Returns `Ok(true)` only if every client answers PING successfully;
/// the first failing node short-circuits the sweep.
async fn all_nodes_healthy(clients: &[RedisStreamsClient]) -> Result<bool, DynError> {
    for client in clients {
        let reachable = client.ping().await.is_ok();
        if !reachable {
            return Ok(false);
        }
    }
    Ok(true)
}

View File

@ -0,0 +1,10 @@
//! Workloads and expectations for the Redis Streams example.
mod health;
mod reclaim_failover;
mod roundtrip;
pub use health::RedisStreamsClusterHealthy;
pub use reclaim_failover::RedisStreamsReclaimFailoverWorkload;
// Re-export the environment types so examples depend on a single crate.
pub use redis_streams_runtime_ext::{
    RedisStreamsBuilderExt, RedisStreamsEnv, RedisStreamsScenarioBuilder, RedisStreamsTopology,
};
pub use roundtrip::RedisStreamsRoundTripWorkload;

View File

@ -0,0 +1,202 @@
use std::time::Duration;
use async_trait::async_trait;
use redis_streams_runtime_ext::{RedisStreamsClient, RedisStreamsEnv};
use testing_framework_core::scenario::{DynError, RunContext, Workload};
use tokio::time::Instant;
use tracing::info;
/// Workload that leaves messages pending on one consumer and reclaims them
/// with XAUTOCLAIM from a second consumer.
#[derive(Clone)]
pub struct RedisStreamsReclaimFailoverWorkload {
    stream: String,
    group: String,
    // Consumer that reads but never acks (creates the pending backlog).
    producer_consumer: String,
    // Consumer that reclaims and acks the backlog.
    failover_consumer: String,
    messages: usize,
    batch: usize,
    // Per-phase deadline (pending creation and reclaim each get this long).
    timeout: Duration,
}
impl RedisStreamsReclaimFailoverWorkload {
    /// Creates the workload with defaults: consumers "worker-a" (backlog
    /// side) and "worker-b" (failover side), 200 messages, batch 64, and a
    /// 20-second per-phase timeout.
    #[must_use]
    pub fn new(stream: impl Into<String>, group: impl Into<String>) -> Self {
        let stream = stream.into();
        let group = group.into();
        Self {
            stream,
            group,
            producer_consumer: String::from("worker-a"),
            failover_consumer: String::from("worker-b"),
            messages: 200,
            batch: 64,
            timeout: Duration::from_secs(20),
        }
    }
    /// Overrides the per-phase timeout.
    #[must_use]
    pub const fn timeout(mut self, value: Duration) -> Self {
        self.timeout = value;
        self
    }
    /// Sets how many messages are produced.
    #[must_use]
    pub const fn messages(mut self, value: usize) -> Self {
        self.messages = value;
        self
    }
    /// Sets the read/claim batch size.
    #[must_use]
    pub const fn batch(mut self, value: usize) -> Self {
        self.batch = value;
        self
    }
    /// Renames the consumer that accumulates the pending backlog.
    #[must_use]
    pub fn producer_consumer(mut self, value: impl Into<String>) -> Self {
        self.producer_consumer = value.into();
        self
    }
    /// Renames the consumer that reclaims the backlog.
    #[must_use]
    pub fn failover_consumer(mut self, value: impl Into<String>) -> Self {
        self.failover_consumer = value.into();
        self
    }
}
#[async_trait]
impl Workload<RedisStreamsEnv> for RedisStreamsReclaimFailoverWorkload {
    fn name(&self) -> &str {
        "redis_streams_reclaim_failover_workload"
    }
    /// Three phases, all through the first node client: produce messages,
    /// read them without acking (building a pending backlog on one
    /// consumer), then XAUTOCLAIM + ack everything from a second consumer
    /// and verify the pending count drains to zero.
    async fn start(&self, ctx: &RunContext<RedisStreamsEnv>) -> Result<(), DynError> {
        let clients = ctx.node_clients().snapshot();
        if clients.is_empty() {
            return Err("redis streams failover workload requires at least 1 node client".into());
        }
        let driver = &clients[0];
        driver.ensure_group(&self.stream, &self.group).await?;
        // Phase 1: produce.
        info!(messages = self.messages, stream = %self.stream, group = %self.group, "redis streams failover: produce phase");
        produce_messages(driver, &self.stream, self.messages).await?;
        // Phase 2: read without acking so everything stays pending.
        info!(messages = self.messages, consumer = %self.producer_consumer, "redis streams failover: create pending phase");
        let pending_count = create_pending_messages(
            driver,
            &self.stream,
            &self.group,
            &self.producer_consumer,
            self.messages,
            self.batch,
            self.timeout,
        )
        .await?;
        // Phase 3: reclaim from the failover consumer and ack.
        info!(pending = pending_count, from = %self.producer_consumer, to = %self.failover_consumer, "redis streams failover: reclaim+ack phase");
        reclaim_and_ack_pending(
            driver,
            &self.stream,
            &self.group,
            &self.failover_consumer,
            pending_count,
            self.batch,
            self.timeout,
        )
        .await?;
        // Final invariant: nothing may remain pending in the group.
        let pending = driver.pending_count(&self.stream, &self.group).await?;
        if pending != 0 {
            return Err(
                format!("redis streams pending entries remain after reclaim: {pending}").into(),
            );
        }
        info!(
            pending = pending_count,
            "redis streams failover reclaim complete"
        );
        Ok(())
    }
}
/// Appends `messages` payloads ("msg-0" .. "msg-{n-1}") to the stream.
async fn produce_messages(
    client: &RedisStreamsClient,
    stream: &str,
    messages: usize,
) -> Result<(), DynError> {
    for sequence in 0..messages {
        let payload = format!("msg-{sequence}");
        client.append_message(stream, payload.as_str()).await?;
    }
    Ok(())
}
/// Reads (without acking) until `expected` entries are pending on
/// `consumer`; returns the number of entries claimed.
///
/// The 500ms XREADGROUP block keeps the loop from busy-spinning when the
/// stream is momentarily empty.
async fn create_pending_messages(
    client: &RedisStreamsClient,
    stream: &str,
    group: &str,
    consumer: &str,
    expected: usize,
    batch: usize,
    timeout: Duration,
) -> Result<usize, DynError> {
    let mut claimed = 0usize;
    let deadline = Instant::now() + timeout;
    while claimed < expected && Instant::now() < deadline {
        let ids = client
            .read_group_batch(stream, group, consumer, batch, 500)
            .await?;
        if ids.is_empty() {
            continue;
        }
        claimed += ids.len();
    }
    // NOTE(review): strict equality — if the stream already held extra
    // messages from a previous run, `claimed` could overshoot `expected`
    // and this would report a timeout; confirm streams are fresh per run.
    if claimed == expected {
        return Ok(claimed);
    }
    Err(format!("redis streams pending creation timed out: claimed {claimed}/{expected}").into())
}
/// Scans the group's pending entries with XAUTOCLAIM (min idle 0, so all are
/// eligible), claims them for `failover_consumer`, and acks each batch until
/// `expected` entries have been acked or `timeout` elapses.
async fn reclaim_and_ack_pending(
    client: &RedisStreamsClient,
    stream: &str,
    group: &str,
    failover_consumer: &str,
    expected: usize,
    batch: usize,
    timeout: Duration,
) -> Result<(), DynError> {
    let mut reclaimed = 0usize;
    // "0-0" starts the XAUTOCLAIM scan at the beginning of the PEL.
    let mut cursor = "0-0".to_owned();
    let deadline = Instant::now() + timeout;
    while reclaimed < expected && Instant::now() < deadline {
        let (next_cursor, ids) = client
            .autoclaim_batch(stream, group, failover_consumer, 0, &cursor, batch)
            .await?;
        if ids.is_empty() {
            // A returned cursor of "0-0" means the scan wrapped: nothing
            // left to claim, so bail out instead of looping forever.
            if next_cursor == "0-0" {
                break;
            }
            cursor = next_cursor;
            continue;
        }
        let acked = client.ack_messages(stream, group, &ids).await? as usize;
        reclaimed += acked;
        cursor = next_cursor;
    }
    if reclaimed == expected {
        return Ok(());
    }
    Err(format!("redis streams reclaim timed out: acked {reclaimed}/{expected}").into())
}

View File

@ -0,0 +1,131 @@
use std::time::Duration;
use async_trait::async_trait;
use redis_streams_runtime_ext::RedisStreamsEnv;
use testing_framework_core::scenario::{DynError, RunContext, Workload};
use tokio::time::Instant;
use tracing::info;
/// Workload that produces messages, consumes them through a consumer group,
/// acks them, and checks the pending queue drains to zero.
#[derive(Clone)]
pub struct RedisStreamsRoundTripWorkload {
    stream: String,
    group: String,
    // Single consumer performing all reads and acks.
    consumer: String,
    messages: usize,
    read_batch: usize,
    // Deadline for the consume+ack phase.
    timeout: Duration,
}
impl RedisStreamsRoundTripWorkload {
    /// Creates the workload with defaults: consumer "worker-1", 200
    /// messages, read batches of 32, and a 20-second timeout.
    #[must_use]
    pub fn new(stream: impl Into<String>, group: impl Into<String>) -> Self {
        let stream = stream.into();
        let group = group.into();
        Self {
            stream,
            group,
            consumer: String::from("worker-1"),
            messages: 200,
            read_batch: 32,
            timeout: Duration::from_secs(20),
        }
    }
    /// Overrides the consume-phase timeout.
    #[must_use]
    pub const fn timeout(mut self, value: Duration) -> Self {
        self.timeout = value;
        self
    }
    /// Sets how many messages to produce and consume.
    #[must_use]
    pub const fn messages(mut self, value: usize) -> Self {
        self.messages = value;
        self
    }
    /// Sets the XREADGROUP batch size.
    #[must_use]
    pub const fn read_batch(mut self, value: usize) -> Self {
        self.read_batch = value;
        self
    }
    /// Overrides the consumer name.
    #[must_use]
    pub fn consumer(mut self, value: impl Into<String>) -> Self {
        self.consumer = value.into();
        self
    }
}
#[async_trait]
impl Workload<RedisStreamsEnv> for RedisStreamsRoundTripWorkload {
    fn name(&self) -> &str {
        "redis_streams_roundtrip_workload"
    }
    /// Produce, consume+ack, then verify no pending entries remain — all
    /// through the first node client.
    async fn start(&self, ctx: &RunContext<RedisStreamsEnv>) -> Result<(), DynError> {
        let clients = ctx.node_clients().snapshot();
        if clients.is_empty() {
            return Err("redis streams workload requires at least 1 node".into());
        }
        let driver = &clients[0];
        driver.ensure_group(&self.stream, &self.group).await?;
        // Phase 1: produce.
        info!(messages = self.messages, stream = %self.stream, group = %self.group, "redis streams produce phase");
        for idx in 0..self.messages {
            let payload = format!("msg-{idx}");
            driver.append_message(&self.stream, &payload).await?;
        }
        // Phase 2: consume through the group and ack each batch.
        info!(messages = self.messages, stream = %self.stream, group = %self.group, "redis streams consume+ack phase");
        consume_and_ack(
            driver,
            &self.stream,
            &self.group,
            &self.consumer,
            self.messages,
            self.read_batch,
            self.timeout,
        )
        .await?;
        // Final invariant: the group's pending list must be empty.
        let pending = driver.pending_count(&self.stream, &self.group).await?;
        if pending != 0 {
            return Err(format!("redis streams pending entries remain: {pending}").into());
        }
        info!(messages = self.messages, stream = %self.stream, group = %self.group, "redis streams roundtrip finished");
        Ok(())
    }
}
/// Reads batches through the consumer group and acks each one until
/// `expected` messages are acknowledged or `timeout` elapses.
///
/// The 500ms XREADGROUP block keeps the loop from busy-spinning when the
/// stream is momentarily empty.
async fn consume_and_ack(
    client: &redis_streams_runtime_ext::RedisStreamsClient,
    stream: &str,
    group: &str,
    consumer: &str,
    expected: usize,
    batch: usize,
    timeout: Duration,
) -> Result<(), DynError> {
    let mut acked_total = 0usize;
    let deadline = Instant::now() + timeout;
    while acked_total < expected && Instant::now() < deadline {
        let ids = client
            .read_group_batch(stream, group, consumer, batch, 500)
            .await?;
        if ids.is_empty() {
            continue;
        }
        let acked = client.ack_messages(stream, group, &ids).await? as usize;
        acked_total += acked;
    }
    // NOTE(review): strict equality — extra pre-existing messages would push
    // `acked_total` past `expected` and report a timeout; confirm streams
    // are fresh per run.
    if acked_total == expected {
        return Ok(());
    }
    Err(format!("redis streams timed out: acked {acked_total}/{expected} messages").into())
}