From 662b8d44ecc8cfa2476f12535a4374a277240223 Mon Sep 17 00:00:00 2001 From: andrussal Date: Fri, 10 Apr 2026 10:04:09 +0200 Subject: [PATCH] feat(examples): add protocol demo apps --- .cargo-deny.toml | 1 + Cargo.lock | 513 ++++++++++++++++++ Cargo.toml | 7 + examples/nats/README.md | 45 ++ examples/nats/examples/Cargo.toml | 16 + .../nats/examples/src/bin/basic_roundtrip.rs | 28 + .../examples/src/bin/compose_roundtrip.rs | 45 ++ .../nats/examples/src/bin/parity_check.rs | 75 +++ examples/nats/testing/integration/Cargo.toml | 15 + examples/nats/testing/integration/src/app.rs | 127 +++++ .../testing/integration/src/compose_env.rs | 86 +++ examples/nats/testing/integration/src/lib.rs | 10 + .../nats/testing/integration/src/local_env.rs | 68 +++ .../nats/testing/integration/src/scenario.rs | 15 + examples/nats/testing/workloads/Cargo.toml | 14 + examples/nats/testing/workloads/src/health.rs | 70 +++ examples/nats/testing/workloads/src/lib.rs | 6 + .../nats/testing/workloads/src/roundtrip.rs | 114 ++++ examples/pubsub/Dockerfile | 24 + examples/pubsub/README.md | 61 +++ examples/pubsub/examples/Cargo.toml | 17 + .../examples/src/bin/basic_ws_reconnect.rs | 36 ++ .../examples/src/bin/basic_ws_roundtrip.rs | 34 ++ .../examples/src/bin/compose_ws_reconnect.rs | 52 ++ .../examples/src/bin/compose_ws_roundtrip.rs | 50 ++ .../src/bin/k8s_manual_ws_roundtrip.rs | 246 +++++++++ .../examples/src/bin/k8s_ws_roundtrip.rs | 64 +++ examples/pubsub/pubsub-node/Cargo.toml | 23 + examples/pubsub/pubsub-node/src/config.rs | 29 + examples/pubsub/pubsub-node/src/main.rs | 36 ++ examples/pubsub/pubsub-node/src/server.rs | 167 ++++++ examples/pubsub/pubsub-node/src/state.rs | 179 ++++++ examples/pubsub/pubsub-node/src/sync.rs | 100 ++++ .../pubsub/testing/integration/Cargo.toml | 18 + .../pubsub/testing/integration/src/app.rs | 230 ++++++++ .../testing/integration/src/compose_env.rs | 15 + .../pubsub/testing/integration/src/k8s_env.rs | 21 + .../pubsub/testing/integration/src/lib.rs | 12 + .../testing/integration/src/local_env.rs | 41 ++ .../testing/integration/src/scenario.rs | 15 + examples/pubsub/testing/workloads/Cargo.toml | 13 + .../testing/workloads/src/convergence.rs | 104 ++++ examples/pubsub/testing/workloads/src/lib.rs | 8 + .../testing/workloads/src/ws_reconnect.rs | 201 +++++++ .../testing/workloads/src/ws_roundtrip.rs | 142 +++++ 45 files changed, 3193 insertions(+) create mode 100644 examples/nats/README.md create mode 100644 examples/nats/examples/Cargo.toml create mode 100644 examples/nats/examples/src/bin/basic_roundtrip.rs create mode 100644 examples/nats/examples/src/bin/compose_roundtrip.rs create mode 100644 examples/nats/examples/src/bin/parity_check.rs create mode 100644 examples/nats/testing/integration/Cargo.toml create mode 100644 examples/nats/testing/integration/src/app.rs create mode 100644 examples/nats/testing/integration/src/compose_env.rs create mode 100644 examples/nats/testing/integration/src/lib.rs create mode 100644 examples/nats/testing/integration/src/local_env.rs create mode 100644 examples/nats/testing/integration/src/scenario.rs create mode 100644 examples/nats/testing/workloads/Cargo.toml create mode 100644 examples/nats/testing/workloads/src/health.rs create mode 100644 examples/nats/testing/workloads/src/lib.rs create mode 100644 examples/nats/testing/workloads/src/roundtrip.rs create mode 100644 examples/pubsub/Dockerfile create mode 100644 examples/pubsub/README.md create mode 100644 examples/pubsub/examples/Cargo.toml create mode 100644 examples/pubsub/examples/src/bin/basic_ws_reconnect.rs create mode 100644 examples/pubsub/examples/src/bin/basic_ws_roundtrip.rs create mode 100644 examples/pubsub/examples/src/bin/compose_ws_reconnect.rs create mode 100644 examples/pubsub/examples/src/bin/compose_ws_roundtrip.rs create mode 100644 examples/pubsub/examples/src/bin/k8s_manual_ws_roundtrip.rs create mode 100644 examples/pubsub/examples/src/bin/k8s_ws_roundtrip.rs create mode 100644 examples/pubsub/pubsub-node/Cargo.toml create mode 100644 examples/pubsub/pubsub-node/src/config.rs create mode 100644 examples/pubsub/pubsub-node/src/main.rs create mode 100644 examples/pubsub/pubsub-node/src/server.rs create mode 100644 examples/pubsub/pubsub-node/src/state.rs create mode 100644 examples/pubsub/pubsub-node/src/sync.rs create mode 100644 examples/pubsub/testing/integration/Cargo.toml create mode 100644 examples/pubsub/testing/integration/src/app.rs create mode 100644 examples/pubsub/testing/integration/src/compose_env.rs create mode 100644 examples/pubsub/testing/integration/src/k8s_env.rs create mode 100644 examples/pubsub/testing/integration/src/lib.rs create mode 100644 examples/pubsub/testing/integration/src/local_env.rs create mode 100644 examples/pubsub/testing/integration/src/scenario.rs create mode 100644 examples/pubsub/testing/workloads/Cargo.toml create mode 100644 examples/pubsub/testing/workloads/src/convergence.rs create mode 100644 examples/pubsub/testing/workloads/src/lib.rs create mode 100644 examples/pubsub/testing/workloads/src/ws_reconnect.rs create mode 100644 examples/pubsub/testing/workloads/src/ws_roundtrip.rs diff --git a/.cargo-deny.toml b/.cargo-deny.toml index 687e5e2..0b5a87d 100644 --- a/.cargo-deny.toml +++ b/.cargo-deny.toml @@ -20,6 +20,7 @@ allow = [ "BSD-2-Clause", "BSD-3-Clause", "BSL-1.0", + "CDLA-Permissive-2.0", "ISC", "MIT", "Unicode-3.0", diff --git a/Cargo.lock b/Cargo.lock index 471e158..9f7d032 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -107,6 +107,42 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-nats" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07d6f157065c3461096d51aacde0c326fa49f3f6e0199e204c566842cdaa5299" +dependencies = [ + "base64", + "bytes", + "futures-util", + "memchr", + "nkeys", + "nuid", + "pin-project", + "portable-atomic", + "rand", + "regex", + "ring", + "rustls-native-certs", + "rustls-pki-types", + "rustls-webpki", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror 1.0.69", + "time", + "tokio", + "tokio-rustls", + "tokio-stream", + "tokio-util", + "tokio-websockets", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -160,6 +196,7 @@ checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", "axum-core", + "base64", "bytes", "futures-util", "http", @@ -177,11 +214,15 @@ dependencies = [ "serde", "serde_json", "serde_path_to_error", + "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -202,6 +243,7 @@ dependencies = [ "sync_wrapper", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -221,6 +263,12 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64ct" +version = "1.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" + [[package]] name = "bitflags" version = "2.11.0" @@ -252,11 +300,20 @@ version = "3.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +dependencies = [ + "serde", +] [[package]] name = "cc" @@ -414,6 +471,12 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "core-foundation" version = "0.9.4" @@ -484,6 +547,49 @@ dependencies = [ "typenum", ] +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "rustc_version", + "subtle", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "data-encoding" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" + +[[package]] +name = "der" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" +dependencies = [ + "const-oid", + "pem-rfc7468", + "zeroize", +] + [[package]] name = "deranged" version = "0.5.5" @@ -542,6 +648,28 @@ dependencies = [ "syn", ] +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" +dependencies = [ + "curve25519-dalek", + "ed25519", + "sha2", + "signature", + "subtle", +] + [[package]] name = "educe" version = "0.6.0" @@ -635,6 +763,12 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -1436,6 +1570,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "matchit" version = "0.7.3" @@ -1482,6 +1625,79 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nats-examples" +version = "0.1.0" +dependencies = [ + "anyhow", + "nats-runtime-ext", + "nats-runtime-workloads", + "testing-framework-core", + "testing-framework-runner-compose", + "tokio", + "tracing", + "tracing-subscriber", + "which", +] + +[[package]] +name = "nats-runtime-ext" +version = "0.1.0" +dependencies = [ + "async-nats", + "async-trait", + "reqwest", + "serde", + "testing-framework-core", + "testing-framework-runner-compose", + "testing-framework-runner-local", +] + +[[package]] +name = "nats-runtime-workloads" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures-util", + "nats-runtime-ext", + "testing-framework-core", + "tokio", + "tracing", +] + +[[package]] +name = "nkeys" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879011babc47a1c7fdf5a935ae3cfe94f34645ca0cac1c7f6424b36fc743d1bf" +dependencies = [ + "data-encoding", + "ed25519", + "ed25519-dalek", + "getrandom 0.2.17", + "log", + "rand", + "signatory", +] + +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand", +] + [[package]] name = "num-conv" version = "0.2.0" @@ -1616,6 +1832,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "pem-rfc7468" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88b39c9bfcfc231068454382784bb460aae594343fb030d46e9f50a645418412" +dependencies = [ + "base64ct", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -1735,6 +1960,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.32" @@ -1803,6 +2038,69 @@ dependencies = [ "url", ] +[[package]] +name = "pubsub-examples" +version = "0.1.0" +dependencies = [ + "anyhow", + "pubsub-runtime-ext", + "pubsub-runtime-workloads", + "serde", + "testing-framework-core", + "testing-framework-runner-compose", + "testing-framework-runner-k8s", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "pubsub-node" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "clap", + "futures-util", + "reqwest", + "serde", + "serde_json", + "serde_yaml", + "tokio", + "tower-http", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "pubsub-runtime-ext" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures-util", + "reqwest", + "serde", + "serde_json", + "testing-framework-core", + "testing-framework-runner-compose", + "testing-framework-runner-k8s", + "testing-framework-runner-local", + "tokio", + "tokio-tungstenite", +] + +[[package]] +name = "pubsub-runtime-workloads" +version = "0.1.0" +dependencies = [ + "async-trait", + "pubsub-runtime-ext", + "serde", + "testing-framework-core", + "tokio", + "tracing", +] + [[package]] name = "quote" version = "1.0.44" @@ -2158,6 +2456,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + [[package]] name = "serde_path_to_error" version = "0.1.20" @@ -2169,6 +2476,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2194,6 +2512,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.9" @@ -2205,6 +2534,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2221,6 +2559,28 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core", + "signature", + "zeroize", +] + +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "digest", + "rand_core", +] + [[package]] name = "siphasher" version = "1.0.2" @@ -2259,6 +2619,16 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -2465,6 +2835,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "time" version = "0.3.47" @@ -2472,6 +2851,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", + "itoa", "num-conv", "powerfmt", "serde_core", @@ -2514,6 +2894,7 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", @@ -2563,6 +2944,29 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -2577,6 +2981,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-websockets" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d" +dependencies = [ + "base64", + "bytes", + "futures-core", + "futures-sink", + "http", + "httparse", + "rand", + "ring", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tokio-util", + "webpki-roots 0.26.11", +] + [[package]] name = "tower" version = "0.5.3" @@ -2657,6 +3082,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -2665,6 +3120,34 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tryhard" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fe58ebd5edd976e0fe0f8a14d2a04b7c81ef153ea9a54eebc42e67c2c23b4e5" +dependencies = [ + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror 1.0.69", + "utf-8", +] + [[package]] name = "typenum" version = "1.19.0" @@ -2714,6 +3197,12 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -2737,6 +3226,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" @@ -2852,6 +3347,24 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.6", +] + +[[package]] +name = "webpki-roots" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cfaf3c063993ff62e73cb4311efde4db1efb31ab78a3e5c457939ad5cc0bed" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "6.0.3" diff --git a/Cargo.toml b/Cargo.toml index d24851f..9f242bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,13 @@ members = [ "cfgsync/artifacts", "cfgsync/core", "cfgsync/runtime", + "examples/nats/examples", + "examples/nats/testing/integration", + "examples/nats/testing/workloads", + "examples/pubsub/examples", + "examples/pubsub/pubsub-node", + "examples/pubsub/testing/integration", + "examples/pubsub/testing/workloads", "testing-framework/core", "testing-framework/deployers/compose", "testing-framework/deployers/k8s", diff --git a/examples/nats/README.md b/examples/nats/README.md new file mode 100644 index 0000000..bc0be91 --- /dev/null +++ b/examples/nats/README.md @@ -0,0 +1,45 @@ +# NATS Example + +This example uses `nats-server` as the system under test. + +The main scenario publishes messages and checks that they can be received back +through NATS. The same behavior can be run against a local `nats-server` +process or a Docker Compose deployment. + +## How TF runs this + +Each example follows the same pattern: + +- TF starts `nats-server` either as a local process or in Docker Compose +- a workload publishes and subscribes through the NATS client +- an expectation checks that the server stays healthy during the run + +## Scenarios + +- `basic_roundtrip` runs the roundtrip check against a local `nats-server` +- `compose_roundtrip` runs the same check in Docker Compose +- `parity_check` runs compose first and then runs local when `nats-server` is available + +## Run locally + +```bash +cargo run -p nats-examples --bin basic_roundtrip +``` + +If `nats-server` is not on `PATH`: + +```bash +NATS_SERVER_BIN=/path/to/nats-server cargo run -p nats-examples --bin basic_roundtrip +``` + +## Run with Docker Compose + +```bash +cargo run -p nats-examples --bin compose_roundtrip +``` + +## Run the parity check + +```bash +cargo run -p nats-examples --bin parity_check +``` diff --git a/examples/nats/examples/Cargo.toml b/examples/nats/examples/Cargo.toml new file mode 100644 index 0000000..8f32cb7 --- /dev/null +++ b/examples/nats/examples/Cargo.toml @@ -0,0 +1,16 @@ +[package] +edition.workspace = true +license.workspace = true +name = "nats-examples" +version.workspace = true + +[dependencies] +anyhow = "1.0" +nats-runtime-ext = { path = "../testing/integration" } +nats-runtime-workloads = { path = "../testing/workloads" } +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +which = "6.0" diff --git a/examples/nats/examples/src/bin/basic_roundtrip.rs b/examples/nats/examples/src/bin/basic_roundtrip.rs new file mode 100644 index 0000000..0b6e1fd --- /dev/null +++ b/examples/nats/examples/src/bin/basic_roundtrip.rs @@ -0,0 +1,28 @@ +use std::time::Duration; + +use nats_runtime_ext::NatsLocalDeployer; +use nats_runtime_workloads::{ + NatsBuilderExt, NatsClusterHealthy, NatsRoundTripWorkload, NatsScenarioBuilder, +}; +use testing_framework_core::scenario::Deployer; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "info,testing_framework_core=info".into()), + ) + .init(); + + let mut scenario = NatsScenarioBuilder::deployment_with(|topology| topology) + .with_run_duration(Duration::from_secs(25)) + .with_workload(NatsRoundTripWorkload::new("tf.roundtrip").messages(200)) + .with_expectation(NatsClusterHealthy::new()) + .build()?; + + let deployer = NatsLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + runner.run(&mut scenario).await?; + Ok(()) +} diff --git a/examples/nats/examples/src/bin/compose_roundtrip.rs b/examples/nats/examples/src/bin/compose_roundtrip.rs new file mode 100644 index 0000000..8ea55a4 --- /dev/null +++ b/examples/nats/examples/src/bin/compose_roundtrip.rs @@ -0,0 +1,45 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use nats_runtime_ext::NatsComposeDeployer; +use nats_runtime_workloads::{ + NatsBuilderExt, NatsClusterHealthy, NatsRoundTripWorkload, NatsScenarioBuilder, +}; +use testing_framework_core::scenario::Deployer; +use testing_framework_runner_compose::ComposeRunnerError; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "info,testing_framework_core=info".into()), + ) + .init(); + + let mut scenario = NatsScenarioBuilder::deployment_with(|topology| topology) + .with_run_duration(Duration::from_secs(30)) + .with_workload(NatsRoundTripWorkload::new("tf.roundtrip").messages(200)) + .with_expectation(NatsClusterHealthy::new()) + .build()?; + + let deployer = NatsComposeDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => { + warn!("docker unavailable; skipping compose nats run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)).context("deploying nats compose stack"); + } + }; + + info!("running nats compose roundtrip scenario"); + runner + .run(&mut scenario) + .await + .context("running nats compose scenario")?; + Ok(()) +} diff --git a/examples/nats/examples/src/bin/parity_check.rs b/examples/nats/examples/src/bin/parity_check.rs new file mode 100644 index 0000000..2720569 --- /dev/null +++ b/examples/nats/examples/src/bin/parity_check.rs @@ -0,0 +1,75 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use nats_runtime_ext::{NatsComposeDeployer, NatsLocalDeployer}; +use nats_runtime_workloads::{ + NatsBuilderExt, NatsClusterHealthy, NatsRoundTripWorkload, NatsScenarioBuilder, +}; +use testing_framework_core::scenario::Deployer; +use testing_framework_runner_compose::ComposeRunnerError; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + run_compose().await?; + run_local_if_available().await?; + Ok(()) +} + +async fn run_compose() -> Result<()> { + let mut scenario = build_scenario(Duration::from_secs(30)).build()?; + let deployer = NatsComposeDeployer::new(); + + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => { + warn!("docker unavailable; skipping compose nats run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)).context("deploying nats compose stack"); + } + }; + + info!("running nats compose parity check"); + runner + .run(&mut scenario) + .await + .context("running nats compose scenario")?; + Ok(()) +} + +async fn run_local_if_available() -> Result<()> { + if !has_local_nats_server() { + warn!( + "nats-server binary not found; skipping local parity check (set NATS_SERVER_BIN or add to PATH)" + ); + return Ok(()); + } + + let mut scenario = build_scenario(Duration::from_secs(25)).build()?; + let deployer = NatsLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + + info!("running nats local parity check"); + runner.run(&mut scenario).await?; + Ok(()) +} + +fn has_local_nats_server() -> bool { + std::env::var("NATS_SERVER_BIN") + .ok() + .is_some_and(|path| std::path::Path::new(&path).exists()) + || which::which("nats-server").is_ok() +} + +fn build_scenario(run_duration: Duration) -> NatsScenarioBuilder { + NatsScenarioBuilder::deployment_with(|topology| topology) + .with_run_duration(run_duration) + .with_workload(NatsRoundTripWorkload::new("tf.roundtrip").messages(200)) + .with_expectation(NatsClusterHealthy::new()) +} diff --git a/examples/nats/testing/integration/Cargo.toml b/examples/nats/testing/integration/Cargo.toml new file mode 100644 index 0000000..9a87ee4 --- /dev/null +++ b/examples/nats/testing/integration/Cargo.toml @@ -0,0 +1,15 @@ +[package] +edition.workspace = true +license.workspace = true +name = "nats-runtime-ext" +version.workspace = true + +[dependencies] +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +testing-framework-runner-local = { workspace = true } + +async-nats = "0.47" +async-trait = { workspace = true } +reqwest = { workspace = true, features = ["json"] } +serde = { workspace = true } diff --git a/examples/nats/testing/integration/src/app.rs b/examples/nats/testing/integration/src/app.rs new file mode 100644 index 0000000..42c8675 --- /dev/null +++ b/examples/nats/testing/integration/src/app.rs @@ -0,0 +1,127 @@ +use std::io::Error; + +use async_trait::async_trait; +use reqwest::Url; +use serde::{Deserialize, Serialize}; +use testing_framework_core::scenario::{ + Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, + DefaultFeedRuntime, DynError, NodeAccess, +}; + +pub const CLUSTER_PORT_KEY: &str = "cluster"; + +pub type NatsTopology = testing_framework_core::topology::ClusterTopology; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NatsNodeConfig { + pub node_id: u64, + pub client_port: u16, + pub monitor_port: u16, + pub cluster_port: u16, + pub routes: Vec, +} + +#[derive(Clone)] +pub struct NatsClient { + server_url: String, + monitor_base_url: Url, + http: reqwest::Client, +} + +impl NatsClient { + pub fn new(server_url: String, monitor_base_url: Url) -> Self { + Self { + server_url, + monitor_base_url, + http: reqwest::Client::new(), + } + } + + pub async fn connect(&self) -> Result { + Ok(async_nats::connect(&self.server_url).await?) + } + + pub async fn is_healthy(&self) -> Result { + let url = self.monitor_base_url.join("/healthz")?; + let response = self.http.get(url).send().await?; + Ok(response.status().is_success()) + } +} + +pub struct NatsEnv; + +#[async_trait] +impl Application for NatsEnv { + type Deployment = NatsTopology; + type NodeClient = NatsClient; + type NodeConfig = NatsNodeConfig; + type FeedRuntime = DefaultFeedRuntime; + + fn build_node_client(access: &NodeAccess) -> Result { + let client_port = access.testing_port().unwrap_or(access.api_port()); + let server_url = format!("nats://{}:{client_port}", access.host()); + let monitor_base_url = access.api_base_url()?; + Ok(NatsClient::new(server_url, monitor_base_url)) + } + + fn node_readiness_path() -> &'static str { + "/healthz" + } +} + +impl ClusterNodeConfigApplication for NatsEnv { + type ConfigError = Error; + + fn static_network_port() -> u16 { + 6222 + } + + fn static_named_ports() -> &'static [(&'static str, u16)] { + &[("client", 4222), ("monitor", 8222)] + } + + fn build_cluster_node_config( + node: &ClusterNodeView, + peers: &[ClusterPeerView], + ) -> Result { + let routes = peers + .iter() + .map(|peer| format!("nats-route://{}", peer.authority())) + .collect::>(); + + Ok(NatsNodeConfig { + node_id: node.index() as u64, + client_port: node.require_named_port("client")?, + monitor_port: node.require_named_port("monitor")?, + cluster_port: node.network_port(), + routes, + }) + } + + fn serialize_cluster_node_config( + config: &Self::NodeConfig, + ) -> Result { + Ok(render_nats_config(config)) + } +} + +pub fn render_nats_config(config: &NatsNodeConfig) -> String { + let routes = build_routes_line(&config.routes); + + format!( + "server_name: node-{node_id}\nport: {client}\nhttp_port: {monitor}\njetstream {{\n store_dir: \"jetstream\"\n}}\ncluster {{\n name: tf\n port: {cluster}\n{routes}}}\n", + node_id = config.node_id, + client = config.client_port, + monitor = config.monitor_port, + cluster = config.cluster_port, + routes = routes, + ) +} + +fn build_routes_line(routes: &[String]) -> String { + if routes.is_empty() { + return String::new(); + } + + format!(" routes: [{}]\n", routes.join(", ")) +} diff --git a/examples/nats/testing/integration/src/compose_env.rs b/examples/nats/testing/integration/src/compose_env.rs new file mode 100644 index 0000000..c0c464e --- /dev/null +++ b/examples/nats/testing/integration/src/compose_env.rs @@ -0,0 +1,86 @@ +use std::{env, fs, io::Error, path::Path}; + +use testing_framework_core::{ + cfgsync::StaticArtifactRenderer, scenario::DynError, topology::DeploymentDescriptor, +}; +use testing_framework_runner_compose::{ + ComposeDeployEnv, ComposeNodeConfigFileName, ComposeReadinessProbe, LoopbackNodeRuntimeSpec, +}; + +use crate::NatsEnv; + +const NODE_CONFIG_PATH: &str = "/etc/nats/nats.conf"; + +impl ComposeDeployEnv for NatsEnv { + fn prepare_compose_configs( + path: &Path, + topology: &Self::Deployment, + _cfgsync_port: u16, + _metrics_otlp_ingest_url: Option<&reqwest::Url>, + ) -> Result<(), DynError> { + let hostnames = Self::cfgsync_hostnames(topology); + let configs_dir = path + .parent() + .ok_or_else(|| Error::other("cfgsync path has no parent"))? + .join("configs"); + fs::create_dir_all(&configs_dir)?; + + for index in 0..topology.node_count() { + let mut config = ::build_node_config(topology, index)?; + ::rewrite_for_hostnames( + topology, + index, + &hostnames, + &mut config, + )?; + let rendered = ::serialize_node_config(&config)?; + fs::write( + configs_dir.join(Self::static_node_config_file_name(index)), + rendered, + )?; + } + + Ok(()) + } + + fn static_node_config_file_name(index: usize) -> String { + ComposeNodeConfigFileName::FixedExtension("nats").resolve(index) + } + + fn loopback_node_runtime_spec( + topology: &Self::Deployment, + index: usize, + ) -> Result, DynError> { + let _ = topology; + Ok(Some(build_nats_runtime(index))) + } + + fn readiness_probe() -> ComposeReadinessProbe { + ComposeReadinessProbe::Http { + path: ::node_readiness_path(), + } + } +} + +fn build_nats_runtime(index: usize) -> LoopbackNodeRuntimeSpec { + let image = env::var("NATS_IMAGE").unwrap_or_else(|_| "nats:2.10".to_owned()); + let platform = env::var("NATS_PLATFORM").ok(); + + LoopbackNodeRuntimeSpec { + image, + entrypoint: vec![ + "nats-server".to_owned(), + "--config".to_owned(), + NODE_CONFIG_PATH.to_owned(), + ], + volumes: vec![format!( + "./stack/configs/node-{index}.nats:{NODE_CONFIG_PATH}:ro" + )], + extra_hosts: vec![], + container_ports: vec![8222, 4222, 6222], + environment: vec![testing_framework_runner_compose::EnvEntry::new( + "RUST_LOG", "info", + )], + platform, + } +} diff --git a/examples/nats/testing/integration/src/lib.rs b/examples/nats/testing/integration/src/lib.rs new file mode 100644 index 0000000..4aef15e --- /dev/null +++ b/examples/nats/testing/integration/src/lib.rs @@ -0,0 +1,10 @@ +mod app; +mod compose_env; +mod local_env; +pub mod scenario; + +pub use app::*; +pub use scenario::{NatsBuilderExt, NatsScenarioBuilder}; + +pub type NatsLocalDeployer = testing_framework_runner_local::ProcessDeployer; +pub type NatsComposeDeployer = testing_framework_runner_compose::ComposeDeployer; diff --git a/examples/nats/testing/integration/src/local_env.rs b/examples/nats/testing/integration/src/local_env.rs new file mode 100644 index 0000000..fc74370 --- /dev/null +++ b/examples/nats/testing/integration/src/local_env.rs @@ -0,0 +1,68 @@ +use std::{ + collections::HashMap, + net::{Ipv4Addr, SocketAddr}, +}; + +use testing_framework_core::scenario::{DynError, StartNodeOptions}; +use testing_framework_runner_local::{ + LocalDeployerEnv, LocalNodePorts, LocalPeerNode, LocalProcessSpec, NodeEndpointPort, + NodeEndpoints, build_local_cluster_node_config, env::Node, text_node_config, +}; + +use crate::{CLUSTER_PORT_KEY, NatsEnv, NatsNodeConfig, render_nats_config}; + +impl LocalDeployerEnv for NatsEnv { + fn initial_node_name_prefix() -> &'static str { + "nats-node" + } + + fn initial_local_port_names() -> &'static [&'static str] { + &["client", "monitor"] + } + + fn build_local_node_config_with_peers( + _topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peers: &[LocalPeerNode], + _peer_ports_by_name: &HashMap, + _options: &StartNodeOptions, + _template_config: Option< + &::NodeConfig, + >, + ) -> Result<::NodeConfig, DynError> { + build_local_cluster_node_config::(index, ports, peers) + } + + fn local_process_spec() -> Option { + Some( + LocalProcessSpec::new("NATS_SERVER_BIN", "nats-server") + .with_config_file("nats.conf", "-c"), + ) + } + + fn render_local_config(config: &NatsNodeConfig) -> Result, DynError> { + Ok(text_node_config(render_nats_config(config))) + } + + fn node_endpoints(config: &NatsNodeConfig) -> Result { + let mut endpoints = NodeEndpoints { + api: SocketAddr::from((Ipv4Addr::LOCALHOST, config.monitor_port)), + extra_ports: HashMap::new(), + }; + + endpoints.insert_port(NodeEndpointPort::TestingApi, config.client_port); + endpoints.insert_port( + NodeEndpointPort::Custom(CLUSTER_PORT_KEY.to_owned()), + config.cluster_port, + ); + + Ok(endpoints) + } + + fn node_peer_port(node: &Node) -> u16 { + node.endpoints() + .port(&NodeEndpointPort::Custom(CLUSTER_PORT_KEY.to_owned())) + .unwrap_or_else(|| node.config().cluster_port) + } +} diff --git a/examples/nats/testing/integration/src/scenario.rs b/examples/nats/testing/integration/src/scenario.rs new file mode 100644 index 0000000..b5f4670 --- /dev/null +++ b/examples/nats/testing/integration/src/scenario.rs @@ -0,0 +1,15 @@ +use testing_framework_core::scenario::ScenarioBuilder; + +use crate::{NatsEnv, NatsTopology}; + +pub type NatsScenarioBuilder = ScenarioBuilder; + +pub trait NatsBuilderExt: Sized { + fn deployment_with(f: impl FnOnce(NatsTopology) -> NatsTopology) -> Self; +} + +impl NatsBuilderExt for NatsScenarioBuilder { + fn deployment_with(f: impl FnOnce(NatsTopology) -> NatsTopology) -> Self { + NatsScenarioBuilder::with_deployment(f(NatsTopology::new(3))) + } +} diff --git a/examples/nats/testing/workloads/Cargo.toml b/examples/nats/testing/workloads/Cargo.toml new file mode 100644 index 0000000..9a856c0 --- /dev/null +++ b/examples/nats/testing/workloads/Cargo.toml @@ -0,0 +1,14 @@ +[package] +edition.workspace = true +license.workspace = true +name = "nats-runtime-workloads" +version.workspace = true + +[dependencies] +nats-runtime-ext = { path = "../integration" } +testing-framework-core = { workspace = true } + +async-trait = { workspace = true } +futures-util = "0.3" +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } diff --git a/examples/nats/testing/workloads/src/health.rs b/examples/nats/testing/workloads/src/health.rs new file mode 100644 index 0000000..2260b56 --- /dev/null +++ b/examples/nats/testing/workloads/src/health.rs @@ -0,0 +1,70 @@ +use std::time::Duration; + +use async_trait::async_trait; +use nats_runtime_ext::{NatsClient, NatsEnv}; +use testing_framework_core::scenario::{DynError, Expectation, RunContext}; +use tokio::time::Instant; +use tracing::info; + +#[derive(Clone)] +pub struct NatsClusterHealthy { + timeout: Duration, + poll_interval: Duration, +} + +impl NatsClusterHealthy { + #[must_use] + pub const fn new() -> Self { + Self { + timeout: Duration::from_secs(20), + poll_interval: Duration::from_millis(500), + } + } + + #[must_use] + pub const fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +impl Default for NatsClusterHealthy { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Expectation for NatsClusterHealthy { + fn name(&self) -> &str { + "nats_cluster_healthy" + } + + async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.is_empty() { + return Err("no nats node clients available".into()); + } + + let deadline = Instant::now() + self.timeout; + while Instant::now() < deadline { + if all_nodes_healthy(&clients).await? { + info!(nodes = clients.len(), "nats cluster healthy"); + return Ok(()); + } + + tokio::time::sleep(self.poll_interval).await; + } + + Err(format!("nats cluster not healthy within {:?}", self.timeout).into()) + } +} + +async fn all_nodes_healthy(clients: &[NatsClient]) -> Result { + for client in clients { + if !client.is_healthy().await? { + return Ok(false); + } + } + Ok(true) +} diff --git a/examples/nats/testing/workloads/src/lib.rs b/examples/nats/testing/workloads/src/lib.rs new file mode 100644 index 0000000..ddd336d --- /dev/null +++ b/examples/nats/testing/workloads/src/lib.rs @@ -0,0 +1,6 @@ +mod health; +mod roundtrip; + +pub use health::NatsClusterHealthy; +pub use nats_runtime_ext::{NatsBuilderExt, NatsEnv, NatsScenarioBuilder, NatsTopology}; +pub use roundtrip::NatsRoundTripWorkload; diff --git a/examples/nats/testing/workloads/src/roundtrip.rs b/examples/nats/testing/workloads/src/roundtrip.rs new file mode 100644 index 0000000..c2aea0b --- /dev/null +++ b/examples/nats/testing/workloads/src/roundtrip.rs @@ -0,0 +1,114 @@ +use std::{collections::HashSet, time::Duration}; + +use async_trait::async_trait; +use futures_util::StreamExt; +use nats_runtime_ext::NatsEnv; +use testing_framework_core::scenario::{DynError, RunContext, Workload}; +use tokio::time::Instant; +use tracing::info; + +#[derive(Clone)] +pub struct NatsRoundTripWorkload { + subject: String, + messages: usize, + publish_rate_per_sec: Option, + timeout: Duration, +} + +impl NatsRoundTripWorkload { + #[must_use] + pub fn new(subject: impl Into) -> Self { + Self { + subject: subject.into(), + messages: 200, + publish_rate_per_sec: Some(50), + timeout: Duration::from_secs(20), + } + } + + #[must_use] + pub const fn messages(mut self, value: usize) -> Self { + self.messages = value; + self + } + + #[must_use] + pub const fn publish_rate_per_sec(mut self, value: usize) -> Self { + self.publish_rate_per_sec = Some(value); + self + } + + #[must_use] + pub const fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +#[async_trait] +impl Workload for NatsRoundTripWorkload { + fn name(&self) -> &str { + "nats_roundtrip_workload" + } + + async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.len() < 2 { + return Err("nats roundtrip workload requires at least 2 nodes".into()); + } + + let subscriber_client = clients[1].connect().await?; + let mut subscription = subscriber_client.subscribe(self.subject.clone()).await?; + + let publisher = clients[0].connect().await?; + let interval = self.publish_rate_per_sec.and_then(compute_interval); + + info!(messages = self.messages, subject = %self.subject, "nats publish phase"); + for idx in 0..self.messages { + let payload = format!("msg-{idx}"); + publisher + .publish(self.subject.clone(), payload.into()) + .await?; + + if let Some(delay) = interval { + tokio::time::sleep(delay).await; + } + } + publisher.flush().await?; + + info!(messages = self.messages, subject = %self.subject, "nats consume phase"); + let mut expected = (0..self.messages) + .map(|idx| format!("msg-{idx}")) + .collect::>(); + let deadline = Instant::now() + self.timeout; + + while !expected.is_empty() && Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(Instant::now()); + let Some(message) = tokio::time::timeout(remaining, subscription.next()).await? else { + break; + }; + + let payload = String::from_utf8(message.payload.to_vec())?; + expected.remove(&payload); + } + + if expected.is_empty() { + info!(messages = self.messages, "nats roundtrip finished"); + return Ok(()); + } + + Err(format!( + "nats roundtrip timed out: missing {} messages", + expected.len() + ) + .into()) + } +} + +fn compute_interval(rate_per_sec: usize) -> Option { + if rate_per_sec == 0 { + return None; + } + + Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1))) +} diff --git a/examples/pubsub/Dockerfile b/examples/pubsub/Dockerfile new file mode 100644 index 0000000..500ee53 --- /dev/null +++ b/examples/pubsub/Dockerfile @@ -0,0 +1,24 @@ +FROM rustlang/rust:nightly-bookworm AS builder + +WORKDIR /build + +COPY Cargo.toml Cargo.lock ./ +COPY cfgsync/ ./cfgsync/ +COPY examples/ ./examples/ +COPY testing-framework/ ./testing-framework/ + +RUN cargo build --release -p pubsub-node + +FROM debian:bookworm-slim + +RUN apt-get update && \ + apt-get install -y ca-certificates && \ + rm -rf /var/lib/apt/lists/* + +COPY --from=builder /build/target/release/pubsub-node /usr/local/bin/pubsub-node + +RUN mkdir -p /etc/pubsub +WORKDIR /app + +ENTRYPOINT ["/usr/local/bin/pubsub-node"] +CMD ["--config", "/etc/pubsub/config.yaml"] diff --git a/examples/pubsub/README.md b/examples/pubsub/README.md new file mode 100644 index 0000000..aa872f1 --- /dev/null +++ b/examples/pubsub/README.md @@ -0,0 +1,61 @@ +# PubSub Example + +This example runs a small replicated pub/sub service with a WebSocket client +API. + +The scenarios open WebSocket sessions, subscribe to a topic, publish messages, +and check that every subscriber sees the same events. There is also a reconnect +scenario and a k8s manual-cluster variant that restarts a node mid-run. + +## How TF runs this + +Each example follows the same pattern: + +- TF starts a small deployment of pubsub nodes +- a workload opens WebSocket sessions and drives publish/subscribe behavior +- an expectation checks that the nodes end up with the same topic state + +## Scenarios + +- `basic_ws_roundtrip` runs a local roundtrip check +- `basic_ws_reconnect` runs a local reconnect scenario +- `compose_ws_roundtrip` and `compose_ws_reconnect` run the same checks in Docker Compose +- `k8s_ws_roundtrip` runs the roundtrip scenario on Kubernetes +- `k8s_manual_ws_roundtrip` starts the nodes through the k8s manual cluster API, restarts one node, and checks that the topic state converges again + +## Run locally + +```bash +cargo run -p pubsub-examples --bin basic_ws_roundtrip +cargo run -p pubsub-examples --bin basic_ws_reconnect +``` + +## Run with Docker Compose + +```bash +cargo run -p pubsub-examples --bin compose_ws_roundtrip +cargo run -p pubsub-examples --bin compose_ws_reconnect +``` + +Set `PUBSUB_IMAGE` to override the default compose image tag. + +## Run with Kubernetes + +```bash +docker build -t pubsub-node:local -f examples/pubsub/Dockerfile . +cargo run -p pubsub-examples --bin k8s_ws_roundtrip +``` + +Prerequisites: +- `kubectl` configured with a reachable cluster +- `helm` installed + +Optional image override: +- `PUBSUB_K8S_IMAGE` (falls back to `PUBSUB_IMAGE`, then `pubsub-node:local`) + +## Run with Kubernetes manual cluster + +```bash +docker build -t pubsub-node:local -f examples/pubsub/Dockerfile . +cargo run -p pubsub-examples --bin k8s_manual_ws_roundtrip +``` diff --git a/examples/pubsub/examples/Cargo.toml b/examples/pubsub/examples/Cargo.toml new file mode 100644 index 0000000..e2597e6 --- /dev/null +++ b/examples/pubsub/examples/Cargo.toml @@ -0,0 +1,17 @@ +[package] +edition.workspace = true +license.workspace = true +name = "pubsub-examples" +version.workspace = true + +[dependencies] +anyhow = "1.0" +pubsub-runtime-ext = { path = "../testing/integration" } +pubsub-runtime-workloads = { path = "../testing/workloads" } +serde = { workspace = true } +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +testing-framework-runner-k8s = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/pubsub/examples/src/bin/basic_ws_reconnect.rs b/examples/pubsub/examples/src/bin/basic_ws_reconnect.rs new file mode 100644 index 0000000..f3f5d72 --- /dev/null +++ b/examples/pubsub/examples/src/bin/basic_ws_reconnect.rs @@ -0,0 +1,36 @@ +use std::time::Duration; + +use pubsub_runtime_ext::PubSubLocalDeployer; +use pubsub_runtime_workloads::{ + PubSubBuilderExt, PubSubConverges, PubSubScenarioBuilder, PubSubTopology, + PubSubWsReconnectWorkload, +}; +use testing_framework_core::scenario::Deployer; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let topic = "demo.reconnect"; + let workload = PubSubWsReconnectWorkload::new(topic) + .phase_one_messages(40) + .disconnected_messages(20) + .phase_two_messages(40) + .publish_rate_per_sec(20) + .timeout(Duration::from_secs(20)); + + let mut scenario = PubSubScenarioBuilder::deployment_with(|_| PubSubTopology::new(3)) + .with_run_duration(Duration::from_secs(35)) + .with_workload(workload.clone()) + .with_expectation( + PubSubConverges::new(topic, workload.total_messages()).timeout(Duration::from_secs(30)), + ) + .build()?; + + let deployer = PubSubLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + runner.run(&mut scenario).await?; + Ok(()) +} diff --git a/examples/pubsub/examples/src/bin/basic_ws_roundtrip.rs b/examples/pubsub/examples/src/bin/basic_ws_roundtrip.rs new file mode 100644 index 0000000..8c20140 --- /dev/null +++ b/examples/pubsub/examples/src/bin/basic_ws_roundtrip.rs @@ -0,0 +1,34 @@ +use std::time::Duration; + +use pubsub_runtime_ext::PubSubLocalDeployer; +use pubsub_runtime_workloads::{ + PubSubBuilderExt, PubSubConverges, PubSubScenarioBuilder, PubSubTopology, + PubSubWsRoundTripWorkload, +}; +use testing_framework_core::scenario::Deployer; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let topic = "demo.topic"; + let messages = 120; + + let mut scenario = PubSubScenarioBuilder::deployment_with(|_| PubSubTopology::new(3)) + .with_run_duration(Duration::from_secs(30)) + .with_workload( + PubSubWsRoundTripWorkload::new(topic) + .messages(messages) + .publish_rate_per_sec(25) + .timeout(Duration::from_secs(20)), + ) + .with_expectation(PubSubConverges::new(topic, messages).timeout(Duration::from_secs(25))) + .build()?; + + let deployer = PubSubLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + runner.run(&mut scenario).await?; + Ok(()) +} diff --git a/examples/pubsub/examples/src/bin/compose_ws_reconnect.rs b/examples/pubsub/examples/src/bin/compose_ws_reconnect.rs new file mode 100644 index 0000000..6084e53 --- /dev/null +++ b/examples/pubsub/examples/src/bin/compose_ws_reconnect.rs @@ -0,0 +1,52 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use pubsub_runtime_workloads::{ + PubSubBuilderExt, PubSubConverges, PubSubScenarioBuilder, PubSubTopology, + PubSubWsReconnectWorkload, +}; +use testing_framework_core::scenario::Deployer; +use testing_framework_runner_compose::ComposeRunnerError; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let topic = "demo.reconnect"; + let workload = PubSubWsReconnectWorkload::new(topic) + .phase_one_messages(40) + .disconnected_messages(20) + .phase_two_messages(40) + .publish_rate_per_sec(20) + .timeout(Duration::from_secs(20)); + + let mut scenario = PubSubScenarioBuilder::deployment_with(|_| PubSubTopology::new(3)) + .with_run_duration(Duration::from_secs(35)) + .with_workload(workload.clone()) + .with_expectation( + PubSubConverges::new(topic, workload.total_messages()).timeout(Duration::from_secs(30)), + ) + .build()?; + + let deployer = pubsub_runtime_ext::PubSubComposeDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => { + warn!("docker unavailable; skipping pubsub reconnect compose run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)).context("deploying pubsub compose stack"); + } + }; + + info!("running pubsub compose ws reconnect scenario"); + runner + .run(&mut scenario) + .await + .context("running pubsub compose reconnect scenario")?; + Ok(()) +} diff --git a/examples/pubsub/examples/src/bin/compose_ws_roundtrip.rs b/examples/pubsub/examples/src/bin/compose_ws_roundtrip.rs new file mode 100644 index 0000000..98346a7 --- /dev/null +++ b/examples/pubsub/examples/src/bin/compose_ws_roundtrip.rs @@ -0,0 +1,50 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use pubsub_runtime_workloads::{ + PubSubBuilderExt, PubSubConverges, PubSubScenarioBuilder, PubSubTopology, + PubSubWsRoundTripWorkload, +}; +use testing_framework_core::scenario::Deployer; +use testing_framework_runner_compose::ComposeRunnerError; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let topic = "demo.topic"; + let messages = 120; + + let mut scenario = PubSubScenarioBuilder::deployment_with(|_| PubSubTopology::new(3)) + .with_run_duration(Duration::from_secs(30)) + .with_workload( + PubSubWsRoundTripWorkload::new(topic) + .messages(messages) + .publish_rate_per_sec(20) + .timeout(Duration::from_secs(20)), + ) + .with_expectation(PubSubConverges::new(topic, messages).timeout(Duration::from_secs(25))) + .build()?; + + let deployer = pubsub_runtime_ext::PubSubComposeDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(ComposeRunnerError::DockerUnavailable) => { + warn!("docker unavailable; skipping pubsub compose run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)).context("deploying pubsub compose stack"); + } + }; + + info!("running pubsub compose ws roundtrip scenario"); + runner + .run(&mut scenario) + .await + .context("running pubsub compose scenario")?; + Ok(()) +} diff --git a/examples/pubsub/examples/src/bin/k8s_manual_ws_roundtrip.rs b/examples/pubsub/examples/src/bin/k8s_manual_ws_roundtrip.rs new file mode 100644 index 0000000..10d8e94 --- /dev/null +++ b/examples/pubsub/examples/src/bin/k8s_manual_ws_roundtrip.rs @@ -0,0 +1,246 @@ +use std::{collections::HashMap, time::Duration}; + +use anyhow::{Context as _, Result, anyhow}; +use pubsub_runtime_ext::{ + PubSubClient, PubSubEventId, PubSubK8sDeployer, PubSubSession, PubSubTopology, +}; +use serde::Deserialize; +use testing_framework_runner_k8s::ManualClusterError; +use tracing::{info, warn}; + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] +struct Revision { + version: u64, + origin: u64, +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] +struct TopicsStateView { + revision: Revision, + total_events: usize, + topic_counts: HashMap, +} + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let topic = "manual.demo"; + let deployer = PubSubK8sDeployer::new(); + let cluster = match deployer + .manual_cluster_from_descriptors(PubSubTopology::new(3)) + .await + { + Ok(cluster) => cluster, + Err(ManualClusterError::ClientInit { source }) => { + warn!("k8s unavailable ({source}); skipping pubsub k8s manual run"); + return Ok(()); + } + Err(ManualClusterError::InstallStack { source }) + if k8s_cluster_unavailable(&source.to_string()) => + { + warn!("k8s unavailable ({source}); skipping pubsub k8s manual run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)).context("creating pubsub k8s manual cluster"); + } + }; + + let node0 = cluster.start_node("node-0").await?.client; + let node1 = cluster.start_node("node-1").await?.client; + let node2 = cluster.start_node("node-2").await?.client; + + cluster.wait_network_ready().await?; + + roundtrip_batch(&node0, &[node1.clone(), node2.clone()], topic, 24, 0).await?; + wait_for_topic_convergence(&[node0.clone(), node1.clone(), node2.clone()], topic, 24).await?; + + info!("restarting node-2 in manual cluster"); + cluster.restart_node("node-2").await?; + cluster.wait_network_ready().await?; + + let restarted_node2 = cluster + .node_client("node-2") + .ok_or_else(|| anyhow!("node-2 client missing after restart"))?; + + roundtrip_batch( + &node0, + &[node1.clone(), restarted_node2.clone()], + topic, + 12, + 24, + ) + .await?; + wait_for_topic_convergence(&[node0, node1, restarted_node2], topic, 36).await?; + + cluster.stop_all(); + Ok(()) +} + +async fn roundtrip_batch( + publisher_client: &PubSubClient, + subscriber_clients: &[PubSubClient], + topic: &str, + message_count: usize, + start_index: usize, +) -> Result<()> { + let mut subscribers = Vec::with_capacity(subscriber_clients.len()); + for client in subscriber_clients { + let mut session = client + .connect() + .await + .map_err(|error| anyhow!(error.to_string()))?; + session + .subscribe(topic) + .await + .map_err(|error| anyhow!(error.to_string()))?; + subscribers.push(session); + } + + let mut publisher = publisher_client + .connect() + .await + .map_err(|error| anyhow!(error.to_string()))?; + for index in start_index..start_index + message_count { + publisher + .publish(topic, format!("msg-{index}")) + .await + .map_err(|error| anyhow!(error.to_string())) + .with_context(|| format!("publishing message {index}"))?; + tokio::time::sleep(Duration::from_millis(50)).await; + } + + let deliveries = collect_deliveries(&mut subscribers, message_count).await?; + ensure_deliveries_match(&deliveries, message_count)?; + + for session in &mut subscribers { + session + .close() + .await + .map_err(|error| anyhow!(error.to_string()))?; + } + + publisher + .close() + .await + .map_err(|error| anyhow!(error.to_string()))?; + Ok(()) +} + +async fn collect_deliveries( + subscribers: &mut [PubSubSession], + expected_messages: usize, +) -> Result>> { + let mut deliveries = vec![HashMap::new(); subscribers.len()]; + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + + while tokio::time::Instant::now() < deadline + && deliveries.iter().any(|seen| seen.len() < expected_messages) + { + for (index, session) in subscribers.iter_mut().enumerate() { + if deliveries[index].len() >= expected_messages { + continue; + } + + if let Some(event) = session + .next_event_timeout(Duration::from_millis(200)) + .await + .map_err(|error| anyhow!(error.to_string()))? + { + deliveries[index].entry(event.id).or_insert(event.payload); + } + } + } + + Ok(deliveries) +} + +fn ensure_deliveries_match( + deliveries: &[HashMap], + expected_messages: usize, +) -> Result<()> { + for (index, seen) in deliveries.iter().enumerate() { + if seen.len() != expected_messages { + return Err(anyhow!( + "subscriber {index} saw {}/{} messages", + seen.len(), + expected_messages + )); + } + } + + if let Some((baseline, rest)) = deliveries.split_first() { + for seen in rest { + if seen != baseline { + return Err(anyhow!("subscriber deliveries diverged")); + } + } + } + + Ok(()) +} + +async fn wait_for_topic_convergence( + clients: &[PubSubClient], + topic: &str, + expected_messages: usize, +) -> Result<()> { + let deadline = tokio::time::Instant::now() + Duration::from_secs(30); + + while tokio::time::Instant::now() < deadline { + if topic_converged(clients, topic, expected_messages).await? { + info!(expected_messages, "pubsub manual cluster converged"); + return Ok(()); + } + tokio::time::sleep(Duration::from_millis(500)).await; + } + + Err(anyhow!( + "pubsub manual cluster did not converge on topic state within timeout" + )) +} + +async fn topic_converged( + clients: &[PubSubClient], + topic: &str, + expected_messages: usize, +) -> Result { + let mut baseline: Option = None; + + for client in clients { + let state: TopicsStateView = client + .get("/topics/state") + .await + .map_err(|error| anyhow!(error.to_string()))?; + + if state.total_events != expected_messages { + return Ok(false); + } + + if state.topic_counts.get(topic).copied().unwrap_or_default() != expected_messages { + return Ok(false); + } + + match &baseline { + Some(expected) + if expected.revision != state.revision + || expected.topic_counts != state.topic_counts => + { + return Ok(false); + } + None => baseline = Some(state), + Some(_) => {} + } + } + + Ok(true) +} + +fn k8s_cluster_unavailable(message: &str) -> bool { + message.contains("Unable to connect to the server") + || message.contains("TLS handshake timeout") + || message.contains("connection refused") +} diff --git a/examples/pubsub/examples/src/bin/k8s_ws_roundtrip.rs b/examples/pubsub/examples/src/bin/k8s_ws_roundtrip.rs new file mode 100644 index 0000000..b197f35 --- /dev/null +++ b/examples/pubsub/examples/src/bin/k8s_ws_roundtrip.rs @@ -0,0 +1,64 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use pubsub_runtime_ext::PubSubK8sDeployer; +use pubsub_runtime_workloads::{ + PubSubBuilderExt, PubSubConverges, PubSubScenarioBuilder, PubSubTopology, + PubSubWsRoundTripWorkload, +}; +use testing_framework_core::scenario::Deployer; +use testing_framework_runner_k8s::K8sRunnerError; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let topic = "demo.topic"; + let messages = 120; + + let mut scenario = PubSubScenarioBuilder::deployment_with(|_| PubSubTopology::new(3)) + .with_run_duration(Duration::from_secs(40)) + .with_workload( + PubSubWsRoundTripWorkload::new(topic) + .messages(messages) + .publish_rate_per_sec(15) + .timeout(Duration::from_secs(30)), + ) + .with_expectation(PubSubConverges::new(topic, messages).timeout(Duration::from_secs(35))) + .build()?; + + let deployer = PubSubK8sDeployer::new(); + let runner = match deployer.deploy(&scenario).await { + Ok(runner) => runner, + Err(K8sRunnerError::ClientInit { source }) => { + warn!("k8s unavailable ({source}); skipping pubsub k8s run"); + return Ok(()); + } + Err(K8sRunnerError::InstallStack { source }) + if k8s_cluster_unavailable(&source.to_string()) => + { + warn!("k8s unavailable ({source}); skipping pubsub k8s run"); + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)).context("deploying pubsub k8s stack"); + } + }; + + info!("running pubsub k8s ws roundtrip scenario"); + runner + .run(&mut scenario) + .await + .context("running pubsub k8s scenario")?; + + Ok(()) +} + +fn k8s_cluster_unavailable(message: &str) -> bool { + message.contains("Unable to connect to the server") + || message.contains("TLS handshake timeout") + || message.contains("connection refused") +} diff --git a/examples/pubsub/pubsub-node/Cargo.toml b/examples/pubsub/pubsub-node/Cargo.toml new file mode 100644 index 0000000..f03a67a --- /dev/null +++ b/examples/pubsub/pubsub-node/Cargo.toml @@ -0,0 +1,23 @@ +[package] +edition.workspace = true +license.workspace = true +name = "pubsub-node" +version.workspace = true + +[[bin]] +name = "pubsub-node" +path = "src/main.rs" + +[dependencies] +anyhow = "1.0" +axum = { version = "0.7", features = ["ws"] } +clap = { version = "4.0", features = ["derive"] } +futures-util = "0.3" +reqwest = { workspace = true, features = ["json"] } +serde = { workspace = true } +serde_json = { workspace = true } +serde_yaml = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tower-http = { version = "0.6", features = ["trace"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/pubsub/pubsub-node/src/config.rs b/examples/pubsub/pubsub-node/src/config.rs new file mode 100644 index 0000000..7212720 --- /dev/null +++ b/examples/pubsub/pubsub-node/src/config.rs @@ -0,0 +1,29 @@ +use std::{fs, path::Path}; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PeerInfo { + pub node_id: u64, + pub http_address: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PubSubConfig { + pub node_id: u64, + pub http_port: u16, + pub peers: Vec, + #[serde(default = "default_sync_interval_ms")] + pub sync_interval_ms: u64, +} + +impl PubSubConfig { + pub fn load(path: &Path) -> anyhow::Result { + let raw = fs::read_to_string(path)?; + Ok(serde_yaml::from_str(&raw)?) + } +} + +const fn default_sync_interval_ms() -> u64 { + 1000 +} diff --git a/examples/pubsub/pubsub-node/src/main.rs b/examples/pubsub/pubsub-node/src/main.rs new file mode 100644 index 0000000..5095833 --- /dev/null +++ b/examples/pubsub/pubsub-node/src/main.rs @@ -0,0 +1,36 @@ +mod config; +mod server; +mod state; +mod sync; + +use std::path::PathBuf; + +use clap::Parser; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +use crate::{config::PubSubConfig, state::PubSubState, sync::SyncService}; + +#[derive(Parser, Debug)] +#[command(name = "pubsub-node")] +struct Args { + #[arg(short, long)] + config: PathBuf, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "pubsub_node=info,tower_http=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let args = Args::parse(); + let config = PubSubConfig::load(&args.config)?; + + let state = PubSubState::new(config.node_id); + SyncService::new(config.clone(), state.clone()).start(); + server::start_server(config, state).await +} diff --git a/examples/pubsub/pubsub-node/src/server.rs b/examples/pubsub/pubsub-node/src/server.rs new file mode 100644 index 0000000..bc78ffb --- /dev/null +++ b/examples/pubsub/pubsub-node/src/server.rs @@ -0,0 +1,167 @@ +use std::{collections::HashSet, sync::Arc}; + +use axum::{ + Router, + extract::{State, WebSocketUpgrade, ws::Message}, + http::StatusCode, + response::{IntoResponse, Json}, + routing::get, +}; +use futures_util::{SinkExt, StreamExt}; +use serde::{Deserialize, Serialize}; +use tower_http::trace::TraceLayer; +use tracing::{debug, warn}; + +use crate::{ + config::PubSubConfig, + state::{PubSubState, Snapshot, TopicEvent, TopicsStateView}, +}; + +#[derive(Serialize)] +struct HealthResponse { + status: &'static str, +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +enum ClientFrame { + Subscribe { topic: String }, + Unsubscribe { topic: String }, + Publish { topic: String, payload: String }, +} + +#[derive(Clone, Debug, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +enum ServerFrame { + Subscribed { topic: String }, + Unsubscribed { topic: String }, + Published { id: crate::state::EventId }, + Event { event: TopicEvent }, + Error { message: String }, +} + +pub async fn start_server(config: PubSubConfig, state: PubSubState) -> anyhow::Result<()> { + let app = Router::new() + .route("/health/live", get(health_live)) + .route("/health/ready", get(health_ready)) + .route("/topics/state", get(topics_state)) + .route("/internal/snapshot", get(snapshot)) + .route("/ws", get(ws_handler)) + .layer(TraceLayer::new_for_http()) + .with_state(Arc::new(state.clone())); + + let addr = std::net::SocketAddr::from(([0, 0, 0, 0], config.http_port)); + let listener = tokio::net::TcpListener::bind(addr).await?; + + state.set_ready(true).await; + tracing::info!(node_id = state.node_id(), %addr, "pubsub node ready"); + + axum::serve(listener, app).await?; + Ok(()) +} + +async fn health_live() -> (StatusCode, Json) { + (StatusCode::OK, Json(HealthResponse { status: "alive" })) +} + +async fn health_ready(State(state): State>) -> (StatusCode, Json) { + if state.is_ready().await { + (StatusCode::OK, Json(HealthResponse { status: "ready" })) + } else { + ( + StatusCode::SERVICE_UNAVAILABLE, + Json(HealthResponse { + status: "not-ready", + }), + ) + } +} + +async fn topics_state(State(state): State>) -> Json { + Json(state.topics_state().await) +} + +async fn snapshot(State(state): State>) -> Json { + Json(state.snapshot().await) +} + +async fn ws_handler( + ws: WebSocketUpgrade, + State(state): State>, +) -> impl IntoResponse { + ws.on_upgrade(move |socket| handle_socket(socket, state)) +} + +async fn handle_socket(socket: axum::extract::ws::WebSocket, state: Arc) { + let (mut sender, mut receiver) = socket.split(); + let mut topics = HashSet::new(); + let mut events = state.subscribe_events(); + + loop { + tokio::select! { + incoming = receiver.next() => { + match incoming { + Some(Ok(Message::Text(text))) => { + match serde_json::from_str::(&text) { + Ok(frame) => handle_client_frame(frame, &state, &mut topics, &mut sender).await, + Err(error) => { + let _ = send_frame(&mut sender, &ServerFrame::Error { message: format!("invalid frame: {error}") }).await; + } + } + } + Some(Ok(Message::Close(_))) | None => break, + Some(Ok(_)) => {} + Some(Err(error)) => { + debug!(%error, "ws receive error"); + break; + } + } + } + event = events.recv() => { + match event { + Ok(event) if topics.contains(&event.topic) => { + if send_frame(&mut sender, &ServerFrame::Event { event }).await.is_err() { + break; + } + } + Ok(_) => {} + Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { + warn!(skipped, "ws subscriber lagged"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + } + } +} + +async fn handle_client_frame( + frame: ClientFrame, + state: &PubSubState, + topics: &mut HashSet, + sender: &mut futures_util::stream::SplitSink, +) { + match frame { + ClientFrame::Subscribe { topic } => { + topics.insert(topic.clone()); + let _ = send_frame(sender, &ServerFrame::Subscribed { topic }).await; + } + ClientFrame::Unsubscribe { topic } => { + topics.remove(&topic); + let _ = send_frame(sender, &ServerFrame::Unsubscribed { topic }).await; + } + ClientFrame::Publish { topic, payload } => { + let event = state.publish_local(topic, payload).await; + let _ = send_frame(sender, &ServerFrame::Published { id: event.id }).await; + } + } +} + +async fn send_frame( + sender: &mut futures_util::stream::SplitSink, + frame: &ServerFrame, +) -> Result<(), axum::Error> { + let payload = serde_json::to_string(frame) + .map_err(|error| axum::Error::new(std::io::Error::other(error.to_string())))?; + sender.send(Message::Text(payload)).await +} diff --git a/examples/pubsub/pubsub-node/src/state.rs b/examples/pubsub/pubsub-node/src/state.rs new file mode 100644 index 0000000..94f333d --- /dev/null +++ b/examples/pubsub/pubsub-node/src/state.rs @@ -0,0 +1,179 @@ +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, +}; + +use serde::{Deserialize, Serialize}; +use tokio::sync::{RwLock, broadcast}; + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub struct Revision { + pub version: u64, + pub origin: u64, +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub struct EventId { + pub origin: u64, + pub seq: u64, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct TopicEvent { + pub id: EventId, + pub topic: String, + pub payload: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Snapshot { + pub node_id: u64, + pub revision: Revision, + pub next_seq: u64, + pub events: Vec, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize)] +pub struct TopicsStateView { + pub revision: Revision, + pub total_events: usize, + pub topic_counts: BTreeMap, +} + +#[derive(Debug)] +struct Data { + revision: Revision, + next_seq: u64, + events: Vec, + seen_ids: HashSet, +} + +impl Default for Data { + fn default() -> Self { + Self { + revision: Revision::default(), + next_seq: 1, + events: Vec::new(), + seen_ids: HashSet::new(), + } + } +} + +#[derive(Clone)] +pub struct PubSubState { + node_id: u64, + ready: Arc>, + data: Arc>, + event_tx: broadcast::Sender, +} + +impl PubSubState { + pub fn new(node_id: u64) -> Self { + let (event_tx, _) = broadcast::channel(2048); + + Self { + node_id, + ready: Arc::new(RwLock::new(false)), + data: Arc::new(RwLock::new(Data::default())), + event_tx, + } + } + + pub const fn node_id(&self) -> u64 { + self.node_id + } + + pub fn subscribe_events(&self) -> broadcast::Receiver { + self.event_tx.subscribe() + } + + pub async fn set_ready(&self, value: bool) { + *self.ready.write().await = value; + } + + pub async fn is_ready(&self) -> bool { + *self.ready.read().await + } + + pub async fn publish_local(&self, topic: String, payload: String) -> TopicEvent { + let event = { + let mut data = self.data.write().await; + let event = TopicEvent { + id: EventId { + origin: self.node_id, + seq: data.next_seq, + }, + topic, + payload, + }; + + data.next_seq = data.next_seq.saturating_add(1); + data.seen_ids.insert(event.id.clone()); + data.events.push(event.clone()); + bump_revision(&mut data.revision, self.node_id); + event + }; + + let _ = self.event_tx.send(event.clone()); + event + } + + pub async fn merge_snapshot(&self, snapshot: Snapshot) { + let merged_events = { + let mut data = self.data.write().await; + let mut added = Vec::new(); + + for event in snapshot.events { + if data.seen_ids.insert(event.id.clone()) { + data.events.push(event.clone()); + added.push(event); + } + } + + data.events + .sort_by_key(|event| (event.id.origin, event.id.seq)); + if is_newer_revision(snapshot.revision, data.revision) { + data.revision = snapshot.revision; + } + data.next_seq = data.next_seq.max(snapshot.next_seq); + added + }; + + for event in merged_events { + let _ = self.event_tx.send(event); + } + } + + pub async fn snapshot(&self) -> Snapshot { + let data = self.data.read().await; + Snapshot { + node_id: self.node_id, + revision: data.revision, + next_seq: data.next_seq, + events: data.events.clone(), + } + } + + pub async fn topics_state(&self) -> TopicsStateView { + let data = self.data.read().await; + let mut topic_counts = BTreeMap::new(); + for event in &data.events { + *topic_counts.entry(event.topic.clone()).or_insert(0) += 1; + } + + TopicsStateView { + revision: data.revision, + total_events: data.events.len(), + topic_counts, + } + } +} + +fn bump_revision(revision: &mut Revision, node_id: u64) { + revision.version = revision.version.saturating_add(1); + revision.origin = node_id; +} + +fn is_newer_revision(candidate: Revision, existing: Revision) -> bool { + (candidate.version, candidate.origin) > (existing.version, existing.origin) +} diff --git a/examples/pubsub/pubsub-node/src/sync.rs b/examples/pubsub/pubsub-node/src/sync.rs new file mode 100644 index 0000000..160e333 --- /dev/null +++ b/examples/pubsub/pubsub-node/src/sync.rs @@ -0,0 +1,100 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use reqwest::Client; +use tokio::sync::Mutex; +use tracing::{debug, warn}; + +use crate::{ + config::PubSubConfig, + state::{PubSubState, Snapshot}, +}; + +const WARN_AFTER_CONSECUTIVE_FAILURES: u32 = 5; + +#[derive(Clone)] +pub struct SyncService { + config: Arc, + state: PubSubState, + client: Client, + failures_by_peer: Arc>>, +} + +impl SyncService { + pub fn new(config: PubSubConfig, state: PubSubState) -> Self { + Self { + config: Arc::new(config), + state, + client: Client::new(), + failures_by_peer: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub fn start(&self) { + let service = self.clone(); + tokio::spawn(async move { + service.run().await; + }); + } + + async fn run(self) { + let interval = Duration::from_millis(self.config.sync_interval_ms.max(100)); + loop { + self.sync_once().await; + tokio::time::sleep(interval).await; + } + } + + async fn sync_once(&self) { + for peer in &self.config.peers { + match self.fetch_snapshot(&peer.http_address).await { + Ok(snapshot) => { + self.state.merge_snapshot(snapshot).await; + self.clear_failure_counter(&peer.http_address).await; + } + Err(error) => self.record_sync_failure(&peer.http_address, &error).await, + } + } + } + + async fn fetch_snapshot(&self, peer_address: &str) -> anyhow::Result { + let url = format!("http://{peer_address}/internal/snapshot"); + Ok(self + .client + .get(url) + .send() + .await? + .error_for_status()? + .json() + .await?) + } + + async fn clear_failure_counter(&self, peer_address: &str) { + let mut failures = self.failures_by_peer.lock().await; + failures.remove(peer_address); + } + + async fn record_sync_failure(&self, peer_address: &str, error: &anyhow::Error) { + let consecutive_failures = { + let mut failures = self.failures_by_peer.lock().await; + let entry = failures.entry(peer_address.to_owned()).or_insert(0); + *entry += 1; + *entry + }; + + if consecutive_failures >= WARN_AFTER_CONSECUTIVE_FAILURES { + warn!( + peer = %peer_address, + %error, + consecutive_failures, + "pubsub sync repeatedly failing" + ); + } else { + debug!( + peer = %peer_address, + %error, + consecutive_failures, + "pubsub sync failed" + ); + } + } +} diff --git a/examples/pubsub/testing/integration/Cargo.toml b/examples/pubsub/testing/integration/Cargo.toml new file mode 100644 index 0000000..9e5d8ec --- /dev/null +++ b/examples/pubsub/testing/integration/Cargo.toml @@ -0,0 +1,18 @@ +[package] +edition.workspace = true +license.workspace = true +name = "pubsub-runtime-ext" +version.workspace = true + +[dependencies] +async-trait = { workspace = true } +futures-util = "0.3" +reqwest = { workspace = true, features = ["json"] } +serde = { workspace = true } +serde_json = { workspace = true } +testing-framework-core = { workspace = true } +testing-framework-runner-compose = { workspace = true } +testing-framework-runner-k8s = { workspace = true } +testing-framework-runner-local = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tokio-tungstenite = "0.24" diff --git a/examples/pubsub/testing/integration/src/app.rs b/examples/pubsub/testing/integration/src/app.rs new file mode 100644 index 0000000..40e0840 --- /dev/null +++ b/examples/pubsub/testing/integration/src/app.rs @@ -0,0 +1,230 @@ +use std::{io::Error, time::Duration}; + +use async_trait::async_trait; +use futures_util::{SinkExt, StreamExt}; +use reqwest::Url; +use serde::{Deserialize, Serialize}; +use testing_framework_core::scenario::{ + Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, + DefaultFeedRuntime, DynError, NodeAccess, serialize_cluster_yaml_config, +}; +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message}; + +pub type PubSubTopology = testing_framework_core::topology::ClusterTopology; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PubSubPeerInfo { + pub node_id: u64, + pub http_address: String, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PubSubNodeConfig { + pub node_id: u64, + pub http_port: u16, + pub peers: Vec, + pub sync_interval_ms: u64, +} + +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq)] +pub struct PubSubEventId { + pub origin: u64, + pub seq: u64, +} + +#[derive(Clone, Debug, Deserialize)] +pub struct PubSubEvent { + pub id: PubSubEventId, + pub topic: String, + pub payload: String, +} + +#[derive(Clone)] +pub struct PubSubClient { + base_url: Url, + ws_url: Url, + client: reqwest::Client, +} + +impl PubSubClient { + #[must_use] + pub fn new(base_url: Url) -> Self { + let mut ws_url = base_url.clone(); + ws_url + .set_scheme(if ws_url.scheme() == "https" { + "wss" + } else { + "ws" + }) + .ok(); + + Self { + base_url, + ws_url, + client: reqwest::Client::new(), + } + } + + #[must_use] + pub fn ws_endpoint(&self) -> String { + self.ws_url + .join("/ws") + .expect("valid ws endpoint") + .to_string() + } + + pub async fn connect(&self) -> Result { + let (ws, _) = connect_async(self.ws_endpoint()).await?; + Ok(PubSubSession { ws }) + } + + pub async fn get(&self, path: &str) -> Result { + let url = self.base_url.join(path)?; + let response = self.client.get(url).send().await?.error_for_status()?; + Ok(response.json().await?) + } +} + +pub struct PubSubSession { + ws: WebSocketStream>, +} + +impl PubSubSession { + pub async fn close(&mut self) -> Result<(), DynError> { + self.ws.close(None).await?; + Ok(()) + } + + pub async fn subscribe(&mut self, topic: &str) -> Result<(), DynError> { + self.send_frame(&ClientFrame::Subscribe { + topic: topic.to_owned(), + }) + .await + } + + pub async fn unsubscribe(&mut self, topic: &str) -> Result<(), DynError> { + self.send_frame(&ClientFrame::Unsubscribe { + topic: topic.to_owned(), + }) + .await + } + + pub async fn publish(&mut self, topic: &str, payload: String) -> Result<(), DynError> { + self.send_frame(&ClientFrame::Publish { + topic: topic.to_owned(), + payload, + }) + .await + } + + pub async fn next_event_timeout( + &mut self, + timeout: Duration, + ) -> Result, DynError> { + let Some(message) = tokio::time::timeout(timeout, self.ws.next()) + .await + .ok() + .flatten() + else { + return Ok(None); + }; + + self.parse_message(message?) + } + + async fn send_frame(&mut self, frame: &ClientFrame) -> Result<(), DynError> { + self.ws + .send(Message::Text(serde_json::to_string(frame)?)) + .await?; + Ok(()) + } + + fn parse_message(&self, msg: Message) -> Result, DynError> { + let Message::Text(text) = msg else { + return Ok(None); + }; + + let frame: ServerFrame = serde_json::from_str(&text)?; + Ok(match frame { + ServerFrame::Event { event } => Some(event), + ServerFrame::Other + | ServerFrame::Subscribed + | ServerFrame::Unsubscribed + | ServerFrame::Published + | ServerFrame::Error => None, + }) + } +} + +#[derive(Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +enum ClientFrame { + Subscribe { topic: String }, + Unsubscribe { topic: String }, + Publish { topic: String, payload: String }, +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +enum ServerFrame { + Subscribed, + Unsubscribed, + Published, + Event { + event: PubSubEvent, + }, + Error, + #[serde(other)] + Other, +} + +pub struct PubSubEnv; + +#[async_trait] +impl Application for PubSubEnv { + type Deployment = PubSubTopology; + type NodeClient = PubSubClient; + type NodeConfig = PubSubNodeConfig; + type FeedRuntime = DefaultFeedRuntime; + fn build_node_client(access: &NodeAccess) -> Result { + Ok(PubSubClient::new(access.api_base_url()?)) + } + + fn node_readiness_path() -> &'static str { + "/health/ready" + } +} + +impl ClusterNodeConfigApplication for PubSubEnv { + type ConfigError = Error; + + fn static_network_port() -> u16 { + 8080 + } + + fn build_cluster_node_config( + node: &ClusterNodeView, + peers: &[ClusterPeerView], + ) -> Result { + let peers = peers + .iter() + .map(|peer| PubSubPeerInfo { + node_id: peer.index() as u64, + http_address: peer.authority(), + }) + .collect::>(); + + Ok(PubSubNodeConfig { + node_id: node.index() as u64, + http_port: node.network_port(), + peers, + sync_interval_ms: 500, + }) + } + + fn serialize_cluster_node_config( + config: &Self::NodeConfig, + ) -> Result { + serialize_cluster_yaml_config(config).map_err(Error::other) + } +} diff --git a/examples/pubsub/testing/integration/src/compose_env.rs b/examples/pubsub/testing/integration/src/compose_env.rs new file mode 100644 index 0000000..82b7f49 --- /dev/null +++ b/examples/pubsub/testing/integration/src/compose_env.rs @@ -0,0 +1,15 @@ +use testing_framework_runner_compose::{BinaryConfigNodeSpec, ComposeBinaryApp}; + +use crate::PubSubEnv; + +const NODE_CONFIG_PATH: &str = "/etc/pubsub/config.yaml"; + +impl ComposeBinaryApp for PubSubEnv { + fn compose_node_spec() -> BinaryConfigNodeSpec { + BinaryConfigNodeSpec::conventional( + "/usr/local/bin/pubsub-node", + NODE_CONFIG_PATH, + vec![8080, 8081], + ) + } +} diff --git a/examples/pubsub/testing/integration/src/k8s_env.rs b/examples/pubsub/testing/integration/src/k8s_env.rs new file mode 100644 index 0000000..d33bd4a --- /dev/null +++ b/examples/pubsub/testing/integration/src/k8s_env.rs @@ -0,0 +1,21 @@ +use testing_framework_runner_k8s::{BinaryConfigK8sSpec, K8sBinaryApp}; + +use crate::PubSubEnv; + +const CONTAINER_CONFIG_PATH: &str = "/etc/pubsub/config.yaml"; +const CONTAINER_HTTP_PORT: u16 = 8080; +const SERVICE_TESTING_PORT: u16 = 8081; +const NODE_NAME_PREFIX: &str = "pubsub-node"; + +impl K8sBinaryApp for PubSubEnv { + fn k8s_binary_spec() -> BinaryConfigK8sSpec { + BinaryConfigK8sSpec::conventional( + "pubsub", + NODE_NAME_PREFIX, + "/usr/local/bin/pubsub-node", + CONTAINER_CONFIG_PATH, + CONTAINER_HTTP_PORT, + SERVICE_TESTING_PORT, + ) + } +} diff --git a/examples/pubsub/testing/integration/src/lib.rs b/examples/pubsub/testing/integration/src/lib.rs new file mode 100644 index 0000000..37d9531 --- /dev/null +++ b/examples/pubsub/testing/integration/src/lib.rs @@ -0,0 +1,12 @@ +mod app; +mod compose_env; +mod k8s_env; +mod local_env; +pub mod scenario; + +pub use app::*; +pub use scenario::{PubSubBuilderExt, PubSubScenarioBuilder}; + +pub type PubSubK8sDeployer = testing_framework_runner_k8s::K8sDeployer; +pub type PubSubLocalDeployer = testing_framework_runner_local::ProcessDeployer; +pub type PubSubComposeDeployer = testing_framework_runner_compose::ComposeDeployer; diff --git a/examples/pubsub/testing/integration/src/local_env.rs b/examples/pubsub/testing/integration/src/local_env.rs new file mode 100644 index 0000000..03bbdad --- /dev/null +++ b/examples/pubsub/testing/integration/src/local_env.rs @@ -0,0 +1,41 @@ +use std::collections::HashMap; + +use testing_framework_core::scenario::{DynError, StartNodeOptions}; +use testing_framework_runner_local::{ + LocalBinaryApp, LocalNodePorts, LocalPeerNode, LocalProcessSpec, + build_local_cluster_node_config, yaml_node_config, +}; + +use crate::{PubSubEnv, PubSubNodeConfig}; + +impl LocalBinaryApp for PubSubEnv { + fn initial_node_name_prefix() -> &'static str { + "pubsub-node" + } + + fn build_local_node_config_with_peers( + _topology: &Self::Deployment, + index: usize, + ports: &LocalNodePorts, + peers: &[LocalPeerNode], + _peer_ports_by_name: &HashMap, + _options: &StartNodeOptions, + _template_config: Option< + &::NodeConfig, + >, + ) -> Result<::NodeConfig, DynError> { + build_local_cluster_node_config::(index, ports, peers) + } + + fn local_process_spec() -> LocalProcessSpec { + LocalProcessSpec::new("PUBSUB_NODE_BIN", "pubsub-node").with_rust_log("pubsub_node=info") + } + + fn render_local_config(config: &PubSubNodeConfig) -> Result, DynError> { + yaml_node_config(config) + } + + fn http_api_port(config: &PubSubNodeConfig) -> u16 { + config.http_port + } +} diff --git a/examples/pubsub/testing/integration/src/scenario.rs b/examples/pubsub/testing/integration/src/scenario.rs new file mode 100644 index 0000000..7af819e --- /dev/null +++ b/examples/pubsub/testing/integration/src/scenario.rs @@ -0,0 +1,15 @@ +use testing_framework_core::scenario::ScenarioBuilder; + +use crate::{PubSubEnv, PubSubTopology}; + +pub type PubSubScenarioBuilder = ScenarioBuilder; + +pub trait PubSubBuilderExt: Sized { + fn deployment_with(f: impl FnOnce(PubSubTopology) -> PubSubTopology) -> Self; +} + +impl PubSubBuilderExt for PubSubScenarioBuilder { + fn deployment_with(f: impl FnOnce(PubSubTopology) -> PubSubTopology) -> Self { + PubSubScenarioBuilder::with_deployment(f(PubSubTopology::new(3))) + } +} diff --git a/examples/pubsub/testing/workloads/Cargo.toml b/examples/pubsub/testing/workloads/Cargo.toml new file mode 100644 index 0000000..aeaf627 --- /dev/null +++ b/examples/pubsub/testing/workloads/Cargo.toml @@ -0,0 +1,13 @@ +[package] +edition.workspace = true +license.workspace = true +name = "pubsub-runtime-workloads" +version.workspace = true + +[dependencies] +async-trait = { workspace = true } +pubsub-runtime-ext = { path = "../integration" } +serde = { workspace = true } +testing-framework-core = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } diff --git a/examples/pubsub/testing/workloads/src/convergence.rs b/examples/pubsub/testing/workloads/src/convergence.rs new file mode 100644 index 0000000..f9f3c06 --- /dev/null +++ b/examples/pubsub/testing/workloads/src/convergence.rs @@ -0,0 +1,104 @@ +use std::{collections::BTreeMap, time::Duration}; + +use async_trait::async_trait; +use pubsub_runtime_ext::{PubSubClient, PubSubEnv}; +use serde::Deserialize; +use testing_framework_core::scenario::{DynError, Expectation, RunContext}; +use tokio::time::Instant; +use tracing::info; + +#[derive(Clone)] +pub struct PubSubConverges { + topic: String, + min_messages: usize, + timeout: Duration, + poll_interval: Duration, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +struct Revision { + version: u64, + origin: u64, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +struct TopicsState { + revision: Revision, + total_events: usize, + topic_counts: BTreeMap, +} + +impl PubSubConverges { + #[must_use] + pub fn new(topic: impl Into, min_messages: usize) -> Self { + Self { + topic: topic.into(), + min_messages, + timeout: Duration::from_secs(20), + poll_interval: Duration::from_millis(500), + } + } + + #[must_use] + pub const fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +#[async_trait] +impl Expectation for PubSubConverges { + fn name(&self) -> &str { + "pubsub_converges" + } + + async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.is_empty() { + return Err("no pubsub node clients available".into()); + } + + let deadline = Instant::now() + self.timeout; + while Instant::now() < deadline { + if self.is_converged(&clients).await? { + info!(topic = %self.topic, min_messages = self.min_messages, "pubsub converged"); + return Ok(()); + } + tokio::time::sleep(self.poll_interval).await; + } + + Err(format!("pubsub did not converge within {:?}", self.timeout).into()) + } +} + +impl PubSubConverges { + async fn is_converged(&self, clients: &[PubSubClient]) -> Result { + let Some((first, rest)) = clients.split_first() else { + return Ok(false); + }; + + let baseline = read_state(first).await?; + if baseline + .topic_counts + .get(&self.topic) + .copied() + .unwrap_or_default() + < self.min_messages + { + return Ok(false); + } + + for client in rest { + let current = read_state(client).await?; + if current != baseline { + return Ok(false); + } + } + + Ok(true) + } +} + +async fn read_state(client: &PubSubClient) -> Result { + client.get("/topics/state").await +} diff --git a/examples/pubsub/testing/workloads/src/lib.rs b/examples/pubsub/testing/workloads/src/lib.rs new file mode 100644 index 0000000..05be480 --- /dev/null +++ b/examples/pubsub/testing/workloads/src/lib.rs @@ -0,0 +1,8 @@ +mod convergence; +mod ws_reconnect; +mod ws_roundtrip; + +pub use convergence::PubSubConverges; +pub use pubsub_runtime_ext::{PubSubBuilderExt, PubSubEnv, PubSubScenarioBuilder, PubSubTopology}; +pub use ws_reconnect::PubSubWsReconnectWorkload; +pub use ws_roundtrip::PubSubWsRoundTripWorkload; diff --git a/examples/pubsub/testing/workloads/src/ws_reconnect.rs b/examples/pubsub/testing/workloads/src/ws_reconnect.rs new file mode 100644 index 0000000..9c0745f --- /dev/null +++ b/examples/pubsub/testing/workloads/src/ws_reconnect.rs @@ -0,0 +1,201 @@ +use std::{collections::HashSet, time::Duration}; + +use async_trait::async_trait; +use pubsub_runtime_ext::PubSubEnv; +use testing_framework_core::scenario::{DynError, RunContext, Workload}; +use tokio::time::Instant; +use tracing::info; + +#[derive(Clone)] +pub struct PubSubWsReconnectWorkload { + topic: String, + phase_one_messages: usize, + disconnected_messages: usize, + phase_two_messages: usize, + publish_rate_per_sec: Option, + timeout: Duration, +} + +impl PubSubWsReconnectWorkload { + #[must_use] + pub fn new(topic: impl Into) -> Self { + Self { + topic: topic.into(), + phase_one_messages: 40, + disconnected_messages: 20, + phase_two_messages: 40, + publish_rate_per_sec: Some(20), + timeout: Duration::from_secs(20), + } + } + + #[must_use] + pub const fn phase_one_messages(mut self, value: usize) -> Self { + self.phase_one_messages = value; + self + } + + #[must_use] + pub const fn disconnected_messages(mut self, value: usize) -> Self { + self.disconnected_messages = value; + self + } + + #[must_use] + pub const fn phase_two_messages(mut self, value: usize) -> Self { + self.phase_two_messages = value; + self + } + + #[must_use] + pub const fn publish_rate_per_sec(mut self, value: usize) -> Self { + self.publish_rate_per_sec = Some(value); + self + } + + #[must_use] + pub const fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } + + #[must_use] + pub const fn total_messages(&self) -> usize { + self.phase_one_messages + self.disconnected_messages + self.phase_two_messages + } +} + +impl Default for PubSubWsReconnectWorkload { + fn default() -> Self { + Self::new("demo.topic") + } +} + +#[async_trait] +impl Workload for PubSubWsReconnectWorkload { + fn name(&self) -> &str { + "pubsub_ws_reconnect_workload" + } + + async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.len() < 2 { + return Err("pubsub reconnect workload requires at least 2 nodes".into()); + } + + let delay = self.publish_rate_per_sec.and_then(compute_interval); + + let mut subscriber = clients[1].connect().await?; + subscriber.subscribe(&self.topic).await?; + + let mut publisher = clients[0].connect().await?; + + info!(topic = %self.topic, "pubsub reconnect phase 1: subscriber connected"); + publish_batch( + &mut publisher, + &self.topic, + "phase1", + self.phase_one_messages, + delay, + ) + .await?; + + subscriber.close().await?; + + info!(topic = %self.topic, "pubsub reconnect phase 2: subscriber disconnected"); + publish_batch( + &mut publisher, + &self.topic, + "phase_disconnected", + self.disconnected_messages, + delay, + ) + .await?; + + let mut subscriber = clients[1].connect().await?; + subscriber.subscribe(&self.topic).await?; + + info!(topic = %self.topic, "pubsub reconnect phase 3: subscriber reconnected"); + publish_batch( + &mut publisher, + &self.topic, + "phase2", + self.phase_two_messages, + delay, + ) + .await?; + + let received = collect_prefixed_events( + &mut subscriber, + "phase2-", + self.phase_two_messages, + self.timeout, + ) + .await?; + + if received != self.phase_two_messages { + return Err(format!( + "reconnected subscriber saw {received}/{} phase2 messages", + self.phase_two_messages + ) + .into()); + } + + Ok(()) + } +} + +async fn publish_batch( + publisher: &mut pubsub_runtime_ext::PubSubSession, + topic: &str, + prefix: &str, + count: usize, + delay: Option, +) -> Result<(), DynError> { + for i in 0..count { + publisher.publish(topic, format!("{prefix}-{i}")).await?; + + if let Some(interval) = delay { + tokio::time::sleep(interval).await; + } + } + + Ok(()) +} + +async fn collect_prefixed_events( + subscriber: &mut pubsub_runtime_ext::PubSubSession, + payload_prefix: &str, + expected: usize, + timeout: Duration, +) -> Result { + let mut seen_ids = HashSet::new(); + let deadline = Instant::now() + timeout; + + while Instant::now() < deadline && seen_ids.len() < expected { + let Some(event) = subscriber + .next_event_timeout(Duration::from_millis(100)) + .await? + else { + continue; + }; + + if !event.payload.starts_with(payload_prefix) { + continue; + } + + if !seen_ids.insert(event.id) { + return Err("duplicate phase2 event id observed".into()); + } + } + + Ok(seen_ids.len()) +} + +fn compute_interval(rate_per_sec: usize) -> Option { + if rate_per_sec == 0 { + return None; + } + + Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1))) +} diff --git a/examples/pubsub/testing/workloads/src/ws_roundtrip.rs b/examples/pubsub/testing/workloads/src/ws_roundtrip.rs new file mode 100644 index 0000000..4680ff8 --- /dev/null +++ b/examples/pubsub/testing/workloads/src/ws_roundtrip.rs @@ -0,0 +1,142 @@ +use std::{collections::HashMap, time::Duration}; + +use async_trait::async_trait; +use pubsub_runtime_ext::{PubSubEnv, PubSubEventId, PubSubSession}; +use testing_framework_core::scenario::{DynError, RunContext, Workload}; +use tokio::time::Instant; +use tracing::info; + +#[derive(Clone)] +pub struct PubSubWsRoundTripWorkload { + topic: String, + messages: usize, + publish_rate_per_sec: Option, + timeout: Duration, +} + +impl PubSubWsRoundTripWorkload { + #[must_use] + pub fn new(topic: impl Into) -> Self { + Self { + topic: topic.into(), + messages: 100, + publish_rate_per_sec: Some(20), + timeout: Duration::from_secs(20), + } + } + + #[must_use] + pub const fn messages(mut self, value: usize) -> Self { + self.messages = value; + self + } + + #[must_use] + pub const fn publish_rate_per_sec(mut self, value: usize) -> Self { + self.publish_rate_per_sec = Some(value); + self + } + + #[must_use] + pub const fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +#[async_trait] +impl Workload for PubSubWsRoundTripWorkload { + fn name(&self) -> &str { + "pubsub_ws_roundtrip_workload" + } + + async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + if clients.len() < 2 { + return Err("pubsub workload requires at least 2 nodes".into()); + } + + let mut subscribers = Vec::new(); + for client in clients.iter().take(2) { + let mut session = client.connect().await?; + session.subscribe(&self.topic).await?; + subscribers.push(session); + } + + let mut publisher = clients[0].connect().await?; + let delay = self.publish_rate_per_sec.and_then(compute_interval); + + info!(messages = self.messages, topic = %self.topic, "pubsub ws roundtrip publish phase"); + for i in 0..self.messages { + publisher.publish(&self.topic, format!("msg-{i}")).await?; + + if let Some(interval) = delay { + tokio::time::sleep(interval).await; + } + } + + let deliveries = collect_deliveries(&mut subscribers, self.messages, self.timeout).await?; + ensure_deliveries_match(&deliveries, self.messages) + } +} + +async fn collect_deliveries( + subscribers: &mut [PubSubSession], + expected_messages: usize, + timeout: Duration, +) -> Result>, DynError> { + let mut deliveries = vec![HashMap::new(); subscribers.len()]; + let deadline = Instant::now() + timeout; + + while Instant::now() < deadline && deliveries.iter().any(|seen| seen.len() < expected_messages) + { + for (idx, session) in subscribers.iter_mut().enumerate() { + if deliveries[idx].len() >= expected_messages { + continue; + } + + if let Some(event) = session + .next_event_timeout(Duration::from_millis(100)) + .await? + { + deliveries[idx].entry(event.id).or_insert(event.payload); + } + } + } + + Ok(deliveries) +} + +fn ensure_deliveries_match( + deliveries: &[HashMap], + expected_messages: usize, +) -> Result<(), DynError> { + for (idx, seen) in deliveries.iter().enumerate() { + if seen.len() != expected_messages { + return Err(format!( + "subscriber {idx} saw {}/{} messages", + seen.len(), + expected_messages + ) + .into()); + } + } + + if let Some((baseline, rest)) = deliveries.split_first() { + for seen in rest { + if seen != baseline { + return Err("subscriber deliveries diverged".into()); + } + } + } + + Ok(()) +} + +fn compute_interval(rate_per_sec: usize) -> Option { + if rate_per_sec == 0 { + return None; + } + + Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1))) +}