From f3f069c32180dd08959eaf61c268fd7778a11655 Mon Sep 17 00:00:00 2001 From: andrussal Date: Sun, 12 Apr 2026 10:55:19 +0200 Subject: [PATCH] feat(examples): replace scheduler with openraft kv example --- .cargo-deny.toml | 6 +- Cargo.lock | 677 +++++++++++++++--- Cargo.toml | 10 +- .../{scheduler => openraft_kv}/Dockerfile | 11 +- examples/openraft_kv/README.md | 87 +++ examples/openraft_kv/examples/Cargo.toml | 16 + .../examples/src/bin/basic_failover.rs | 20 + .../examples/src/bin/compose_failover.rs | 20 + .../examples/src/bin/k8s_failover.rs | 166 +++++ examples/openraft_kv/examples/src/lib.rs | 40 ++ .../openraft-kv-node}/Cargo.toml | 6 +- .../openraft-kv-node/src/client.rs | 136 ++++ .../openraft-kv-node/src/config.rs | 46 ++ .../openraft_kv/openraft-kv-node/src/lib.rs | 25 + .../openraft_kv/openraft-kv-node/src/main.rs | 24 + .../openraft-kv-node/src/network.rs | 158 ++++ .../openraft-kv-node/src/server.rs | 276 +++++++ .../openraft_kv/openraft-kv-node/src/types.rs | 112 +++ .../testing/integration/Cargo.toml | 8 +- .../testing/integration/src/app.rs | 59 ++ .../testing/integration/src/compose_env.rs | 112 +++ .../testing/integration/src/k8s_env.rs | 21 + .../testing/integration/src/lib.rs | 16 + .../testing/integration/src/local_env.rs | 125 ++++ .../testing/integration/src/scenario.rs | 19 + .../openraft_kv/testing/workloads/Cargo.toml | 15 + .../testing/workloads/src/convergence.rs | 58 ++ .../testing/workloads/src/failover.rs | 201 ++++++ .../openraft_kv/testing/workloads/src/lib.rs | 14 + .../testing/workloads/src/support.rs | 325 +++++++++ examples/scheduler/README.md | 45 -- examples/scheduler/examples/Cargo.toml | 15 - .../examples/src/bin/basic_failover.rs | 33 - .../examples/src/bin/compose_failover.rs | 49 -- .../scheduler/scheduler-node/src/client.rs | 40 -- .../scheduler/scheduler-node/src/config.rs | 35 - examples/scheduler/scheduler-node/src/lib.rs | 3 - examples/scheduler/scheduler-node/src/main.rs | 36 - 
.../scheduler/scheduler-node/src/server.rs | 156 ---- .../scheduler/scheduler-node/src/state.rs | 249 ------- examples/scheduler/scheduler-node/src/sync.rs | 103 --- .../scheduler/testing/integration/src/app.rs | 77 -- .../testing/integration/src/compose_env.rs | 15 - .../scheduler/testing/integration/src/lib.rs | 10 - .../testing/integration/src/local_env.rs | 42 -- .../testing/integration/src/scenario.rs | 15 - .../scheduler/testing/workloads/Cargo.toml | 14 - .../testing/workloads/src/drained.rs | 99 --- .../testing/workloads/src/lease_failover.rs | 205 ------ .../scheduler/testing/workloads/src/lib.rs | 8 - 50 files changed, 2707 insertions(+), 1351 deletions(-) rename examples/{scheduler => openraft_kv}/Dockerfile (54%) create mode 100644 examples/openraft_kv/README.md create mode 100644 examples/openraft_kv/examples/Cargo.toml create mode 100644 examples/openraft_kv/examples/src/bin/basic_failover.rs create mode 100644 examples/openraft_kv/examples/src/bin/compose_failover.rs create mode 100644 examples/openraft_kv/examples/src/bin/k8s_failover.rs create mode 100644 examples/openraft_kv/examples/src/lib.rs rename examples/{scheduler/scheduler-node => openraft_kv/openraft-kv-node}/Cargo.toml (80%) create mode 100644 examples/openraft_kv/openraft-kv-node/src/client.rs create mode 100644 examples/openraft_kv/openraft-kv-node/src/config.rs create mode 100644 examples/openraft_kv/openraft-kv-node/src/lib.rs create mode 100644 examples/openraft_kv/openraft-kv-node/src/main.rs create mode 100644 examples/openraft_kv/openraft-kv-node/src/network.rs create mode 100644 examples/openraft_kv/openraft-kv-node/src/server.rs create mode 100644 examples/openraft_kv/openraft-kv-node/src/types.rs rename examples/{scheduler => openraft_kv}/testing/integration/Cargo.toml (54%) create mode 100644 examples/openraft_kv/testing/integration/src/app.rs create mode 100644 examples/openraft_kv/testing/integration/src/compose_env.rs create mode 100644 
examples/openraft_kv/testing/integration/src/k8s_env.rs create mode 100644 examples/openraft_kv/testing/integration/src/lib.rs create mode 100644 examples/openraft_kv/testing/integration/src/local_env.rs create mode 100644 examples/openraft_kv/testing/integration/src/scenario.rs create mode 100644 examples/openraft_kv/testing/workloads/Cargo.toml create mode 100644 examples/openraft_kv/testing/workloads/src/convergence.rs create mode 100644 examples/openraft_kv/testing/workloads/src/failover.rs create mode 100644 examples/openraft_kv/testing/workloads/src/lib.rs create mode 100644 examples/openraft_kv/testing/workloads/src/support.rs delete mode 100644 examples/scheduler/README.md delete mode 100644 examples/scheduler/examples/Cargo.toml delete mode 100644 examples/scheduler/examples/src/bin/basic_failover.rs delete mode 100644 examples/scheduler/examples/src/bin/compose_failover.rs delete mode 100644 examples/scheduler/scheduler-node/src/client.rs delete mode 100644 examples/scheduler/scheduler-node/src/config.rs delete mode 100644 examples/scheduler/scheduler-node/src/lib.rs delete mode 100644 examples/scheduler/scheduler-node/src/main.rs delete mode 100644 examples/scheduler/scheduler-node/src/server.rs delete mode 100644 examples/scheduler/scheduler-node/src/state.rs delete mode 100644 examples/scheduler/scheduler-node/src/sync.rs delete mode 100644 examples/scheduler/testing/integration/src/app.rs delete mode 100644 examples/scheduler/testing/integration/src/compose_env.rs delete mode 100644 examples/scheduler/testing/integration/src/lib.rs delete mode 100644 examples/scheduler/testing/integration/src/local_env.rs delete mode 100644 examples/scheduler/testing/integration/src/scenario.rs delete mode 100644 examples/scheduler/testing/workloads/Cargo.toml delete mode 100644 examples/scheduler/testing/workloads/src/drained.rs delete mode 100644 examples/scheduler/testing/workloads/src/lease_failover.rs delete mode 100644 
examples/scheduler/testing/workloads/src/lib.rs diff --git a/.cargo-deny.toml b/.cargo-deny.toml index 13fb949..8994450 100644 --- a/.cargo-deny.toml +++ b/.cargo-deny.toml @@ -6,7 +6,11 @@ exclude-dev = true no-default-features = true [advisories] -ignore = ["RUSTSEC-2026-0097"] +ignore = [ + # Existing workspace dependencies still resolve rand 0.8 via tera/tokio-retry. + # Track removal when those upstream edges move to a fixed release. + "RUSTSEC-2026-0097", +] yanked = "deny" [bans] diff --git a/Cargo.lock b/Cargo.lock index 395a13c..c7f4a48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,17 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "ahash" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" +dependencies = [ + "getrandom 0.2.17", + "once_cell", + "version_check", +] + [[package]] name = "ahash" version = "0.8.12" @@ -89,12 +100,27 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "anyerror" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71add24cc141a1e8326f249b74c41cfd217aeb2a67c9c6cf9134d175469afd49" +dependencies = [ + "serde", +] + [[package]] name = "anyhow" version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "async-broadcast" version = "0.7.2" @@ -126,7 +152,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -137,7 +163,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" 
dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -230,6 +256,18 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -239,6 +277,30 @@ dependencies = [ "generic-array", ] +[[package]] +name = "borsh" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfd1e3f8955a5d7de9fab72fc8373fade9fb8a703968cb200ae3dc6cf08e185a" +dependencies = [ + "borsh-derive", + "bytes", + "cfg_aliases", +] + +[[package]] +name = "borsh-derive" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfcfdc083699101d5a7965e49925975f2f55060f94f9a05e7187be95d530ca59" +dependencies = [ + "once_cell", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "bstr" version = "1.12.1" @@ -255,6 +317,40 @@ version = "3.19.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5dd9dc738b7a8311c7ade152424974d8115f2cdad61e8dab8dac9f2362298510" +[[package]] +name = "byte-unit" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c6d47a4e2961fb8721bcfc54feae6455f2f64e7054f9bc67e875f0e77f4c58d" +dependencies = [ + "rust_decimal", + "schemars", + "serde", + "utf8-width", +] + +[[package]] +name = "bytecheck" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23cdc57ce23ac53c931e88a43d06d070a6fd142f2617be5855eb75efc9beb1c2" +dependencies = [ + "bytecheck_derive", + "ptr_meta", + "simdutf8", +] + +[[package]] +name = 
"bytecheck_derive" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3db406d29fbcd95542e92559bed4d8ad92636d1ca8b3b72ede10b4bcc010e659" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "bytes" version = "1.11.1" @@ -277,6 +373,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "cfgsync-adapter" version = "0.1.0" @@ -336,7 +438,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fac4744fb15ae8337dc853fee7fb3f4e48c0fbaa23d0afe49c447b4fab126118" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", + "wasm-bindgen", "windows-link", ] @@ -393,7 +497,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -417,6 +521,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "convert_case" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633458d4ef8c78b72454de2d54fd6ab2e60f9e02be22f3c6104cdc8a4e0fceb9" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -512,10 +625,12 @@ version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "799a97264921d8623a957f6c3b9011f3b5492f557bbb7a5a19b7fa6d06ba8dcb" dependencies = [ + "convert_case", "proc-macro2", "quote", "rustc_version", - "syn", + "syn 2.0.114", + "unicode-xid", ] [[package]] @@ -542,9 +657,15 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] +[[package]] +name = "dyn-clone" +version = 
"1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "educe" version = "0.6.0" @@ -554,7 +675,7 @@ dependencies = [ "enum-ordinalize", "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -572,7 +693,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -592,7 +713,7 @@ checksum = "8ca9601fb2d62598ee17836250842873a413586e5d7ed88b356e38ddbb0ec631" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -686,6 +807,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "futures" version = "0.3.31" @@ -694,6 +821,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -716,6 +844,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -730,7 +869,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -851,6 +990,15 @@ dependencies = [ "tracing", ] 
+[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash 0.7.8", +] + [[package]] name = "hashbrown" version = "0.16.1" @@ -1184,7 +1332,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.16.1", ] [[package]] @@ -1236,7 +1384,7 @@ checksum = "2a8c8b344124222efd714b73bb41f8b5120b27a7cc1c75593a6ff768d9d05aa4" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -1367,13 +1515,13 @@ version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c072737075826ee74d3e615e80334e41e617ca3d14fb46ef7cdfda822d6f15f2" dependencies = [ - "ahash", + "ahash 0.8.12", "async-broadcast", "async-stream", "backon", "educe", "futures", - "hashbrown", + "hashbrown 0.16.1", "hostname", "json-patch", "k8s-openapi", @@ -1498,6 +1646,12 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "maplit" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" + [[package]] name = "matchers" version = "0.2.0" @@ -1589,6 +1743,141 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "openraft" +version = "0.10.0-alpha.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0b9d8db10f834d517e4c2c45ab5c645bc5cafee9d07f7b150b8029a0b1ebdca" +dependencies = [ + "anyerror", + "byte-unit", + "chrono", + "clap", + "derive_more", + 
"futures-util", + "maplit", + "openraft-macros", + "openraft-rt", + "openraft-rt-tokio", + "peel-off", + "rand 0.9.3", + "serde", + "thiserror 2.0.18", + "tracing", + "validit", +] + +[[package]] +name = "openraft-kv-examples" +version = "0.1.0" +dependencies = [ + "anyhow", + "openraft-kv-node", + "openraft-kv-runtime-ext", + "openraft-kv-runtime-workloads", + "testing-framework-core", + "testing-framework-runner-k8s", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "openraft-kv-node" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "clap", + "openraft", + "openraft-memstore", + "reqwest", + "serde", + "serde_yaml", + "tokio", + "tower-http", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "openraft-kv-runtime-ext" +version = "0.1.0" +dependencies = [ + "openraft-kv-node", + "reqwest", + "testing-framework-core", + "testing-framework-runner-compose", + "testing-framework-runner-k8s", + "testing-framework-runner-local", +] + +[[package]] +name = "openraft-kv-runtime-workloads" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "openraft-kv-node", + "openraft-kv-runtime-ext", + "testing-framework-core", + "thiserror 2.0.18", + "tokio", + "tracing", +] + +[[package]] +name = "openraft-macros" +version = "0.10.0-alpha.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22b0bd215948ed47997a1d0447ea592e49220096360a833b118f329a08aa286" +dependencies = [ + "chrono", + "proc-macro2", + "quote", + "semver", + "syn 2.0.114", +] + +[[package]] +name = "openraft-memstore" +version = "0.10.0-alpha.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49ad7dc4bd822208b76a010c673178e3d5edf3ebd00146b0c555e2ee0088e523" +dependencies = [ + "derive_more", + "futures", + "openraft", + "serde", + "serde_json", + "tokio", + "tracing", +] + +[[package]] +name = "openraft-rt" +version = "0.10.0-alpha.17" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "55b651e6e2f25d022e34549e605eb8875c78ebc26862b16b06143a551e53ec00" +dependencies = [ + "futures-channel", + "futures-util", + "openraft-macros", + "rand 0.9.3", +] + +[[package]] +name = "openraft-rt-tokio" +version = "0.10.0-alpha.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "478d5625fdeb13293e68549ba1d42b7a25085f3be04204412147637ad22e2827" +dependencies = [ + "futures-util", + "openraft-rt", + "rand 0.9.3", + "tokio", +] + [[package]] name = "openssl" version = "0.10.75" @@ -1612,7 +1901,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -1686,6 +1975,12 @@ dependencies = [ "regex", ] +[[package]] +name = "peel-off" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3420ea4424090cbd75a688996f696a807c68d6744b4863591b86435dc3078e9" + [[package]] name = "pem" version = "3.0.6" @@ -1732,7 +2027,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -1771,7 +2066,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" dependencies = [ "phf_shared", - "rand", + "rand 0.8.5", ] [[package]] @@ -1800,7 +2095,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -1860,6 +2155,15 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "proc-macro-crate" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" +dependencies = [ + "toml_edit", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -1883,6 +2187,26 @@ dependencies = [ "url", ] +[[package]] 
+name = "ptr_meta" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "queue-examples" version = "0.1.0" @@ -1954,6 +2278,12 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "rand" version = "0.8.5" @@ -1961,8 +2291,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ec095654a25171c2124e9e3393a930bddbffdc939556c914957a4c3e0a87166" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.5", ] [[package]] @@ -1972,7 +2312,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 
0.9.5", ] [[package]] @@ -1984,6 +2334,15 @@ dependencies = [ "getrandom 0.2.17", ] +[[package]] +name = "rand_core" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" +dependencies = [ + "getrandom 0.3.4", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -1993,6 +2352,26 @@ dependencies = [ "bitflags", ] +[[package]] +name = "ref-cast" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "regex" version = "1.12.3" @@ -2022,6 +2401,15 @@ version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a96887878f22d7bad8a3b6dc5b7440e0ada9a245242924394987b21cf2210a4c" +[[package]] +name = "rend" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71fe3824f5629716b1589be05dacd749f6aa084c87e00e016714a8cdfccc997c" +dependencies = [ + "bytecheck", +] + [[package]] name = "reqwest" version = "0.12.28" @@ -2072,6 +2460,52 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rkyv" +version = "0.7.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2297bf9c81a3f0dc96bc9521370b88f054168c29826a75e89c55ff196e7ed6a1" +dependencies = [ + "bitvec", + "bytecheck", + "bytes", + "hashbrown 0.12.3", + "ptr_meta", + "rend", + "rkyv_derive", + "seahash", + "tinyvec", + "uuid", +] + +[[package]] +name = "rkyv_derive" +version = "0.7.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "84d7b42d4b8d06048d3ac8db0eb31bcb942cbeb709f0b5f2b2ebde398d3038f5" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "rust_decimal" +version = "1.41.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ce901f9a19d251159075a4c37af514c3b8ef99c22e02dd8c19161cf397ee94a" +dependencies = [ + "arrayvec", + "borsh", + "bytes", + "num-traits", + "rand 0.8.5", + "rkyv", + "serde", + "serde_json", + "wasm-bindgen", +] + [[package]] name = "rustc_version" version = "0.4.1" @@ -2185,58 +2619,15 @@ dependencies = [ ] [[package]] -name = "scheduler-examples" -version = "0.1.0" +name = "schemars" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" dependencies = [ - "anyhow", - "scheduler-runtime-ext", - "scheduler-runtime-workloads", - "testing-framework-core", - "testing-framework-runner-compose", - "tokio", - "tracing", - "tracing-subscriber", -] - -[[package]] -name = "scheduler-node" -version = "0.1.0" -dependencies = [ - "anyhow", - "axum", - "clap", - "reqwest", + "dyn-clone", + "ref-cast", "serde", - "serde_yaml", - "tokio", - "tower-http", - "tracing", - "tracing-subscriber", -] - -[[package]] -name = "scheduler-runtime-ext" -version = "0.1.0" -dependencies = [ - "async-trait", - "scheduler-node", - "serde", - "testing-framework-core", - "testing-framework-runner-compose", - "testing-framework-runner-local", -] - -[[package]] -name = "scheduler-runtime-workloads" -version = "0.1.0" -dependencies = [ - "async-trait", - "scheduler-node", - "scheduler-runtime-ext", - "serde", - "testing-framework-core", - "tokio", - "tracing", + "serde_json", ] [[package]] @@ -2245,6 +2636,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "seahash" +version = "4.1.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + [[package]] name = "secrecy" version = "0.10.3" @@ -2333,7 +2730,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -2421,6 +2818,12 @@ dependencies = [ "libc", ] +[[package]] +name = "simdutf8" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" + [[package]] name = "siphasher" version = "1.0.2" @@ -2477,6 +2880,17 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.114" @@ -2505,9 +2919,15 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "tempfile" version = "3.24.0" @@ -2535,7 +2955,7 @@ dependencies = [ "percent-encoding", "pest", "pest_derive", - "rand", + "rand 0.8.5", "regex", "serde", "serde_json", @@ -2555,7 +2975,7 @@ dependencies = [ "futures", "parking_lot", "prometheus-http-query", - "rand", + "rand 0.8.5", "reqwest", "serde", "serde_yaml", @@ -2651,7 +3071,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - 
"syn", + "syn 2.0.114", ] [[package]] @@ -2662,7 +3082,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -2714,6 +3134,21 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinyvec" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.49.0" @@ -2728,6 +3163,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.61.2", ] @@ -2739,7 +3175,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -2759,7 +3195,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" dependencies = [ "pin-project", - "rand", + "rand 0.8.5", "tokio", ] @@ -2787,6 +3223,36 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml_datetime" +version = "1.1.1+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3165f65f62e28e0115a00b2ebdd37eb6f3b641855f9d636d3cd4103767159ad7" +dependencies = [ + "serde_core", +] + +[[package]] +name = "toml_edit" +version = "0.25.11+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b59c4d22ed448339746c59b905d24568fcbb3ab65a500494f7b8c3e97739f2b" +dependencies = [ + "indexmap", + "toml_datetime", + "toml_parser", + "winnow", +] + +[[package]] +name = "toml_parser" +version = "1.1.2+spec-1.1.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526" +dependencies = [ + "winnow", +] + [[package]] name = "tower" version = "0.5.3" @@ -2857,7 +3323,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -2929,6 +3395,12 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "unsafe-libyaml" version = "0.2.11" @@ -2954,6 +3426,12 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "utf8-width" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1292c0d970b54115d14f2492fe0170adf21d68a1de108eebc51c1df4f346a091" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -2977,6 +3455,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "validit" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4efba0434d5a0a62d4f22070b44ce055dc18cb64d4fa98276aa523dadfaba0e7" +dependencies = [ + "anyerror", +] + [[package]] name = "valuable" version = "0.1.1" @@ -3075,7 +3562,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn", + "syn 2.0.114", "wasm-bindgen-shared", ] @@ -3140,7 +3627,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -3151,7 +3638,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -3334,6 +3821,15 @@ version 
= "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "winnow" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09dac053f1cd375980747450bfc7250c264eaae0583872e845c0c7cd578872b5" +dependencies = [ + "memchr", +] + [[package]] name = "winsafe" version = "0.0.19" @@ -3352,6 +3848,15 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] + [[package]] name = "yoke" version = "0.8.1" @@ -3371,7 +3876,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", "synstructure", ] @@ -3392,7 +3897,7 @@ checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] @@ -3412,7 +3917,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", "synstructure", ] @@ -3452,7 +3957,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.114", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 4bba838..3715a7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,14 +8,14 @@ members = [ "examples/kvstore/kvstore-node", "examples/kvstore/testing/integration", "examples/kvstore/testing/workloads", + "examples/openraft_kv/examples", + "examples/openraft_kv/openraft-kv-node", + "examples/openraft_kv/testing/integration", + 
"examples/openraft_kv/testing/workloads", "examples/queue/examples", "examples/queue/queue-node", "examples/queue/testing/integration", "examples/queue/testing/workloads", - "examples/scheduler/examples", - "examples/scheduler/scheduler-node", - "examples/scheduler/testing/integration", - "examples/scheduler/testing/workloads", "testing-framework/core", "testing-framework/deployers/compose", "testing-framework/deployers/k8s", @@ -54,6 +54,8 @@ bytes = { default-features = false, version = "1.3" } hex = { default-features = false, version = "0.4.3" } libp2p = { default-features = false, version = "0.55" } num-bigint = { default-features = false, version = "0.4" } +openraft = { default-features = true, features = ["serde", "type-alias"], version = "0.10.0-alpha.17" } +openraft-memstore = { default-features = true, version = "0.10.0-alpha.17" } parking_lot = { default-features = false, version = "0.12" } rand = { default-features = false, features = ["std", "std_rng"], version = "0.8" } reqwest = { default-features = false, version = "0.12" } diff --git a/examples/scheduler/Dockerfile b/examples/openraft_kv/Dockerfile similarity index 54% rename from examples/scheduler/Dockerfile rename to examples/openraft_kv/Dockerfile index 625ef34..bc54ff1 100644 --- a/examples/scheduler/Dockerfile +++ b/examples/openraft_kv/Dockerfile @@ -1,3 +1,4 @@ +# Build stage FROM rustlang/rust:nightly-bookworm AS builder WORKDIR /build @@ -7,7 +8,7 @@ COPY cfgsync/ ./cfgsync/ COPY examples/ ./examples/ COPY testing-framework/ ./testing-framework/ -RUN cargo build --release -p scheduler-node +RUN cargo build --release -p openraft-kv-node FROM debian:bookworm-slim @@ -15,10 +16,10 @@ RUN apt-get update && \ apt-get install -y ca-certificates && \ rm -rf /var/lib/apt/lists/* -COPY --from=builder /build/target/release/scheduler-node /usr/local/bin/scheduler-node +COPY --from=builder /build/target/release/openraft-kv-node /usr/local/bin/openraft-kv-node -RUN mkdir -p /etc/scheduler +RUN mkdir 
-p /etc/openraft-kv WORKDIR /app -ENTRYPOINT ["/usr/local/bin/scheduler-node"] -CMD ["--config", "/etc/scheduler/config.yaml"] +ENTRYPOINT ["/usr/local/bin/openraft-kv-node"] +CMD ["--config", "/etc/openraft-kv/config.yaml"] diff --git a/examples/openraft_kv/README.md b/examples/openraft_kv/README.md new file mode 100644 index 0000000..c358e21 --- /dev/null +++ b/examples/openraft_kv/README.md @@ -0,0 +1,87 @@ +# OpenRaft KV Example + +This example runs a small key-value service built on top of `OpenRaft`. + +The main scenario does four things: + +- bootstraps node 0 as a one-node cluster +- adds nodes 1 and 2 as learners and promotes them to voters +- writes one batch of keys through the current leader +- restarts that leader, waits for a new leader, writes again, and then checks + that all three nodes expose the same replicated state + +## How TF runs this + +- TF starts three OpenRaft nodes +- the workload bootstraps the cluster through the admin API +- the workload writes a first batch, restarts the current leader, waits for failover, and writes again +- the expectation checks that all three nodes converge on the same key/value state and membership + +## Scenario + +- `basic_failover` runs the leader-restart flow locally +- `compose_failover` runs the same flow in Docker Compose +- `k8s_failover` runs the same flow against a manual Kubernetes cluster deployment + +## API + +Each node exposes: + +- `GET /healthz` for readiness +- `GET /state` for current Raft role, leader, membership, log progress, and replicated key/value data +- `POST /kv/write` to submit a write through the local Raft node +- `POST /kv/read` to read a key from the local state machine +- `POST /admin/init` to initialize a single-node cluster +- `POST /admin/add-learner` to add a new Raft learner +- `POST /admin/change-membership` to promote learners into the voting set + +The node also exposes internal Raft RPC endpoints used only for replication: + +- `POST /raft/vote` +- `POST /raft/append` 
+- `POST /raft/snapshot` + +## Run locally + +```bash +OPENRAFT_KV_NODE_BIN="$(pwd)/target/debug/openraft-kv-node" \ +cargo run -p openraft-kv-examples --bin basic_failover +``` + +Build the node first if you have not done that yet: + +```bash +cargo build -p openraft-kv-node +``` + +## Run with Docker Compose + +Build the image first: + +```bash +docker build -t openraft-kv-node:local -f examples/openraft_kv/Dockerfile . +``` + +Then run: + +```bash +cargo run -p openraft-kv-examples --bin compose_failover +``` + +Set `OPENRAFT_KV_IMAGE` to override the default compose image tag. + +## Run on Kubernetes + +Build the same image first: + +```bash +docker build -t openraft-kv-node:local -f examples/openraft_kv/Dockerfile . +``` + +Then run: + +```bash +cargo run -p openraft-kv-examples --bin k8s_failover +``` + +If no cluster is available, the example exits early and prints a skip message. diff --git a/examples/openraft_kv/examples/Cargo.toml b/examples/openraft_kv/examples/Cargo.toml new file mode 100644 index 0000000..847f837 --- /dev/null +++ b/examples/openraft_kv/examples/Cargo.toml @@ -0,0 +1,16 @@ +[package] +edition.workspace = true +license.workspace = true +name = "openraft-kv-examples" +version.workspace = true + +[dependencies] +anyhow = "1.0" +openraft-kv-node = { path = "../openraft-kv-node" } +openraft-kv-runtime-ext = { path = "../testing/integration" } +openraft-kv-runtime-workloads = { path = "../testing/workloads" } +testing-framework-core = { workspace = true } +testing-framework-runner-k8s = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/openraft_kv/examples/src/bin/basic_failover.rs b/examples/openraft_kv/examples/src/bin/basic_failover.rs new file mode 100644 index 0000000..42bda85 --- /dev/null +++ b/examples/openraft_kv/examples/src/bin/basic_failover.rs @@ -0,0 +1,20 @@ +use std::time::Duration; 
+ +use openraft_kv_examples::build_failover_scenario; +use openraft_kv_runtime_ext::OpenRaftKvLocalDeployer; +use testing_framework_core::scenario::Deployer; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let mut scenario = build_failover_scenario(Duration::from_secs(45), Duration::from_secs(30))?; + + let deployer = OpenRaftKvLocalDeployer::default(); + let runner = deployer.deploy(&scenario).await?; + runner.run(&mut scenario).await?; + + Ok(()) +} diff --git a/examples/openraft_kv/examples/src/bin/compose_failover.rs b/examples/openraft_kv/examples/src/bin/compose_failover.rs new file mode 100644 index 0000000..be548af --- /dev/null +++ b/examples/openraft_kv/examples/src/bin/compose_failover.rs @@ -0,0 +1,20 @@ +use std::time::Duration; + +use openraft_kv_examples::build_failover_scenario; +use openraft_kv_runtime_ext::OpenRaftKvComposeDeployer; +use testing_framework_core::scenario::Deployer; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let mut scenario = build_failover_scenario(Duration::from_secs(60), Duration::from_secs(40))?; + + let deployer = OpenRaftKvComposeDeployer::new(); + let runner = deployer.deploy(&scenario).await?; + runner.run(&mut scenario).await?; + + Ok(()) +} diff --git a/examples/openraft_kv/examples/src/bin/k8s_failover.rs b/examples/openraft_kv/examples/src/bin/k8s_failover.rs new file mode 100644 index 0000000..38ba53b --- /dev/null +++ b/examples/openraft_kv/examples/src/bin/k8s_failover.rs @@ -0,0 +1,166 @@ +use std::time::Duration; + +use anyhow::{Context as _, Result, anyhow}; +use openraft_kv_examples::{ + INITIAL_WRITE_BATCH, RAFT_KEY_PREFIX, SECOND_WRITE_BATCH, TOTAL_WRITES, +}; +use openraft_kv_node::OpenRaftKvClient; +use openraft_kv_runtime_ext::{OpenRaftKvEnv, 
OpenRaftKvK8sDeployer, OpenRaftKvTopology}; +use openraft_kv_runtime_workloads::{ + OpenRaftMembership, resolve_client_for_node, wait_for_leader, wait_for_membership, + wait_for_replication, write_batch, +}; +use testing_framework_runner_k8s::{ManualCluster, ManualClusterError}; +use tracing::{info, warn}; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let deployer = OpenRaftKvK8sDeployer::new(); + let cluster = match deployer + .manual_cluster_from_descriptors(OpenRaftKvTopology::new(3)) + .await + { + Ok(cluster) => cluster, + Err(ManualClusterError::ClientInit { source }) => { + warn!("k8s unavailable ({source}); skipping openraft k8s run"); + + return Ok(()); + } + Err(ManualClusterError::InstallStack { source }) + if k8s_cluster_unavailable(&source.to_string()) => + { + warn!("k8s unavailable ({source}); skipping openraft k8s run"); + + return Ok(()); + } + Err(error) => { + return Err(anyhow::Error::new(error)).context("creating openraft k8s cluster"); + } + }; + + run_failover(cluster, Duration::from_secs(40)).await +} + +async fn run_failover(cluster: ManualCluster, timeout: Duration) -> Result<()> { + let mut clients = start_cluster(&cluster).await?; + + clients[0].init_self().await?; + + let initial_leader = wait_for_leader(&clients, timeout, None).await?; + let membership = OpenRaftMembership::discover(&clients).await?; + + add_learners_and_promote(&clients, initial_leader, &membership, timeout).await?; + write_initial_batch(&clients, initial_leader, timeout).await?; + + restart_leader(&cluster, initial_leader).await?; + refresh_clients(&cluster, &mut clients)?; + + let new_leader = wait_for_leader(&clients, timeout, Some(initial_leader)).await?; + write_second_batch(&clients, new_leader, timeout).await?; + + let expected = openraft_kv_runtime_workloads::expected_kv(RAFT_KEY_PREFIX, TOTAL_WRITES); + wait_for_replication(&clients, 
&expected, timeout).await?; + + cluster.stop_all(); + + Ok(()) +} + +async fn start_cluster(cluster: &ManualCluster) -> Result> { + let node0 = cluster.start_node("node-0").await?.client; + let node1 = cluster.start_node("node-1").await?.client; + let node2 = cluster.start_node("node-2").await?.client; + + cluster.wait_network_ready().await?; + + Ok(vec![node0, node1, node2]) +} + +async fn add_learners_and_promote( + clients: &[OpenRaftKvClient], + leader_id: u64, + membership: &OpenRaftMembership, + timeout: Duration, +) -> Result<()> { + let leader = resolve_client_for_node(clients, leader_id, timeout).await?; + + for learner in membership.learner_targets(leader_id) { + info!( + target = learner.node_id, + addr = %learner.public_addr, + "adding learner" + ); + + leader + .add_learner(learner.node_id, &learner.public_addr) + .await?; + } + + let voter_ids = membership.voter_ids(); + leader.change_membership(voter_ids.iter().copied()).await?; + + wait_for_membership(clients, &voter_ids, timeout).await?; + + Ok(()) +} + +async fn write_initial_batch( + clients: &[OpenRaftKvClient], + leader_id: u64, + timeout: Duration, +) -> Result<()> { + let leader = resolve_client_for_node(clients, leader_id, timeout).await?; + write_batch(&leader, RAFT_KEY_PREFIX, 0, INITIAL_WRITE_BATCH).await?; + + Ok(()) +} + +async fn write_second_batch( + clients: &[OpenRaftKvClient], + leader_id: u64, + timeout: Duration, +) -> Result<()> { + let leader = resolve_client_for_node(clients, leader_id, timeout).await?; + write_batch( + &leader, + RAFT_KEY_PREFIX, + INITIAL_WRITE_BATCH, + SECOND_WRITE_BATCH, + ) + .await?; + + Ok(()) +} + +async fn restart_leader(cluster: &ManualCluster, leader_id: u64) -> Result<()> { + let leader_name = format!("node-{leader_id}"); + info!(%leader_name, "restarting current leader"); + + cluster.restart_node(&leader_name).await?; + cluster.wait_network_ready().await?; + + Ok(()) +} + +fn refresh_clients( + cluster: &ManualCluster, + clients: &mut 
[OpenRaftKvClient], +) -> Result<()> { + for (index, slot) in clients.iter_mut().enumerate() { + *slot = cluster + .node_client(&format!("node-{index}")) + .ok_or_else(|| anyhow!("node-{index} client missing after restart"))?; + } + + Ok(()) +} + +fn k8s_cluster_unavailable(message: &str) -> bool { + message.contains("Unable to connect to the server") + || message.contains("TLS handshake timeout") + || message.contains("connection refused") +} diff --git a/examples/openraft_kv/examples/src/lib.rs b/examples/openraft_kv/examples/src/lib.rs new file mode 100644 index 0000000..4ab036c --- /dev/null +++ b/examples/openraft_kv/examples/src/lib.rs @@ -0,0 +1,40 @@ +use std::time::Duration; + +use openraft_kv_runtime_ext::{OpenRaftKvBuilderExt, OpenRaftKvEnv, OpenRaftKvScenarioBuilder}; +use openraft_kv_runtime_workloads::{OpenRaftKvConverges, OpenRaftKvFailoverWorkload}; +use testing_framework_core::scenario::{NodeControlCapability, Scenario}; + +/// Number of writes issued before the leader restart. +pub const INITIAL_WRITE_BATCH: usize = 8; +/// Number of writes issued after the leader restart. +pub const SECOND_WRITE_BATCH: usize = 8; +/// Total write count expected after the scenario completes. +pub const TOTAL_WRITES: usize = INITIAL_WRITE_BATCH + SECOND_WRITE_BATCH; +/// Key prefix shared by the failover workload and convergence expectation. +pub const RAFT_KEY_PREFIX: &str = "raft-key"; + +/// Builds the standard failover scenario used by the local and compose +/// binaries. 
+pub fn build_failover_scenario( + run_duration: Duration, + workload_timeout: Duration, +) -> anyhow::Result> { + Ok( + OpenRaftKvScenarioBuilder::deployment_with(|deployment| deployment) + .enable_node_control() + .with_run_duration(run_duration) + .with_workload( + OpenRaftKvFailoverWorkload::new() + .first_batch(INITIAL_WRITE_BATCH) + .second_batch(SECOND_WRITE_BATCH) + .timeout(workload_timeout) + .key_prefix(RAFT_KEY_PREFIX), + ) + .with_expectation( + OpenRaftKvConverges::new(TOTAL_WRITES) + .timeout(run_duration) + .key_prefix(RAFT_KEY_PREFIX), + ) + .build()?, + ) +} diff --git a/examples/scheduler/scheduler-node/Cargo.toml b/examples/openraft_kv/openraft-kv-node/Cargo.toml similarity index 80% rename from examples/scheduler/scheduler-node/Cargo.toml rename to examples/openraft_kv/openraft-kv-node/Cargo.toml index 6f2e682..a8d92db 100644 --- a/examples/scheduler/scheduler-node/Cargo.toml +++ b/examples/openraft_kv/openraft-kv-node/Cargo.toml @@ -1,17 +1,19 @@ [package] edition.workspace = true license.workspace = true -name = "scheduler-node" +name = "openraft-kv-node" version.workspace = true [[bin]] -name = "scheduler-node" +name = "openraft-kv-node" path = "src/main.rs" [dependencies] anyhow = "1.0" axum = "0.7" clap = { version = "4.0", features = ["derive"] } +openraft = { workspace = true } +openraft-memstore = { workspace = true } reqwest = { workspace = true, features = ["json"] } serde = { workspace = true } serde_yaml = { workspace = true } diff --git a/examples/openraft_kv/openraft-kv-node/src/client.rs b/examples/openraft_kv/openraft-kv-node/src/client.rs new file mode 100644 index 0000000..bc7c381 --- /dev/null +++ b/examples/openraft_kv/openraft-kv-node/src/client.rs @@ -0,0 +1,136 @@ +use std::{collections::BTreeSet, time::Duration}; + +use reqwest::Url; +use serde::{Serialize, de::DeserializeOwned}; + +use crate::types::{ + AddLearnerRequest, AddLearnerResult, ChangeMembershipRequest, ChangeMembershipResult, + InitResult, 
OpenRaftKvReadRequest, OpenRaftKvReadResponse, OpenRaftKvState, + OpenRaftKvWriteRequest, OpenRaftKvWriteResponse, +}; + +/// Small HTTP client for the OpenRaft example node and its admin endpoints. +#[derive(Clone)] +pub struct OpenRaftKvClient { + base_url: Url, + client: reqwest::Client, +} + +impl OpenRaftKvClient { + /// Builds a client for one node base URL. + #[must_use] + pub fn new(base_url: Url) -> Self { + Self { + base_url, + client: reqwest::Client::builder() + .timeout(Duration::from_secs(2)) + .connect_timeout(Duration::from_secs(2)) + .build() + .expect("openraft kv client timeout configuration is valid"), + } + } + + /// Fetches the node's current Raft and application state. + pub async fn state(&self) -> anyhow::Result { + self.get("state").await + } + + /// Replicates one key/value write through the current leader. + pub async fn write( + &self, + key: &str, + value: &str, + serial: u64, + ) -> anyhow::Result { + self.post_result( + "kv/write", + &OpenRaftKvWriteRequest { + key: key.to_owned(), + value: value.to_owned(), + serial, + }, + ) + .await + } + + /// Reads one key from the replicated state machine. + pub async fn read(&self, key: &str) -> anyhow::Result> { + let response: OpenRaftKvReadResponse = self + .post_result( + "kv/read", + &OpenRaftKvReadRequest { + key: key.to_owned(), + }, + ) + .await?; + Ok(response.value) + } + + /// Bootstraps a one-node cluster on this node. + pub async fn init_self(&self) -> anyhow::Result<()> { + let _: InitResult = self.post("admin/init", &()).await?; + Ok(()) + } + + /// Registers another node as a learner with the current leader. + pub async fn add_learner(&self, node_id: u64, addr: &str) -> anyhow::Result<()> { + let _: AddLearnerResult = self + .post( + "admin/add-learner", + &AddLearnerRequest { + node_id, + addr: addr.to_owned(), + }, + ) + .await?; + Ok(()) + } + + /// Promotes the cluster to the provided voter set. 
+ pub async fn change_membership( + &self, + voters: impl IntoIterator, + ) -> anyhow::Result<()> { + let voters = normalize_voters(voters); + let request = ChangeMembershipRequest { voters }; + + let _: ChangeMembershipResult = self.post("admin/change-membership", &request).await?; + Ok(()) + } + + async fn get(&self, path: &str) -> anyhow::Result { + let url = self.base_url.join(path)?; + let response = self.client.get(url).send().await?; + let response = response.error_for_status()?; + + Ok(response.json().await?) + } + + async fn post( + &self, + path: &str, + body: &B, + ) -> anyhow::Result { + let url = self.base_url.join(path)?; + + let response = self.client.post(url).json(body).send().await?; + + let response = response.error_for_status()?; + + Ok(response.json().await?) + } + + async fn post_result( + &self, + path: &str, + body: &B, + ) -> anyhow::Result { + let result: Result = self.post(path, body).await?; + result.map_err(anyhow::Error::msg) + } +} + +fn normalize_voters(voters: impl IntoIterator) -> Vec { + let unique_voters = voters.into_iter().collect::>(); + unique_voters.into_iter().collect() +} diff --git a/examples/openraft_kv/openraft-kv-node/src/config.rs b/examples/openraft_kv/openraft-kv-node/src/config.rs new file mode 100644 index 0000000..2d767bd --- /dev/null +++ b/examples/openraft_kv/openraft-kv-node/src/config.rs @@ -0,0 +1,46 @@ +use std::{collections::BTreeMap, fs, path::Path}; + +use serde::{Deserialize, Serialize}; + +/// Static node config written by TF for one OpenRaft node process. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OpenRaftKvNodeConfig { + /// Stable OpenRaft node identifier. + pub node_id: u64, + /// HTTP port bound by the node process. + pub http_port: u16, + /// Advertised Raft address for this node. + pub public_addr: String, + /// Advertised Raft addresses for the other known nodes. + #[serde(default)] + pub peer_addrs: BTreeMap, + /// Heartbeat interval passed to the OpenRaft config. 
+ #[serde(default = "default_heartbeat_interval_ms")] + pub heartbeat_interval_ms: u64, + /// Lower election timeout bound passed to OpenRaft. + #[serde(default = "default_election_timeout_min_ms")] + pub election_timeout_min_ms: u64, + /// Upper election timeout bound passed to OpenRaft. + #[serde(default = "default_election_timeout_max_ms")] + pub election_timeout_max_ms: u64, +} + +impl OpenRaftKvNodeConfig { + /// Loads one node config from YAML on disk. + pub fn load(path: &Path) -> anyhow::Result { + let raw = fs::read_to_string(path)?; + Ok(serde_yaml::from_str(&raw)?) + } +} + +const fn default_heartbeat_interval_ms() -> u64 { + 500 +} + +const fn default_election_timeout_min_ms() -> u64 { + 1_500 +} + +const fn default_election_timeout_max_ms() -> u64 { + 3_000 +} diff --git a/examples/openraft_kv/openraft-kv-node/src/lib.rs b/examples/openraft_kv/openraft-kv-node/src/lib.rs new file mode 100644 index 0000000..b039d7f --- /dev/null +++ b/examples/openraft_kv/openraft-kv-node/src/lib.rs @@ -0,0 +1,25 @@ +//! OpenRaft-backed key-value node used by the `examples-simple-clusters` +//! branch. + +/// HTTP client for interacting with one OpenRaft node. +pub mod client; +/// YAML node configuration used by TF and the node binary. +pub mod config; +mod network; +/// Axum server bootstrap and request handlers for one node process. +pub mod server; +/// Shared request, response, and state payload types. +pub mod types; + +/// Re-export of the node HTTP client. +pub use client::OpenRaftKvClient; +/// Re-export of the node YAML config type. +pub use config::OpenRaftKvNodeConfig; +/// Re-export of the public request and state payloads. +pub use types::{ + AddLearnerRequest, ChangeMembershipRequest, OpenRaftKvReadRequest, OpenRaftKvReadResponse, + OpenRaftKvState, OpenRaftKvWriteRequest, OpenRaftKvWriteResponse, +}; + +/// OpenRaft type configuration shared by the in-memory log and state machine. 
+pub type TypeConfig = openraft_memstore::TypeConfig; diff --git a/examples/openraft_kv/openraft-kv-node/src/main.rs b/examples/openraft_kv/openraft-kv-node/src/main.rs new file mode 100644 index 0000000..d07a076 --- /dev/null +++ b/examples/openraft_kv/openraft-kv-node/src/main.rs @@ -0,0 +1,24 @@ +use std::path::PathBuf; + +use clap::Parser; +use openraft_kv_node::{config::OpenRaftKvNodeConfig, server::run_server}; +use tracing_subscriber::EnvFilter; + +#[derive(Parser, Clone, Debug)] +#[command(author, version, about)] +struct Opt { + #[arg(long)] + config: PathBuf, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .with_ansi(false) + .init(); + + let options = Opt::parse(); + let config = OpenRaftKvNodeConfig::load(&options.config)?; + run_server(config).await +} diff --git a/examples/openraft_kv/openraft-kv-node/src/network.rs b/examples/openraft_kv/openraft-kv-node/src/network.rs new file mode 100644 index 0000000..47ed7a1 --- /dev/null +++ b/examples/openraft_kv/openraft-kv-node/src/network.rs @@ -0,0 +1,158 @@ +//! HTTP transport used by OpenRaft to replicate between example nodes. + +use std::{collections::BTreeMap, sync::Arc}; + +use openraft::{ + RaftNetworkFactory, RaftNetworkV2, + alias::{SnapshotOf, VoteOf}, + errors::{RPCError, StreamingError, Unreachable}, + network::RPCOption, +}; +use reqwest::Url; +use tokio::sync::RwLock; + +use crate::{ + TypeConfig, + types::{InstallFullSnapshotBody, SnapshotRpcResult}, +}; + +/// Shared node-address book used by Raft RPC clients. +#[derive(Clone, Default)] +pub struct HttpNetworkFactory { + client: reqwest::Client, + known_nodes: Arc>>, +} + +/// Per-target HTTP client used for Raft replication traffic. +pub struct HttpNetworkClient { + client: reqwest::Client, + target: u64, + target_addr: Option, +} + +impl HttpNetworkFactory { + /// Creates a network factory backed by one shared node-address map. 
+ #[must_use] + pub fn new(known_nodes: Arc>>) -> Self { + Self { + client: reqwest::Client::new(), + known_nodes, + } + } +} + +impl RaftNetworkFactory for HttpNetworkFactory { + type Network = HttpNetworkClient; + + async fn new_client(&mut self, target: u64, _node: &()) -> Self::Network { + let target_addr = self.known_nodes.read().await.get(&target).cloned(); + + HttpNetworkClient { + client: self.client.clone(), + target, + target_addr, + } + } +} + +impl RaftNetworkV2 for HttpNetworkClient { + async fn append_entries( + &mut self, + rpc: openraft::raft::AppendEntriesRequest, + _option: RPCOption, + ) -> Result, RPCError> { + self.post_rpc("raft/append", &rpc).await + } + + async fn vote( + &mut self, + rpc: openraft::raft::VoteRequest, + _option: RPCOption, + ) -> Result, RPCError> { + self.post_rpc("raft/vote", &rpc).await + } + + async fn full_snapshot( + &mut self, + vote: VoteOf, + snapshot: SnapshotOf, + _cancel: impl std::future::Future + + openraft::OptionalSend + + 'static, + _option: RPCOption, + ) -> Result, StreamingError> { + let body = InstallFullSnapshotBody { + vote, + meta: snapshot.meta, + data: snapshot.snapshot.into_inner(), + }; + + self.post_snapshot("raft/snapshot", &body).await + } +} + +impl HttpNetworkClient { + async fn post_rpc(&self, path: &str, body: &B) -> Result> + where + B: serde::Serialize, + T: serde::de::DeserializeOwned, + { + let url = self.endpoint_url(path)?; + let response = self + .client + .post(url) + .json(body) + .send() + .await + .map_err(|err| RPCError::Unreachable(Unreachable::new(&err)))? 
+ .error_for_status() + .map_err(|err| RPCError::Unreachable(Unreachable::new(&err)))?; + + let result: Result = response + .json() + .await + .map_err(|err| RPCError::Unreachable(Unreachable::new(&err)))?; + + result.map_err(|err| RPCError::Unreachable(Unreachable::from_string(err))) + } + + async fn post_snapshot( + &self, + path: &str, + body: &InstallFullSnapshotBody, + ) -> Result, StreamingError> { + let url = self + .endpoint_url(path) + .map_err(|err| StreamingError::Unreachable(Unreachable::new(&err)))?; + let response = self + .client + .post(url) + .json(body) + .send() + .await + .map_err(|err| StreamingError::Unreachable(Unreachable::new(&err)))? + .error_for_status() + .map_err(|err| StreamingError::Unreachable(Unreachable::new(&err)))?; + + let result: SnapshotRpcResult = response + .json() + .await + .map_err(|err| StreamingError::Unreachable(Unreachable::new(&err)))?; + + result.map_err(|err| StreamingError::Unreachable(Unreachable::from_string(err))) + } + + fn endpoint_url(&self, path: &str) -> Result> { + let Some(addr) = &self.target_addr else { + return Err(Unreachable::from_string(format!( + "target {} has no known address", + self.target + ))); + }; + + let mut url = + Url::parse(&format!("http://{addr}/")).map_err(|err| Unreachable::new(&err))?; + url.set_path(path); + Ok(url) + } +} diff --git a/examples/openraft_kv/openraft-kv-node/src/server.rs b/examples/openraft_kv/openraft-kv-node/src/server.rs new file mode 100644 index 0000000..5dd374d --- /dev/null +++ b/examples/openraft_kv/openraft-kv-node/src/server.rs @@ -0,0 +1,276 @@ +//! Axum server that exposes the OpenRaft example node and its admin endpoints. 
+ +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; + +use axum::{ + Json, Router, + extract::State, + http::StatusCode, + routing::{get, post}, +}; +use openraft::{Config, Raft, SnapshotPolicy, type_config::async_runtime::WatchReceiver}; +use openraft_memstore::{ClientRequest, MemLogStore, MemStateMachine, new_mem_store}; +use tokio::sync::RwLock; +use tower_http::trace::TraceLayer; +use tracing::info; + +use crate::{ + TypeConfig, + config::OpenRaftKvNodeConfig, + network::HttpNetworkFactory, + types::{ + AddLearnerRequest, AppendRpcResult, ChangeMembershipRequest, InitResult, + InstallSnapshotBody, MetricsResult, OpenRaftKvReadRequest, OpenRaftKvReadResponse, + OpenRaftKvState, OpenRaftKvWriteRequest, OpenRaftKvWriteResponse, SnapshotRpcResult, + VoteRpcResult, + }, +}; + +type KnownNodes = Arc>>; + +/// Shared state used by the HTTP handlers exposed by one node. +#[derive(Clone)] +pub struct AppState { + config: OpenRaftKvNodeConfig, + raft: Raft>, + state_machine: Arc, + known_nodes: KnownNodes, +} + +impl AppState { + /// Builds the application state for one node process. + pub fn new( + config: OpenRaftKvNodeConfig, + raft: Raft>, + state_machine: Arc, + known_nodes: KnownNodes, + ) -> Self { + Self { + config, + raft, + state_machine, + known_nodes, + } + } +} + +/// Starts one OpenRaft-backed HTTP node. 
+pub async fn run_server(config: OpenRaftKvNodeConfig) -> anyhow::Result<()> { + let raft_config = Arc::new( + Config { + cluster_name: "openraft-kv".to_owned(), + heartbeat_interval: config.heartbeat_interval_ms, + election_timeout_min: config.election_timeout_min_ms, + election_timeout_max: config.election_timeout_max_ms, + snapshot_policy: SnapshotPolicy::Never, + ..Default::default() + } + .validate()?, + ); + + let known_nodes = Arc::new(RwLock::new(known_nodes(&config))); + + let (log_store, state_machine): (Arc, Arc) = new_mem_store(); + let network = HttpNetworkFactory::new(known_nodes.clone()); + + let raft = Raft::new( + config.node_id, + raft_config, + network, + log_store, + state_machine.clone(), + ) + .await?; + + let app_state = AppState::new(config.clone(), raft, state_machine, known_nodes); + let app = router(app_state); + let address = std::net::SocketAddr::from(([0, 0, 0, 0], config.http_port)); + + info!( + node_id = config.node_id, + public_addr = %config.public_addr, + peers = ?config.peer_addrs, + %address, + "starting openraft kv node" + ); + + let listener = tokio::net::TcpListener::bind(address).await?; + axum::serve(listener, app).await?; + Ok(()) +} + +fn router(app_state: AppState) -> Router { + let app_routes = Router::new() + .route("/healthz", get(healthz)) + .route("/state", get(cluster_state)) + .route("/kv/write", post(write)) + .route("/kv/read", post(read)); + + let admin_routes = Router::new() + .route("/admin/init", post(init)) + .route("/admin/add-learner", post(add_learner)) + .route("/admin/change-membership", post(change_membership)) + .route("/admin/metrics", get(metrics)); + + let raft_routes = Router::new() + .route("/raft/vote", post(vote)) + .route("/raft/append", post(append)) + .route("/raft/snapshot", post(snapshot)); + + app_routes + .merge(admin_routes) + .merge(raft_routes) + .layer(TraceLayer::new_for_http()) + .with_state(app_state) +} + +async fn healthz() -> &'static str { + "ok" +} + +async fn 
cluster_state(State(app): State) -> Result, StatusCode> { + let metrics = app.raft.metrics().borrow_watched().clone(); + + let sm = app.state_machine.get_state_machine().await; + + let voters = metrics + .membership_config + .membership() + .voter_ids() + .collect::>(); + + let kv = sm.client_status.into_iter().collect::>(); + + Ok(Json(OpenRaftKvState { + node_id: app.config.node_id, + public_addr: app.config.public_addr.clone(), + role: format!("{:?}", metrics.state), + current_leader: metrics.current_leader, + current_term: metrics.current_term, + last_log_index: metrics.last_log_index, + last_applied_index: metrics.last_applied.as_ref().map(|log_id| log_id.index()), + voters, + kv, + })) +} + +async fn metrics(State(app): State) -> Json { + Json(Ok(app.raft.metrics().borrow_watched().clone())) +} + +async fn init(State(app): State) -> Json { + let members = BTreeSet::from([app.config.node_id]); + + Json( + app.raft + .initialize(members) + .await + .map_err(|err| err.to_string()), + ) +} + +async fn add_learner( + State(app): State, + Json(request): Json, +) -> Json { + let mut known_nodes = app.known_nodes.write().await; + known_nodes.insert(request.node_id, request.addr.clone()); + drop(known_nodes); + + Json( + app.raft + .add_learner(request.node_id, (), true) + .await + .map(|_| ()) + .map_err(|err| err.to_string()), + ) +} + +async fn change_membership( + State(app): State, + Json(request): Json, +) -> Json { + Json( + app.raft + .change_membership(request.voters.into_iter().collect::>(), false) + .await + .map(|_| ()) + .map_err(|err| err.to_string()), + ) +} + +async fn write( + State(app): State, + Json(request): Json, +) -> Json> { + let result = app + .raft + .client_write(ClientRequest { + client: request.key, + serial: request.serial, + status: request.value, + }) + .await + .map(|response| OpenRaftKvWriteResponse { + previous: response.response().0.clone(), + }) + .map_err(|err| err.to_string()); + + Json(result) +} + +async fn read( + State(app): 
State, + Json(request): Json, +) -> Json> { + let sm = app.state_machine.get_state_machine().await; + + Json(Ok(OpenRaftKvReadResponse { + value: sm.client_status.get(&request.key).cloned(), + })) +} + +async fn vote( + State(app): State, + Json(request): Json>, +) -> Json { + Json(app.raft.vote(request).await.map_err(|err| err.to_string())) +} + +async fn append( + State(app): State, + Json(request): Json>, +) -> Json { + Json( + app.raft + .append_entries(request) + .await + .map_err(|err| err.to_string()), + ) +} + +async fn snapshot( + State(app): State, + Json(request): Json, +) -> Json { + let snapshot = openraft::alias::SnapshotOf:: { + meta: request.meta, + snapshot: std::io::Cursor::new(request.data), + }; + + Json( + app.raft + .install_full_snapshot(request.vote, snapshot) + .await + .map_err(|err| err.to_string()), + ) +} + +fn known_nodes(config: &OpenRaftKvNodeConfig) -> BTreeMap { + let mut known_nodes = config.peer_addrs.clone(); + known_nodes.insert(config.node_id, config.public_addr.clone()); + known_nodes +} diff --git a/examples/openraft_kv/openraft-kv-node/src/types.rs b/examples/openraft_kv/openraft-kv-node/src/types.rs new file mode 100644 index 0000000..7272f3c --- /dev/null +++ b/examples/openraft_kv/openraft-kv-node/src/types.rs @@ -0,0 +1,112 @@ +use std::collections::BTreeMap; + +use openraft::{ + RaftMetrics, + alias::{SnapshotMetaOf, VoteOf}, + raft::InstallSnapshotRequest, +}; +use serde::{Deserialize, Serialize}; + +use crate::TypeConfig; + +/// Result shape used by the simple admin endpoints in this example. +pub type OpenRaftResult = Result; + +/// Request body for a replicated write submitted through the leader. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OpenRaftKvWriteRequest { + /// Application key to write. + pub key: String, + /// Value stored for the key. + pub value: String, + /// Client-side serial used by OpenRaft's example state machine. 
+ pub serial: u64, +} + +/// Response body returned after a replicated write is committed. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OpenRaftKvWriteResponse { + /// Previous value stored under the key, if any. + pub previous: Option, +} + +/// Request body for a key lookup. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OpenRaftKvReadRequest { + /// Application key to look up. + pub key: String, +} + +/// Response body returned by a key lookup. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OpenRaftKvReadResponse { + /// Current value stored under the key, if any. + pub value: Option, +} + +/// Admin request used to register a learner in the current cluster. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AddLearnerRequest { + /// OpenRaft node identifier for the learner. + pub node_id: u64, + /// Advertised Raft address for the learner. + pub addr: String, +} + +/// Admin request used to promote the cluster to a concrete voter set. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ChangeMembershipRequest { + /// Full voter set that should own the cluster after the change. + pub voters: Vec, +} + +/// Snapshot of one node's externally visible Raft and application state. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct OpenRaftKvState { + /// Stable OpenRaft node identifier. + pub node_id: u64, + /// Advertised Raft address for this node. + pub public_addr: String, + /// Current OpenRaft role rendered as text. + pub role: String, + /// Leader known by this node, if any. + pub current_leader: Option, + /// Current term reported by this node. + pub current_term: u64, + /// Highest log index stored locally. + pub last_log_index: Option, + /// Highest log index applied to the state machine. + pub last_applied_index: Option, + /// Current voter set reported by this node. + pub voters: Vec, + /// Application state machine contents. 
+ pub kv: BTreeMap, +} + +/// JSON representation used for full-snapshot replication over HTTP. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct InstallFullSnapshotBody { + /// Vote bundled with the snapshot transfer. + pub vote: VoteOf, + /// Snapshot metadata describing the transferred state. + pub meta: SnapshotMetaOf, + /// Serialized state machine bytes. + pub data: Vec, +} + +/// Serialized result of a vote RPC. +pub type VoteRpcResult = Result, String>; +/// Serialized result of an append-entries RPC. +pub type AppendRpcResult = Result, String>; +/// Serialized result of a full-snapshot RPC. +pub type SnapshotRpcResult = Result, String>; +/// JSON payload returned by the metrics endpoint. +pub type MetricsResult = Result, String>; +/// JSON payload returned by `/admin/init`. +pub type InitResult = Result<(), String>; +/// JSON payload returned by `/admin/add-learner`. +pub type AddLearnerResult = Result<(), String>; +/// JSON payload returned by `/admin/change-membership`. +pub type ChangeMembershipResult = Result<(), String>; +/// Request type accepted by the snapshot endpoint. 
+pub type InstallSnapshotBody = InstallSnapshotRequest; diff --git a/examples/scheduler/testing/integration/Cargo.toml b/examples/openraft_kv/testing/integration/Cargo.toml similarity index 54% rename from examples/scheduler/testing/integration/Cargo.toml rename to examples/openraft_kv/testing/integration/Cargo.toml index 819b60b..d5b90e1 100644 --- a/examples/scheduler/testing/integration/Cargo.toml +++ b/examples/openraft_kv/testing/integration/Cargo.toml @@ -1,13 +1,13 @@ [package] edition.workspace = true license.workspace = true -name = "scheduler-runtime-ext" +name = "openraft-kv-runtime-ext" version.workspace = true [dependencies] -async-trait = { workspace = true } -scheduler-node = { path = "../../scheduler-node" } -serde = { workspace = true } +openraft-kv-node = { path = "../../openraft-kv-node" } +reqwest = { workspace = true } testing-framework-core = { workspace = true } testing-framework-runner-compose = { workspace = true } +testing-framework-runner-k8s = { workspace = true } testing-framework-runner-local = { workspace = true } diff --git a/examples/openraft_kv/testing/integration/src/app.rs b/examples/openraft_kv/testing/integration/src/app.rs new file mode 100644 index 0000000..bc2c7f1 --- /dev/null +++ b/examples/openraft_kv/testing/integration/src/app.rs @@ -0,0 +1,59 @@ +use std::io::Error; + +use openraft_kv_node::{OpenRaftKvClient, OpenRaftKvNodeConfig}; +use testing_framework_core::scenario::{ + Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError, + NodeAccess, serialize_cluster_yaml_config, +}; + +/// Three-node topology used by the OpenRaft example scenarios. +pub type OpenRaftKvTopology = testing_framework_core::topology::ClusterTopology; + +/// Application environment wiring for the OpenRaft-backed key-value example. 
+pub struct OpenRaftKvEnv; + +impl Application for OpenRaftKvEnv { + type Deployment = OpenRaftKvTopology; + type NodeClient = OpenRaftKvClient; + type NodeConfig = OpenRaftKvNodeConfig; + + fn build_node_client(access: &NodeAccess) -> Result { + Ok(OpenRaftKvClient::new(access.api_base_url()?)) + } + + fn node_readiness_path() -> &'static str { + "/healthz" + } +} + +impl ClusterNodeConfigApplication for OpenRaftKvEnv { + type ConfigError = Error; + + fn static_network_port() -> u16 { + 8080 + } + + fn build_cluster_node_config( + node: &ClusterNodeView, + peers: &[ClusterPeerView], + ) -> Result { + Ok(OpenRaftKvNodeConfig { + node_id: node.index() as u64, + http_port: node.network_port(), + public_addr: node.authority(), + peer_addrs: peers + .iter() + .map(|peer| (peer.index() as u64, peer.authority())) + .collect(), + heartbeat_interval_ms: 500, + election_timeout_min_ms: 1_500, + election_timeout_max_ms: 3_000, + }) + } + + fn serialize_cluster_node_config( + config: &Self::NodeConfig, + ) -> Result { + serialize_cluster_yaml_config(config).map_err(Error::other) + } +} diff --git a/examples/openraft_kv/testing/integration/src/compose_env.rs b/examples/openraft_kv/testing/integration/src/compose_env.rs new file mode 100644 index 0000000..1120aa5 --- /dev/null +++ b/examples/openraft_kv/testing/integration/src/compose_env.rs @@ -0,0 +1,112 @@ +use std::{fs, path::Path}; + +use testing_framework_core::{ + cfgsync::StaticNodeConfigProvider, + scenario::{Application, DynError}, + topology::DeploymentDescriptor, +}; +use testing_framework_runner_compose::{ + BinaryConfigNodeSpec, ComposeDeployEnv, ComposeDescriptor, NodeDescriptor, + binary_config_node_runtime_spec, node_identifier, +}; + +use crate::OpenRaftKvEnv; + +const NODE_CONFIG_PATH: &str = "/etc/openraft-kv/config.yaml"; +const COMPOSE_HTTP_PORT_BASE: u16 = 47_080; + +fn compose_node_spec() -> BinaryConfigNodeSpec { + BinaryConfigNodeSpec::conventional( + "/usr/local/bin/openraft-kv-node", + 
NODE_CONFIG_PATH, + vec![8080], + ) +} + +fn fixed_loopback_port_binding(host_port: u16, container_port: u16) -> String { + format!("127.0.0.1:{host_port}:{container_port}") +} + +impl ComposeDeployEnv for OpenRaftKvEnv { + fn prepare_compose_configs( + path: &Path, + topology: &::Deployment, + _cfgsync_port: u16, + _metrics_otlp_ingest_url: Option<&reqwest::Url>, + ) -> Result<(), DynError> { + let hostnames = Self::cfgsync_hostnames(topology); + let stack_dir = path + .parent() + .ok_or_else(|| std::io::Error::other("compose config path has no parent"))?; + let configs_dir = stack_dir.join("configs"); + fs::create_dir_all(&configs_dir)?; + + for index in 0..topology.node_count() { + let mut config = Self::build_node_config(topology, index)?; + Self::rewrite_for_hostnames(topology, index, &hostnames, &mut config)?; + let rendered = Self::serialize_node_config(&config)?; + fs::write( + configs_dir.join(Self::static_node_config_file_name(index)), + rendered, + )?; + } + + Ok(()) + } + + fn static_node_config_file_name(index: usize) -> String { + format!("node-{index}.yaml") + } + + fn binary_config_node_spec( + _topology: &::Deployment, + _index: usize, + ) -> Result, DynError> { + Ok(Some(compose_node_spec())) + } + + fn compose_descriptor( + topology: &::Deployment, + _cfgsync_port: u16, + ) -> Result { + let spec = compose_node_spec(); + + let nodes = (0..topology.node_count()) + .map(|index| { + let runtime = binary_config_node_runtime_spec(index, &spec); + let file_name = Self::static_node_config_file_name(index); + + let host_port = COMPOSE_HTTP_PORT_BASE + index as u16; + let ports = compose_node_ports(host_port, &runtime.container_ports); + + NodeDescriptor::new( + node_identifier(index), + runtime.image, + runtime.entrypoint, + vec![format!( + "./stack/configs/{file_name}:{}:ro", + spec.config_container_path + )], + runtime.extra_hosts, + ports, + runtime.container_ports, + runtime.environment, + runtime.platform, + ) + }) + .collect(); + + 
Ok(ComposeDescriptor::new(nodes)) + } +} + +fn compose_node_ports(host_port: u16, container_ports: &[u16]) -> Vec { + container_ports + .iter() + .map(|port| { + // OpenRaft failover restarts the leader. Fixed host ports keep TF + // clients stable across `docker compose restart`. + fixed_loopback_port_binding(host_port, *port) + }) + .collect() +} diff --git a/examples/openraft_kv/testing/integration/src/k8s_env.rs b/examples/openraft_kv/testing/integration/src/k8s_env.rs new file mode 100644 index 0000000..8aa46df --- /dev/null +++ b/examples/openraft_kv/testing/integration/src/k8s_env.rs @@ -0,0 +1,21 @@ +use testing_framework_runner_k8s::{BinaryConfigK8sSpec, K8sBinaryApp}; + +use crate::OpenRaftKvEnv; + +const CONTAINER_CONFIG_PATH: &str = "/etc/openraft-kv/config.yaml"; +const CONTAINER_HTTP_PORT: u16 = 8080; +const SERVICE_TESTING_PORT: u16 = 8081; +const NODE_NAME_PREFIX: &str = "openraft-kv-node"; + +impl K8sBinaryApp for OpenRaftKvEnv { + fn k8s_binary_spec() -> BinaryConfigK8sSpec { + BinaryConfigK8sSpec::conventional( + "openraft-kv", + NODE_NAME_PREFIX, + "/usr/local/bin/openraft-kv-node", + CONTAINER_CONFIG_PATH, + CONTAINER_HTTP_PORT, + SERVICE_TESTING_PORT, + ) + } +} diff --git a/examples/openraft_kv/testing/integration/src/lib.rs b/examples/openraft_kv/testing/integration/src/lib.rs new file mode 100644 index 0000000..27ed847 --- /dev/null +++ b/examples/openraft_kv/testing/integration/src/lib.rs @@ -0,0 +1,16 @@ +mod app; +mod compose_env; +mod k8s_env; +mod local_env; +pub mod scenario; + +pub use app::*; +pub use scenario::{OpenRaftKvBuilderExt, OpenRaftKvScenarioBuilder}; + +/// Local process deployer for the OpenRaft example app. +pub type OpenRaftKvLocalDeployer = testing_framework_runner_local::ProcessDeployer; +/// Docker Compose deployer for the OpenRaft example app. +pub type OpenRaftKvComposeDeployer = + testing_framework_runner_compose::ComposeDeployer; +/// Kubernetes deployer for the OpenRaft example app. 
+pub type OpenRaftKvK8sDeployer = testing_framework_runner_k8s::K8sDeployer; diff --git a/examples/openraft_kv/testing/integration/src/local_env.rs b/examples/openraft_kv/testing/integration/src/local_env.rs new file mode 100644 index 0000000..2cf05fa --- /dev/null +++ b/examples/openraft_kv/testing/integration/src/local_env.rs @@ -0,0 +1,125 @@ +use std::collections::{BTreeMap, HashMap}; + +use openraft_kv_node::OpenRaftKvNodeConfig; +use testing_framework_core::{ + scenario::{DynError, StartNodeOptions}, + topology::DeploymentDescriptor, +}; +use testing_framework_runner_local::{ + BuiltNodeConfig, LocalDeployerEnv, LocalNodePorts, LocalProcessSpec, NodeConfigEntry, + reserve_local_node_ports, yaml_node_config, +}; + +use crate::OpenRaftKvEnv; + +impl LocalDeployerEnv for OpenRaftKvEnv { + fn build_node_config_from_template( + _topology: &Self::Deployment, + index: usize, + _peer_ports_by_name: &HashMap, + _options: &StartNodeOptions, + peer_ports: &[u16], + template_config: Option<&OpenRaftKvNodeConfig>, + ) -> Result, DynError> { + let mut reserved = reserve_local_node_ports(1, &[], "node") + .map_err(|source| -> DynError { source.into() })?; + + let ports = reserved + .pop() + .ok_or_else(|| std::io::Error::other("failed to reserve local node ports"))?; + + let mut config = template_config + .cloned() + .unwrap_or_else(|| local_node_config(index, ports.network_port(), BTreeMap::new())); + + // OpenRaft peer config is index-sensitive, so local restarts must rebuild + // the full peer map from the current reserved port set. 
+ let network_port = ports.network_port(); + config.node_id = index as u64; + config.http_port = network_port; + config.public_addr = local_addr(network_port); + config.peer_addrs = peer_addrs_from_ports(peer_ports, index); + + Ok(BuiltNodeConfig { + config, + network_port, + }) + } + + fn build_initial_node_configs( + topology: &Self::Deployment, + ) -> Result< + Vec>, + testing_framework_runner_local::process::ProcessSpawnError, + > { + let reserved_ports = reserve_local_node_ports(topology.node_count(), &[], "node")?; + + let peer_ports = reserved_ports + .iter() + .map(LocalNodePorts::network_port) + .collect::>(); + + // Build every node from the same reserved port view so the initial + // cluster starts with a consistent peer list on all nodes. + Ok(reserved_ports + .iter() + .enumerate() + .map(|(index, ports)| NodeConfigEntry { + name: format!("node-{index}"), + config: local_node_config( + index, + ports.network_port(), + peer_addrs_from_ports(&peer_ports, index), + ), + }) + .collect()) + } + + fn initial_node_name_prefix() -> &'static str { + "node" + } + + fn local_process_spec() -> Option { + Some( + LocalProcessSpec::new("OPENRAFT_KV_NODE_BIN", "openraft-kv-node").with_rust_log("info"), + ) + } + + fn render_local_config(config: &OpenRaftKvNodeConfig) -> Result, DynError> { + yaml_node_config(config) + } + + fn http_api_port(config: &OpenRaftKvNodeConfig) -> Option { + Some(config.http_port) + } +} + +fn local_node_config( + index: usize, + network_port: u16, + peer_addrs: BTreeMap, +) -> OpenRaftKvNodeConfig { + OpenRaftKvNodeConfig { + node_id: index as u64, + http_port: network_port, + public_addr: local_addr(network_port), + peer_addrs, + + heartbeat_interval_ms: 500, + election_timeout_min_ms: 1_500, + election_timeout_max_ms: 3_000, + } +} + +fn peer_addrs_from_ports(peer_ports: &[u16], local_index: usize) -> BTreeMap { + peer_ports + .iter() + .enumerate() + .filter(|(peer_index, _)| *peer_index != local_index) + .map(|(peer_index, peer_port)| 
(peer_index as u64, local_addr(*peer_port))) + .collect() +} + +fn local_addr(port: u16) -> String { + format!("127.0.0.1:{port}") +} diff --git a/examples/openraft_kv/testing/integration/src/scenario.rs b/examples/openraft_kv/testing/integration/src/scenario.rs new file mode 100644 index 0000000..eca27b7 --- /dev/null +++ b/examples/openraft_kv/testing/integration/src/scenario.rs @@ -0,0 +1,19 @@ +use testing_framework_core::scenario::ScenarioBuilder; + +use crate::{OpenRaftKvEnv, OpenRaftKvTopology}; + +/// Scenario builder alias used by the OpenRaft example binaries. +pub type OpenRaftKvScenarioBuilder = ScenarioBuilder; + +/// Convenience helpers for constructing the fixed three-node OpenRaft topology. +pub trait OpenRaftKvBuilderExt: Sized { + /// Starts from the default three-node deployment and lets callers adjust + /// it. + fn deployment_with(f: impl FnOnce(OpenRaftKvTopology) -> OpenRaftKvTopology) -> Self; +} + +impl OpenRaftKvBuilderExt for OpenRaftKvScenarioBuilder { + fn deployment_with(f: impl FnOnce(OpenRaftKvTopology) -> OpenRaftKvTopology) -> Self { + OpenRaftKvScenarioBuilder::with_deployment(f(OpenRaftKvTopology::new(3))) + } +} diff --git a/examples/openraft_kv/testing/workloads/Cargo.toml b/examples/openraft_kv/testing/workloads/Cargo.toml new file mode 100644 index 0000000..0682b55 --- /dev/null +++ b/examples/openraft_kv/testing/workloads/Cargo.toml @@ -0,0 +1,15 @@ +[package] +edition.workspace = true +license.workspace = true +name = "openraft-kv-runtime-workloads" +version.workspace = true + +[dependencies] +anyhow = "1.0" +async-trait = { workspace = true } +openraft-kv-node = { path = "../../openraft-kv-node" } +openraft-kv-runtime-ext = { path = "../integration" } +testing-framework-core = { workspace = true } +thiserror = "2.0" +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } diff --git a/examples/openraft_kv/testing/workloads/src/convergence.rs 
b/examples/openraft_kv/testing/workloads/src/convergence.rs new file mode 100644 index 0000000..9e88870 --- /dev/null +++ b/examples/openraft_kv/testing/workloads/src/convergence.rs @@ -0,0 +1,58 @@ +use std::time::Duration; + +use async_trait::async_trait; +use openraft_kv_runtime_ext::OpenRaftKvEnv; +use testing_framework_core::scenario::{DynError, Expectation, RunContext}; + +use crate::support::{expected_kv, wait_for_replication}; + +/// Expectation that waits for the full voter set and the writes from this run +/// to converge on every node. +#[derive(Clone)] +pub struct OpenRaftKvConverges { + total_writes: usize, + timeout: Duration, + key_prefix: String, +} + +impl OpenRaftKvConverges { + /// Creates a convergence check for the given number of replicated writes. + #[must_use] + pub fn new(total_writes: usize) -> Self { + Self { + total_writes, + timeout: Duration::from_secs(30), + key_prefix: "raft-key".to_owned(), + } + } + + /// Overrides the key prefix used to derive expected writes. + #[must_use] + pub fn key_prefix(mut self, value: &str) -> Self { + self.key_prefix = value.to_owned(); + self + } + + /// Overrides the convergence timeout. 
+ #[must_use] + pub const fn timeout(mut self, value: Duration) -> Self { + self.timeout = value; + self + } +} + +#[async_trait] +impl Expectation for OpenRaftKvConverges { + fn name(&self) -> &str { + "openraft_kv_converges" + } + + async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { + let expected = expected_kv(&self.key_prefix, self.total_writes); + let clients = ctx.node_clients().snapshot(); + + wait_for_replication(&clients, &expected, self.timeout).await?; + + Ok(()) + } +} diff --git a/examples/openraft_kv/testing/workloads/src/failover.rs b/examples/openraft_kv/testing/workloads/src/failover.rs new file mode 100644 index 0000000..89482c7 --- /dev/null +++ b/examples/openraft_kv/testing/workloads/src/failover.rs @@ -0,0 +1,201 @@ +use std::time::Duration; + +use async_trait::async_trait; +use openraft_kv_node::OpenRaftKvClient; +use openraft_kv_runtime_ext::OpenRaftKvEnv; +use testing_framework_core::scenario::{DynError, RunContext, Workload}; +use tracing::info; + +use crate::support::{ + OpenRaftMembership, ensure_cluster_size, resolve_client_for_node, wait_for_leader, + wait_for_membership, write_batch, +}; + +/// Workload that bootstraps the cluster, expands it to three voters, writes one +/// batch, restarts the leader, then writes a second batch through the new +/// leader. +#[derive(Clone)] +pub struct OpenRaftKvFailoverWorkload { + first_batch: usize, + second_batch: usize, + timeout: Duration, + key_prefix: String, +} + +impl OpenRaftKvFailoverWorkload { + /// Creates the default failover workload configuration. + #[must_use] + pub fn new() -> Self { + Self { + first_batch: 8, + second_batch: 8, + timeout: Duration::from_secs(30), + key_prefix: "raft-key".to_owned(), + } + } + + /// Sets the number of writes issued before the leader restart. + #[must_use] + pub const fn first_batch(mut self, value: usize) -> Self { + self.first_batch = value; + self + } + + /// Sets the number of writes issued after the leader restart. 
+ #[must_use] + pub const fn second_batch(mut self, value: usize) -> Self { + self.second_batch = value; + self + } + + /// Overrides the key prefix used for generated writes. + #[must_use] + pub fn key_prefix(mut self, value: &str) -> Self { + self.key_prefix = value.to_owned(); + self + } + + /// Overrides the timeout used for leader and membership transitions. + #[must_use] + pub const fn timeout(mut self, value: Duration) -> Self { + self.timeout = value; + self + } +} + +impl Default for OpenRaftKvFailoverWorkload { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl Workload for OpenRaftKvFailoverWorkload { + fn name(&self) -> &str { + "openraft_kv_failover_workload" + } + + async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { + let clients = ctx.node_clients().snapshot(); + ensure_cluster_size(&clients, 3)?; + + self.bootstrap_cluster(&clients).await?; + + let initial_leader = wait_for_leader(&clients, self.timeout, None).await?; + let membership = OpenRaftMembership::discover(&clients).await?; + + self.promote_cluster(&clients, initial_leader, &membership) + .await?; + self.write_initial_batch(&clients, initial_leader).await?; + + let new_leader = self + .restart_leader_and_wait_for_failover(ctx, &clients, initial_leader) + .await?; + self.write_second_batch(&clients, new_leader).await?; + + Ok(()) + } +} + +impl OpenRaftKvFailoverWorkload { + async fn bootstrap_cluster(&self, clients: &[OpenRaftKvClient]) -> Result<(), DynError> { + info!("initializing openraft cluster"); + + clients[0].init_self().await?; + + Ok(()) + } + + async fn promote_cluster( + &self, + clients: &[OpenRaftKvClient], + leader_id: u64, + membership: &OpenRaftMembership, + ) -> Result<(), DynError> { + let leader = resolve_client_for_node(clients, leader_id, self.timeout).await?; + + for learner in membership.learner_targets(leader_id) { + info!( + target = learner.node_id, + addr = %learner.public_addr, + "adding learner" + ); + + leader + 
.add_learner(learner.node_id, &learner.public_addr) + .await?; + } + + let voter_ids = membership.voter_ids(); + leader.change_membership(voter_ids.iter().copied()).await?; + + wait_for_membership(clients, &voter_ids, self.timeout).await?; + + Ok(()) + } + + async fn write_initial_batch( + &self, + clients: &[OpenRaftKvClient], + leader_id: u64, + ) -> Result<(), DynError> { + info!( + leader = leader_id, + writes = self.first_batch, + "writing initial batch" + ); + + let leader = resolve_client_for_node(clients, leader_id, self.timeout).await?; + write_batch(&leader, &self.key_prefix, 0, self.first_batch).await?; + + Ok(()) + } + + async fn restart_leader_and_wait_for_failover( + &self, + ctx: &RunContext, + clients: &[OpenRaftKvClient], + leader_id: u64, + ) -> Result { + let Some(control) = ctx.node_control() else { + return Err("openraft failover workload requires node control".into()); + }; + + let leader_name = format!("node-{leader_id}"); + info!(%leader_name, "restarting current leader"); + + control.restart_node(&leader_name).await?; + + let new_leader = wait_for_leader(clients, self.timeout, Some(leader_id)).await?; + + info!( + old_leader = leader_id, + new_leader, "leader changed after restart" + ); + + Ok(new_leader) + } + + async fn write_second_batch( + &self, + clients: &[OpenRaftKvClient], + leader_id: u64, + ) -> Result<(), DynError> { + info!( + leader = leader_id, + writes = self.second_batch, + "writing second batch" + ); + + let leader = resolve_client_for_node(clients, leader_id, self.timeout).await?; + write_batch( + &leader, + &self.key_prefix, + self.first_batch, + self.second_batch, + ) + .await?; + + Ok(()) + } +} diff --git a/examples/openraft_kv/testing/workloads/src/lib.rs b/examples/openraft_kv/testing/workloads/src/lib.rs new file mode 100644 index 0000000..f55a906 --- /dev/null +++ b/examples/openraft_kv/testing/workloads/src/lib.rs @@ -0,0 +1,14 @@ +mod convergence; +mod failover; +mod support; + +/// Replication expectation used 
by the OpenRaft example binaries. +pub use convergence::OpenRaftKvConverges; +/// Failover workload used by the OpenRaft example binaries. +pub use failover::OpenRaftKvFailoverWorkload; +/// Shared cluster helpers used by the OpenRaft workload and manual k8s example. +pub use support::{ + FULL_VOTER_SET, OpenRaftClusterError, OpenRaftMembership, ensure_cluster_size, expected_kv, + resolve_client_for_node, wait_for_leader, wait_for_membership, wait_for_replication, + write_batch, +}; diff --git a/examples/openraft_kv/testing/workloads/src/support.rs b/examples/openraft_kv/testing/workloads/src/support.rs new file mode 100644 index 0000000..489575a --- /dev/null +++ b/examples/openraft_kv/testing/workloads/src/support.rs @@ -0,0 +1,325 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + time::Duration, +}; + +use openraft_kv_node::{OpenRaftKvClient, OpenRaftKvState}; +use thiserror::Error; +use tokio::time::{Instant, sleep}; + +const POLL_INTERVAL: Duration = Duration::from_millis(250); +const CLIENT_RESOLUTION_INTERVAL: Duration = Duration::from_millis(200); + +/// Fixed voter set used by the example cluster. +pub const FULL_VOTER_SET: [u64; 3] = [0, 1, 2]; + +/// One learner candidate discovered from cluster state. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct LearnerTarget { + /// Node identifier used by OpenRaft membership. + pub node_id: u64, + /// Public address advertised for Raft traffic. + pub public_addr: String, +} + +/// Membership view captured from the current node states. +#[derive(Clone, Debug)] +pub struct OpenRaftMembership { + states: Vec, +} + +impl OpenRaftMembership { + /// Reads and sorts the current node states by id. 
+ pub async fn discover(clients: &[OpenRaftKvClient]) -> Result { + let mut states = Vec::with_capacity(clients.len()); + + for client in clients { + states.push(client.state().await.map_err(OpenRaftClusterError::Client)?); + } + + states.sort_by_key(|state| state.node_id); + + Ok(Self { states }) + } + + /// Returns the full voter set implied by the discovered nodes. + #[must_use] + pub fn voter_ids(&self) -> BTreeSet { + self.states.iter().map(|state| state.node_id).collect() + } + + /// Returns every non-leader node as a learner target. + #[must_use] + pub fn learner_targets(&self, leader_id: u64) -> Vec { + self.states + .iter() + .filter(|state| state.node_id != leader_id) + .map(|state| LearnerTarget { + node_id: state.node_id, + public_addr: state.public_addr.clone(), + }) + .collect() + } +} + +/// One poll result across all known clients. +#[derive(Clone, Debug, Default)] +pub struct OpenRaftObservation { + states: Vec, + failures: Vec, +} + +impl OpenRaftObservation { + /// Captures one best-effort view of the cluster. + pub async fn capture(clients: &[OpenRaftKvClient]) -> Self { + let mut states = Vec::with_capacity(clients.len()); + let mut failures = Vec::new(); + + for (index, client) in clients.iter().enumerate() { + match client.state().await { + Ok(state) => states.push(state), + Err(error) => failures.push(format!("client_index={index} error={error}")), + } + } + + states.sort_by_key(|state| state.node_id); + + Self { states, failures } + } + + /// Returns the unique observed leader when all responding nodes agree. + #[must_use] + pub fn agreed_leader(&self, different_from: Option) -> Option { + let observed = self + .states + .iter() + .filter_map(|state| state.current_leader) + .collect::>(); + + let leader = observed.iter().next().copied()?; + + (observed.len() == 1 && different_from != Some(leader)).then_some(leader) + } + + /// Returns `true` when every responding node reports the expected voter + /// set. 
+ #[must_use] + pub fn all_voters_match(&self, expected_voters: &BTreeSet) -> bool { + !self.states.is_empty() + && self.failures.is_empty() + && self.states.iter().all(|state| { + state.voters.iter().copied().collect::>() == *expected_voters + }) + } + + /// Returns `true` when every responding node exposes the expected key/value + /// data. + #[must_use] + pub fn all_kv_match(&self, expected: &BTreeMap) -> bool { + !self.states.is_empty() + && self.failures.is_empty() + && self.states.iter().all(|state| { + state.current_leader.is_some() + && state.voters == FULL_VOTER_SET + && expected + .iter() + .all(|(key, value)| state.kv.get(key) == Some(value)) + }) + } + + /// Returns a concise summary for timeout errors. + #[must_use] + pub fn summary(&self) -> String { + let mut lines = self + .states + .iter() + .map(|state| { + format!( + "node={} leader={:?} voters={:?} keys={}", + state.node_id, + state.current_leader, + state.voters, + state.kv.len() + ) + }) + .collect::>(); + + lines.extend(self.failures.iter().cloned()); + + if lines.is_empty() { + return "no state observed yet".to_owned(); + } + + lines.join("; ") + } +} + +/// Errors raised by the OpenRaft example cluster helpers. +#[derive(Debug, Error)] +pub enum OpenRaftClusterError { + #[error("openraft example requires at least {expected} node clients, got {actual}")] + InsufficientClients { expected: usize, actual: usize }, + #[error("failed to query openraft node state: {0}")] + Client(#[source] anyhow::Error), + #[error( + "timed out waiting for {action} after {timeout:?}; last observation: {last_observation}" + )] + Timeout { + action: &'static str, + timeout: Duration, + last_observation: String, + }, + #[error("timed out resolving node client for {node_id} after {timeout:?}")] + ClientResolution { node_id: u64, timeout: Duration }, +} + +/// Ensures the example cluster has the expected number of node clients. 
+pub fn ensure_cluster_size( + clients: &[OpenRaftKvClient], + expected: usize, +) -> Result<(), OpenRaftClusterError> { + if clients.len() < expected { + return Err(OpenRaftClusterError::InsufficientClients { + expected, + actual: clients.len(), + }); + } + + Ok(()) +} + +/// Waits until the cluster converges on one leader. +pub async fn wait_for_leader( + clients: &[OpenRaftKvClient], + timeout: Duration, + different_from: Option, +) -> Result { + let deadline = Instant::now() + timeout; + + loop { + let last_observation = OpenRaftObservation::capture(clients).await; + + if let Some(leader) = last_observation.agreed_leader(different_from) { + return Ok(leader); + } + + if Instant::now() >= deadline { + return Err(OpenRaftClusterError::Timeout { + action: "leader agreement", + timeout, + last_observation: last_observation.summary(), + }); + } + + sleep(POLL_INTERVAL).await; + } +} + +/// Waits until every node reports the expected voter set. +pub async fn wait_for_membership( + clients: &[OpenRaftKvClient], + expected_voters: &BTreeSet, + timeout: Duration, +) -> Result<(), OpenRaftClusterError> { + let deadline = Instant::now() + timeout; + + loop { + let last_observation = OpenRaftObservation::capture(clients).await; + + if last_observation.all_voters_match(expected_voters) { + return Ok(()); + } + + if Instant::now() >= deadline { + return Err(OpenRaftClusterError::Timeout { + action: "membership convergence", + timeout, + last_observation: last_observation.summary(), + }); + } + + sleep(POLL_INTERVAL).await; + } +} + +/// Waits until every node reports the full replicated key set. 
+pub async fn wait_for_replication( + clients: &[OpenRaftKvClient], + expected: &BTreeMap, + timeout: Duration, +) -> Result<(), OpenRaftClusterError> { + let deadline = Instant::now() + timeout; + + loop { + let last_observation = OpenRaftObservation::capture(clients).await; + + if last_observation.all_kv_match(expected) { + return Ok(()); + } + + if Instant::now() >= deadline { + return Err(OpenRaftClusterError::Timeout { + action: "replicated state convergence", + timeout, + last_observation: last_observation.summary(), + }); + } + + sleep(POLL_INTERVAL).await; + } +} + +/// Resolves the client handle that currently identifies as `node_id`. +pub async fn resolve_client_for_node( + clients: &[OpenRaftKvClient], + node_id: u64, + timeout: Duration, +) -> Result { + let deadline = Instant::now() + timeout; + + loop { + for client in clients { + let Ok(state) = client.state().await else { + continue; + }; + + if state.node_id == node_id { + return Ok(client.clone()); + } + } + + if Instant::now() >= deadline { + return Err(OpenRaftClusterError::ClientResolution { node_id, timeout }); + } + + sleep(CLIENT_RESOLUTION_INTERVAL).await; + } +} + +/// Issues a contiguous batch of writes through the current leader. +pub async fn write_batch( + leader: &OpenRaftKvClient, + prefix: &str, + start: usize, + count: usize, +) -> Result<(), OpenRaftClusterError> { + for index in start..(start + count) { + let key = format!("{prefix}-{index}"); + let value = format!("value-{index}"); + + leader + .write(&key, &value, index as u64 + 1) + .await + .map_err(OpenRaftClusterError::Client)?; + } + + Ok(()) +} + +/// Builds the replicated key/value map expected after the workload completes. 
+#[must_use] +pub fn expected_kv(prefix: &str, total_writes: usize) -> BTreeMap<String, String> { + (0..total_writes) + .map(|index| (format!("{prefix}-{index}"), format!("value-{index}"))) + .collect() +} diff --git a/examples/scheduler/README.md b/examples/scheduler/README.md deleted file mode 100644 index 9a97079..0000000 --- a/examples/scheduler/README.md +++ /dev/null @@ -1,45 +0,0 @@ -# Scheduler Example - -This example runs a small replicated job scheduler with worker leases. - -The scenario enqueues jobs, lets one worker claim them, stops making progress, -and then checks that another worker can reclaim and complete them after the -lease expires. - -## How TF runs this - -Each example follows the same pattern: - -- TF starts a small deployment of scheduler nodes -- the workload drives the worker flow through the HTTP API -- the expectation checks that jobs are eventually reclaimed and completed - -## Scenario - -- `basic_failover` runs the failover flow locally -- `compose_failover` runs the same flow in Docker Compose - -## API - -Each node exposes: - -- `POST /jobs/enqueue` to add jobs -- `POST /jobs/claim` to claim pending jobs with a lease -- `POST /jobs/heartbeat` to extend a lease -- `POST /jobs/ack` to mark a job complete -- `GET /jobs/state` to inspect scheduler state -- `GET /internal/snapshot` to read the local replicated state - -## Run locally - -```bash -cargo run -p scheduler-examples --bin basic_failover -``` - -## Run with Docker Compose - -```bash -cargo run -p scheduler-examples --bin compose_failover -``` - -Set `SCHEDULER_IMAGE` to override the default compose image tag. 
diff --git a/examples/scheduler/examples/Cargo.toml b/examples/scheduler/examples/Cargo.toml deleted file mode 100644 index 32c3756..0000000 --- a/examples/scheduler/examples/Cargo.toml +++ /dev/null @@ -1,15 +0,0 @@ -[package] -edition.workspace = true -license.workspace = true -name = "scheduler-examples" -version.workspace = true - -[dependencies] -anyhow = "1.0" -scheduler-runtime-ext = { path = "../testing/integration" } -scheduler-runtime-workloads = { path = "../testing/workloads" } -testing-framework-core = { workspace = true } -testing-framework-runner-compose = { workspace = true } -tokio = { workspace = true, features = ["full"] } -tracing = { workspace = true } -tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/examples/scheduler/examples/src/bin/basic_failover.rs b/examples/scheduler/examples/src/bin/basic_failover.rs deleted file mode 100644 index f7c4577..0000000 --- a/examples/scheduler/examples/src/bin/basic_failover.rs +++ /dev/null @@ -1,33 +0,0 @@ -use std::time::Duration; - -use scheduler_runtime_ext::SchedulerLocalDeployer; -use scheduler_runtime_workloads::{ - SchedulerBuilderExt, SchedulerDrained, SchedulerLeaseFailoverWorkload, - SchedulerScenarioBuilder, SchedulerTopology, -}; -use testing_framework_core::scenario::Deployer; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .init(); - - let jobs = 100; - - let mut scenario = SchedulerScenarioBuilder::deployment_with(|_| SchedulerTopology::new(3)) - .with_run_duration(Duration::from_secs(35)) - .with_workload( - SchedulerLeaseFailoverWorkload::new() - .operations(jobs) - .lease_ttl(Duration::from_secs(3)) - .rate_per_sec(25), - ) - .with_expectation(SchedulerDrained::new(jobs).timeout(Duration::from_secs(30))) - .build()?; - - let deployer = SchedulerLocalDeployer::default(); - let runner = deployer.deploy(&scenario).await?; - runner.run(&mut 
scenario).await?; - Ok(()) -} diff --git a/examples/scheduler/examples/src/bin/compose_failover.rs b/examples/scheduler/examples/src/bin/compose_failover.rs deleted file mode 100644 index 8bbdb01..0000000 --- a/examples/scheduler/examples/src/bin/compose_failover.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::time::Duration; - -use anyhow::{Context as _, Result}; -use scheduler_runtime_workloads::{ - SchedulerBuilderExt, SchedulerDrained, SchedulerLeaseFailoverWorkload, - SchedulerScenarioBuilder, SchedulerTopology, -}; -use testing_framework_core::scenario::Deployer; -use testing_framework_runner_compose::ComposeRunnerError; -use tracing::{info, warn}; - -#[tokio::main] -async fn main() -> Result<()> { - tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .init(); - - let jobs = 100; - - let mut scenario = SchedulerScenarioBuilder::deployment_with(|_| SchedulerTopology::new(3)) - .with_run_duration(Duration::from_secs(35)) - .with_workload( - SchedulerLeaseFailoverWorkload::new() - .operations(jobs) - .lease_ttl(Duration::from_secs(3)) - .rate_per_sec(20), - ) - .with_expectation(SchedulerDrained::new(jobs).timeout(Duration::from_secs(30))) - .build()?; - - let deployer = scheduler_runtime_ext::SchedulerComposeDeployer::new(); - let runner = match deployer.deploy(&scenario).await { - Ok(runner) => runner, - Err(ComposeRunnerError::DockerUnavailable) => { - warn!("docker unavailable; skipping scheduler compose run"); - return Ok(()); - } - Err(error) => { - return Err(anyhow::Error::new(error)).context("deploying scheduler compose stack"); - } - }; - - info!("running scheduler compose failover scenario"); - runner - .run(&mut scenario) - .await - .context("running scheduler compose scenario")?; - Ok(()) -} diff --git a/examples/scheduler/scheduler-node/src/client.rs b/examples/scheduler/scheduler-node/src/client.rs deleted file mode 100644 index 4c2e0ee..0000000 --- a/examples/scheduler/scheduler-node/src/client.rs +++ 
/dev/null @@ -1,40 +0,0 @@ -use reqwest::Url; -use serde::Serialize; - -#[derive(Clone)] -pub struct SchedulerHttpClient { - base_url: Url, - client: reqwest::Client, -} - -impl SchedulerHttpClient { - #[must_use] - pub fn new(base_url: Url) -> Self { - Self { - base_url, - client: reqwest::Client::new(), - } - } - - pub async fn get(&self, path: &str) -> anyhow::Result { - let url = self.base_url.join(path)?; - let response = self.client.get(url).send().await?.error_for_status()?; - Ok(response.json().await?) - } - - pub async fn post( - &self, - path: &str, - body: &B, - ) -> anyhow::Result { - let url = self.base_url.join(path)?; - let response = self - .client - .post(url) - .json(body) - .send() - .await? - .error_for_status()?; - Ok(response.json().await?) - } -} diff --git a/examples/scheduler/scheduler-node/src/config.rs b/examples/scheduler/scheduler-node/src/config.rs deleted file mode 100644 index fca2eb7..0000000 --- a/examples/scheduler/scheduler-node/src/config.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::{fs, path::Path}; - -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct PeerInfo { - pub node_id: u64, - pub http_address: String, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct SchedulerConfig { - pub node_id: u64, - pub http_port: u16, - pub peers: Vec, - #[serde(default = "default_sync_interval_ms")] - pub sync_interval_ms: u64, - #[serde(default = "default_lease_ttl_ms")] - pub lease_ttl_ms: u64, -} - -impl SchedulerConfig { - pub fn load(path: &Path) -> anyhow::Result { - let raw = fs::read_to_string(path)?; - Ok(serde_yaml::from_str(&raw)?) 
- } -} - -const fn default_sync_interval_ms() -> u64 { - 1000 -} - -const fn default_lease_ttl_ms() -> u64 { - 3000 -} diff --git a/examples/scheduler/scheduler-node/src/lib.rs b/examples/scheduler/scheduler-node/src/lib.rs deleted file mode 100644 index 3f17002..0000000 --- a/examples/scheduler/scheduler-node/src/lib.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod client; - -pub use client::SchedulerHttpClient; diff --git a/examples/scheduler/scheduler-node/src/main.rs b/examples/scheduler/scheduler-node/src/main.rs deleted file mode 100644 index 803c212..0000000 --- a/examples/scheduler/scheduler-node/src/main.rs +++ /dev/null @@ -1,36 +0,0 @@ -mod config; -mod server; -mod state; -mod sync; - -use std::path::PathBuf; - -use clap::Parser; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - -use crate::{config::SchedulerConfig, state::SchedulerState, sync::SyncService}; - -#[derive(Parser, Debug)] -#[command(name = "scheduler-node")] -struct Args { - #[arg(short, long)] - config: PathBuf, -} - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "scheduler_node=info,tower_http=debug".into()), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); - - let args = Args::parse(); - let config = SchedulerConfig::load(&args.config)?; - - let state = SchedulerState::new(config.node_id, config.lease_ttl_ms); - SyncService::new(config.clone(), state.clone()).start(); - server::start_server(config, state).await -} diff --git a/examples/scheduler/scheduler-node/src/server.rs b/examples/scheduler/scheduler-node/src/server.rs deleted file mode 100644 index 662be80..0000000 --- a/examples/scheduler/scheduler-node/src/server.rs +++ /dev/null @@ -1,156 +0,0 @@ -use std::net::SocketAddr; - -use axum::{ - Router, - extract::State, - http::StatusCode, - response::Json, - routing::{get, post}, -}; -use serde::{Deserialize, Serialize}; -use 
tower_http::trace::TraceLayer; - -use crate::{ - config::SchedulerConfig, - state::{SchedulerState, Snapshot, StateView}, -}; - -#[derive(Serialize)] -struct HealthResponse { - status: &'static str, -} - -#[derive(Deserialize)] -struct EnqueueRequest { - payload: String, -} - -#[derive(Serialize)] -struct EnqueueResponse { - id: u64, -} - -#[derive(Deserialize)] -struct ClaimRequest { - worker_id: String, - max_jobs: usize, -} - -#[derive(Serialize)] -struct ClaimResponse { - jobs: Vec, -} - -#[derive(Serialize)] -struct ClaimedJob { - id: u64, - payload: String, - attempt: u32, -} - -#[derive(Deserialize)] -struct HeartbeatRequest { - worker_id: String, - job_id: u64, -} - -#[derive(Deserialize)] -struct AckRequest { - worker_id: String, - job_id: u64, -} - -#[derive(Serialize)] -struct OperationResponse { - ok: bool, -} - -pub async fn start_server(config: SchedulerConfig, state: SchedulerState) -> anyhow::Result<()> { - let app = Router::new() - .route("/health/live", get(health_live)) - .route("/health/ready", get(health_ready)) - .route("/jobs/enqueue", post(enqueue)) - .route("/jobs/claim", post(claim)) - .route("/jobs/heartbeat", post(heartbeat)) - .route("/jobs/ack", post(ack)) - .route("/jobs/state", get(state_view)) - .route("/internal/snapshot", get(snapshot)) - .layer(TraceLayer::new_for_http()) - .with_state(state.clone()); - - let addr = SocketAddr::from(([0, 0, 0, 0], config.http_port)); - let listener = tokio::net::TcpListener::bind(addr).await?; - - state.set_ready(true).await; - tracing::info!(node_id = state.node_id(), %addr, "scheduler node ready"); - - axum::serve(listener, app).await?; - Ok(()) -} - -async fn health_live() -> (StatusCode, Json) { - (StatusCode::OK, Json(HealthResponse { status: "alive" })) -} - -async fn health_ready(State(state): State) -> (StatusCode, Json) { - if state.is_ready().await { - (StatusCode::OK, Json(HealthResponse { status: "ready" })) - } else { - ( - StatusCode::SERVICE_UNAVAILABLE, - Json(HealthResponse { - 
status: "not-ready", - }), - ) - } -} - -async fn enqueue( - State(state): State, - Json(request): Json, -) -> (StatusCode, Json) { - let id = state.enqueue(request.payload).await; - (StatusCode::OK, Json(EnqueueResponse { id })) -} - -async fn claim( - State(state): State, - Json(request): Json, -) -> (StatusCode, Json) { - let result = state.claim(request.worker_id, request.max_jobs).await; - let jobs = result - .jobs - .into_iter() - .map(|job| ClaimedJob { - id: job.id, - payload: job.payload, - attempt: job.attempt, - }) - .collect(); - - (StatusCode::OK, Json(ClaimResponse { jobs })) -} - -async fn heartbeat( - State(state): State, - Json(request): Json, -) -> (StatusCode, Json) { - let ok = state.heartbeat(&request.worker_id, request.job_id).await; - (StatusCode::OK, Json(OperationResponse { ok })) -} - -async fn ack( - State(state): State, - Json(request): Json, -) -> (StatusCode, Json) { - let ok = state.ack(&request.worker_id, request.job_id).await; - (StatusCode::OK, Json(OperationResponse { ok })) -} - -async fn state_view(State(state): State) -> Json { - Json(state.state_view().await) -} - -async fn snapshot(State(state): State) -> Json { - Json(state.snapshot().await) -} diff --git a/examples/scheduler/scheduler-node/src/state.rs b/examples/scheduler/scheduler-node/src/state.rs deleted file mode 100644 index d3e8ada..0000000 --- a/examples/scheduler/scheduler-node/src/state.rs +++ /dev/null @@ -1,249 +0,0 @@ -use std::{ - collections::BTreeMap, - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, -}; - -use serde::{Deserialize, Serialize}; -use tokio::sync::RwLock; - -#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] -pub struct Revision { - pub version: u64, - pub origin: u64, -} - -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] -pub struct JobRecord { - pub id: u64, - pub payload: String, - pub attempt: u32, - pub owner: Option, - pub lease_expires_at_ms: Option, - pub done: bool, -} - -#[derive(Clone, Debug, 
Serialize, Deserialize)] -pub struct Snapshot { - pub node_id: u64, - pub revision: Revision, - pub next_id: u64, - pub jobs: BTreeMap, -} - -#[derive(Clone, Debug, Eq, PartialEq, Serialize)] -pub struct StateView { - pub revision: Revision, - pub next_id: u64, - pub pending: usize, - pub leased: usize, - pub done: usize, -} - -#[derive(Clone, Debug, Serialize)] -pub struct ClaimResult { - pub jobs: Vec, -} - -#[derive(Debug, Default)] -struct Data { - revision: Revision, - next_id: u64, - jobs: BTreeMap, -} - -#[derive(Clone)] -pub struct SchedulerState { - node_id: u64, - ready: Arc>, - lease_ttl_ms: u64, - data: Arc>, -} - -impl SchedulerState { - pub fn new(node_id: u64, lease_ttl_ms: u64) -> Self { - Self { - node_id, - ready: Arc::new(RwLock::new(false)), - lease_ttl_ms, - data: Arc::new(RwLock::new(Data { - next_id: 1, - ..Data::default() - })), - } - } - - pub const fn node_id(&self) -> u64 { - self.node_id - } - - pub async fn set_ready(&self, value: bool) { - *self.ready.write().await = value; - } - - pub async fn is_ready(&self) -> bool { - *self.ready.read().await - } - - pub async fn enqueue(&self, payload: String) -> u64 { - let mut data = self.data.write().await; - reap_expired_leases(&mut data.jobs); - - let id = data.next_id; - data.next_id = data.next_id.saturating_add(1); - - data.jobs.insert( - id, - JobRecord { - id, - payload, - attempt: 0, - owner: None, - lease_expires_at_ms: None, - done: false, - }, - ); - - bump_revision(&mut data.revision, self.node_id); - id - } - - pub async fn claim(&self, worker_id: String, max_jobs: usize) -> ClaimResult { - let mut data = self.data.write().await; - reap_expired_leases(&mut data.jobs); - - let now = unix_ms(); - let mut claimed = Vec::new(); - - for job in data.jobs.values_mut() { - if claimed.len() >= max_jobs { - break; - } - if job.done || job.owner.is_some() { - continue; - } - - job.attempt = job.attempt.saturating_add(1); - job.owner = Some(worker_id.clone()); - job.lease_expires_at_ms = 
Some(now.saturating_add(self.lease_ttl_ms)); - claimed.push(job.clone()); - } - - if !claimed.is_empty() { - bump_revision(&mut data.revision, self.node_id); - } - - ClaimResult { jobs: claimed } - } - - pub async fn heartbeat(&self, worker_id: &str, job_id: u64) -> bool { - let mut data = self.data.write().await; - reap_expired_leases(&mut data.jobs); - - let Some(job) = data.jobs.get_mut(&job_id) else { - return false; - }; - - if job.done || job.owner.as_deref() != Some(worker_id) { - return false; - } - - job.lease_expires_at_ms = Some(unix_ms().saturating_add(self.lease_ttl_ms)); - bump_revision(&mut data.revision, self.node_id); - true - } - - pub async fn ack(&self, worker_id: &str, job_id: u64) -> bool { - let mut data = self.data.write().await; - reap_expired_leases(&mut data.jobs); - - let Some(job) = data.jobs.get_mut(&job_id) else { - return false; - }; - - if job.done || job.owner.as_deref() != Some(worker_id) { - return false; - } - - job.done = true; - job.owner = None; - job.lease_expires_at_ms = None; - bump_revision(&mut data.revision, self.node_id); - true - } - - pub async fn state_view(&self) -> StateView { - let data = self.data.read().await; - let mut pending = 0; - let mut leased = 0; - let mut done = 0; - - for job in data.jobs.values() { - if job.done { - done += 1; - } else if job.owner.is_some() { - leased += 1; - } else { - pending += 1; - } - } - - StateView { - revision: data.revision, - next_id: data.next_id, - pending, - leased, - done, - } - } - - pub async fn merge_snapshot(&self, snapshot: Snapshot) { - let mut data = self.data.write().await; - if is_newer_revision(snapshot.revision, data.revision) { - data.revision = snapshot.revision; - data.next_id = snapshot.next_id; - data.jobs = snapshot.jobs; - } - } - - pub async fn snapshot(&self) -> Snapshot { - let data = self.data.read().await; - Snapshot { - node_id: self.node_id, - revision: data.revision, - next_id: data.next_id, - jobs: data.jobs.clone(), - } - } -} - -fn 
reap_expired_leases(jobs: &mut BTreeMap) { - let now = unix_ms(); - for job in jobs.values_mut() { - if job.done { - continue; - } - - if let Some(expiry) = job.lease_expires_at_ms - && expiry <= now - { - job.owner = None; - job.lease_expires_at_ms = None; - } - } -} - -fn bump_revision(revision: &mut Revision, node_id: u64) { - revision.version = revision.version.saturating_add(1); - revision.origin = node_id; -} - -fn is_newer_revision(candidate: Revision, existing: Revision) -> bool { - (candidate.version, candidate.origin) > (existing.version, existing.origin) -} - -fn unix_ms() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .map_or(0, |duration| duration.as_millis() as u64) -} diff --git a/examples/scheduler/scheduler-node/src/sync.rs b/examples/scheduler/scheduler-node/src/sync.rs deleted file mode 100644 index 2f25802..0000000 --- a/examples/scheduler/scheduler-node/src/sync.rs +++ /dev/null @@ -1,103 +0,0 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; - -use reqwest::Client; -use tokio::sync::Mutex; -use tracing::{debug, warn}; - -use crate::{ - config::SchedulerConfig, - state::{SchedulerState, Snapshot}, -}; - -const WARN_AFTER_CONSECUTIVE_FAILURES: u32 = 5; - -#[derive(Clone)] -pub struct SyncService { - config: Arc, - state: SchedulerState, - client: Client, - failures_by_peer: Arc>>, -} - -impl SyncService { - pub fn new(config: SchedulerConfig, state: SchedulerState) -> Self { - Self { - config: Arc::new(config), - state, - client: Client::new(), - failures_by_peer: Arc::new(Mutex::new(HashMap::new())), - } - } - - pub fn start(&self) { - let service = self.clone(); - tokio::spawn(async move { - service.run().await; - }); - } - - async fn run(self) { - let interval = Duration::from_millis(self.config.sync_interval_ms.max(100)); - loop { - self.sync_once().await; - tokio::time::sleep(interval).await; - } - } - - async fn sync_once(&self) { - for peer in &self.config.peers { - match 
self.fetch_snapshot(&peer.http_address).await { - Ok(snapshot) => { - self.state.merge_snapshot(snapshot).await; - self.clear_failure_counter(&peer.http_address).await; - } - Err(error) => { - self.record_sync_failure(&peer.http_address, &error).await; - } - } - } - } - - async fn fetch_snapshot(&self, peer_address: &str) -> anyhow::Result { - let url = format!("http://{peer_address}/internal/snapshot"); - let snapshot = self - .client - .get(url) - .send() - .await? - .error_for_status()? - .json() - .await?; - Ok(snapshot) - } - - async fn clear_failure_counter(&self, peer_address: &str) { - let mut failures = self.failures_by_peer.lock().await; - failures.remove(peer_address); - } - - async fn record_sync_failure(&self, peer_address: &str, error: &anyhow::Error) { - let consecutive_failures = { - let mut failures = self.failures_by_peer.lock().await; - let entry = failures.entry(peer_address.to_owned()).or_insert(0); - *entry += 1; - *entry - }; - - if consecutive_failures >= WARN_AFTER_CONSECUTIVE_FAILURES { - warn!( - peer = %peer_address, - %error, - consecutive_failures, - "scheduler sync repeatedly failing" - ); - } else { - debug!( - peer = %peer_address, - %error, - consecutive_failures, - "scheduler sync failed" - ); - } - } -} diff --git a/examples/scheduler/testing/integration/src/app.rs b/examples/scheduler/testing/integration/src/app.rs deleted file mode 100644 index af1db60..0000000 --- a/examples/scheduler/testing/integration/src/app.rs +++ /dev/null @@ -1,77 +0,0 @@ -use std::io::Error; - -use async_trait::async_trait; -use scheduler_node::SchedulerHttpClient; -use serde::{Deserialize, Serialize}; -use testing_framework_core::scenario::{ - Application, ClusterNodeConfigApplication, ClusterNodeView, ClusterPeerView, DynError, - NodeAccess, serialize_cluster_yaml_config, -}; - -pub type SchedulerTopology = testing_framework_core::topology::ClusterTopology; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct SchedulerPeerInfo { - pub 
node_id: u64, - pub http_address: String, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct SchedulerNodeConfig { - pub node_id: u64, - pub http_port: u16, - pub peers: Vec, - pub sync_interval_ms: u64, - pub lease_ttl_ms: u64, -} - -pub struct SchedulerEnv; - -#[async_trait] -impl Application for SchedulerEnv { - type Deployment = SchedulerTopology; - type NodeClient = SchedulerHttpClient; - type NodeConfig = SchedulerNodeConfig; - fn build_node_client(access: &NodeAccess) -> Result { - Ok(SchedulerHttpClient::new(access.api_base_url()?)) - } - - fn node_readiness_path() -> &'static str { - "/health/ready" - } -} - -impl ClusterNodeConfigApplication for SchedulerEnv { - type ConfigError = Error; - - fn static_network_port() -> u16 { - 8080 - } - - fn build_cluster_node_config( - node: &ClusterNodeView, - peers: &[ClusterPeerView], - ) -> Result { - let peers = peers - .iter() - .map(|peer| SchedulerPeerInfo { - node_id: peer.index() as u64, - http_address: peer.authority(), - }) - .collect::>(); - - Ok(SchedulerNodeConfig { - node_id: node.index() as u64, - http_port: node.network_port(), - peers, - sync_interval_ms: 500, - lease_ttl_ms: 3000, - }) - } - - fn serialize_cluster_node_config( - config: &Self::NodeConfig, - ) -> Result { - serialize_cluster_yaml_config(config).map_err(Error::other) - } -} diff --git a/examples/scheduler/testing/integration/src/compose_env.rs b/examples/scheduler/testing/integration/src/compose_env.rs deleted file mode 100644 index f844c08..0000000 --- a/examples/scheduler/testing/integration/src/compose_env.rs +++ /dev/null @@ -1,15 +0,0 @@ -use testing_framework_runner_compose::{BinaryConfigNodeSpec, ComposeBinaryApp}; - -use crate::SchedulerEnv; - -const NODE_CONFIG_PATH: &str = "/etc/scheduler/config.yaml"; - -impl ComposeBinaryApp for SchedulerEnv { - fn compose_node_spec() -> BinaryConfigNodeSpec { - BinaryConfigNodeSpec::conventional( - "/usr/local/bin/scheduler-node", - NODE_CONFIG_PATH, - vec![8080, 8081], - ) 
- } -} diff --git a/examples/scheduler/testing/integration/src/lib.rs b/examples/scheduler/testing/integration/src/lib.rs deleted file mode 100644 index d855502..0000000 --- a/examples/scheduler/testing/integration/src/lib.rs +++ /dev/null @@ -1,10 +0,0 @@ -mod app; -mod compose_env; -mod local_env; -pub mod scenario; - -pub use app::*; -pub use scenario::{SchedulerBuilderExt, SchedulerScenarioBuilder}; - -pub type SchedulerLocalDeployer = testing_framework_runner_local::ProcessDeployer; -pub type SchedulerComposeDeployer = testing_framework_runner_compose::ComposeDeployer; diff --git a/examples/scheduler/testing/integration/src/local_env.rs b/examples/scheduler/testing/integration/src/local_env.rs deleted file mode 100644 index e6560b3..0000000 --- a/examples/scheduler/testing/integration/src/local_env.rs +++ /dev/null @@ -1,42 +0,0 @@ -use std::collections::HashMap; - -use testing_framework_core::scenario::{DynError, StartNodeOptions}; -use testing_framework_runner_local::{ - LocalBinaryApp, LocalNodePorts, LocalPeerNode, LocalProcessSpec, - build_local_cluster_node_config, yaml_node_config, -}; - -use crate::{SchedulerEnv, SchedulerNodeConfig}; - -impl LocalBinaryApp for SchedulerEnv { - fn initial_node_name_prefix() -> &'static str { - "scheduler-node" - } - - fn build_local_node_config_with_peers( - _topology: &Self::Deployment, - index: usize, - ports: &LocalNodePorts, - peers: &[LocalPeerNode], - _peer_ports_by_name: &HashMap, - _options: &StartNodeOptions, - _template_config: Option< - &::NodeConfig, - >, - ) -> Result<::NodeConfig, DynError> { - build_local_cluster_node_config::(index, ports, peers) - } - - fn local_process_spec() -> LocalProcessSpec { - LocalProcessSpec::new("SCHEDULER_NODE_BIN", "scheduler-node") - .with_rust_log("scheduler_node=info") - } - - fn render_local_config(config: &SchedulerNodeConfig) -> Result, DynError> { - yaml_node_config(config) - } - - fn http_api_port(config: &SchedulerNodeConfig) -> u16 { - config.http_port - } -} diff 
--git a/examples/scheduler/testing/integration/src/scenario.rs b/examples/scheduler/testing/integration/src/scenario.rs deleted file mode 100644 index b6e54eb..0000000 --- a/examples/scheduler/testing/integration/src/scenario.rs +++ /dev/null @@ -1,15 +0,0 @@ -use testing_framework_core::scenario::ScenarioBuilder; - -use crate::{SchedulerEnv, SchedulerTopology}; - -pub type SchedulerScenarioBuilder = ScenarioBuilder; - -pub trait SchedulerBuilderExt: Sized { - fn deployment_with(f: impl FnOnce(SchedulerTopology) -> SchedulerTopology) -> Self; -} - -impl SchedulerBuilderExt for SchedulerScenarioBuilder { - fn deployment_with(f: impl FnOnce(SchedulerTopology) -> SchedulerTopology) -> Self { - SchedulerScenarioBuilder::with_deployment(f(SchedulerTopology::new(3))) - } -} diff --git a/examples/scheduler/testing/workloads/Cargo.toml b/examples/scheduler/testing/workloads/Cargo.toml deleted file mode 100644 index 657653a..0000000 --- a/examples/scheduler/testing/workloads/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -edition.workspace = true -license.workspace = true -name = "scheduler-runtime-workloads" -version.workspace = true - -[dependencies] -async-trait = { workspace = true } -scheduler-node = { path = "../../scheduler-node" } -scheduler-runtime-ext = { path = "../integration" } -serde = { workspace = true } -testing-framework-core = { workspace = true } -tokio = { workspace = true, features = ["full"] } -tracing = { workspace = true } diff --git a/examples/scheduler/testing/workloads/src/drained.rs b/examples/scheduler/testing/workloads/src/drained.rs deleted file mode 100644 index 62366f3..0000000 --- a/examples/scheduler/testing/workloads/src/drained.rs +++ /dev/null @@ -1,99 +0,0 @@ -use std::time::Duration; - -use async_trait::async_trait; -use scheduler_runtime_ext::SchedulerEnv; -use serde::Deserialize; -use testing_framework_core::scenario::{DynError, Expectation, RunContext}; -use tracing::info; - -#[derive(Clone)] -pub struct SchedulerDrained { - 
min_done: usize, - timeout: Duration, - poll_interval: Duration, -} - -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] -struct Revision { - version: u64, - origin: u64, -} - -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] -struct StateResponse { - revision: Revision, - pending: usize, - leased: usize, - done: usize, -} - -impl SchedulerDrained { - #[must_use] - pub fn new(min_done: usize) -> Self { - Self { - min_done, - timeout: Duration::from_secs(30), - poll_interval: Duration::from_millis(500), - } - } - - #[must_use] - pub const fn timeout(mut self, timeout: Duration) -> Self { - self.timeout = timeout; - self - } -} - -#[async_trait] -impl Expectation for SchedulerDrained { - fn name(&self) -> &str { - "scheduler_drained" - } - - async fn evaluate(&mut self, ctx: &RunContext) -> Result<(), DynError> { - let clients = ctx.node_clients().snapshot(); - if clients.is_empty() { - return Err("no scheduler node clients available".into()); - } - - let deadline = tokio::time::Instant::now() + self.timeout; - while tokio::time::Instant::now() < deadline { - if is_drained_and_converged(&clients, self.min_done).await? { - info!(min_done = self.min_done, "scheduler drained and converged"); - return Ok(()); - } - tokio::time::sleep(self.poll_interval).await; - } - - Err(format!("scheduler not drained within {:?}", self.timeout).into()) - } -} - -async fn is_drained_and_converged( - clients: &[scheduler_node::SchedulerHttpClient], - min_done: usize, -) -> Result { - let Some((first, rest)) = clients.split_first() else { - return Ok(false); - }; - - let baseline = read_state(first).await?; - if baseline.pending != 0 || baseline.leased != 0 || baseline.done < min_done { - return Ok(false); - } - - for client in rest { - let current = read_state(client).await?; - if current != baseline { - return Ok(false); - } - } - - Ok(true) -} - -async fn read_state( - client: &scheduler_node::SchedulerHttpClient, -) -> Result { - Ok(client.get("/jobs/state").await?) 
-} diff --git a/examples/scheduler/testing/workloads/src/lease_failover.rs b/examples/scheduler/testing/workloads/src/lease_failover.rs deleted file mode 100644 index 15eb0c5..0000000 --- a/examples/scheduler/testing/workloads/src/lease_failover.rs +++ /dev/null @@ -1,205 +0,0 @@ -use std::{collections::HashSet, time::Duration}; - -use async_trait::async_trait; -use scheduler_runtime_ext::SchedulerEnv; -use serde::{Deserialize, Serialize}; -use testing_framework_core::scenario::{DynError, RunContext, Workload}; -use tokio::time::{Instant, sleep}; -use tracing::info; - -#[derive(Clone)] -pub struct SchedulerLeaseFailoverWorkload { - operations: usize, - lease_ttl: Duration, - rate_per_sec: Option, - payload_prefix: String, -} - -#[derive(Serialize)] -struct EnqueueRequest { - payload: String, -} - -#[derive(Deserialize)] -struct EnqueueResponse { - id: u64, -} - -#[derive(Serialize)] -struct ClaimRequest { - worker_id: String, - max_jobs: usize, -} - -#[derive(Deserialize)] -struct ClaimedJob { - id: u64, -} - -#[derive(Deserialize)] -struct ClaimResponse { - jobs: Vec, -} - -#[derive(Serialize)] -struct AckRequest { - worker_id: String, - job_id: u64, -} - -#[derive(Deserialize)] -struct OperationResponse { - ok: bool, -} - -impl SchedulerLeaseFailoverWorkload { - #[must_use] - pub fn new() -> Self { - Self { - operations: 100, - lease_ttl: Duration::from_secs(3), - rate_per_sec: Some(25), - payload_prefix: "scheduler-job".to_owned(), - } - } - - #[must_use] - pub const fn operations(mut self, value: usize) -> Self { - self.operations = value; - self - } - - #[must_use] - pub const fn lease_ttl(mut self, value: Duration) -> Self { - self.lease_ttl = value; - self - } - - #[must_use] - pub const fn rate_per_sec(mut self, value: usize) -> Self { - self.rate_per_sec = Some(value); - self - } -} - -impl Default for SchedulerLeaseFailoverWorkload { - fn default() -> Self { - Self::new() - } -} - -#[async_trait] -impl Workload for SchedulerLeaseFailoverWorkload { - fn 
name(&self) -> &str { - "scheduler_lease_failover_workload" - } - - async fn start(&self, ctx: &RunContext) -> Result<(), DynError> { - let clients = ctx.node_clients().snapshot(); - let Some(node_a) = clients.first() else { - return Err("no scheduler node clients available".into()); - }; - let node_b = clients.get(1).unwrap_or(node_a); - - let interval = self.rate_per_sec.and_then(compute_interval); - let mut enqueued_ids = Vec::with_capacity(self.operations); - - info!( - operations = self.operations, - "scheduler failover: enqueue phase" - ); - for index in 0..self.operations { - let response: EnqueueResponse = node_a - .post( - "/jobs/enqueue", - &EnqueueRequest { - payload: format!("{}-{index}", self.payload_prefix), - }, - ) - .await?; - enqueued_ids.push(response.id); - if let Some(delay) = interval { - sleep(delay).await; - } - } - - info!("scheduler failover: worker-a claim without ack"); - let first_claim: ClaimResponse = node_a - .post( - "/jobs/claim", - &ClaimRequest { - worker_id: "worker-a".to_owned(), - max_jobs: self.operations, - }, - ) - .await?; - - if first_claim.jobs.len() != self.operations { - return Err(format!( - "worker-a claimed {} jobs, expected {}", - first_claim.jobs.len(), - self.operations - ) - .into()); - } - - sleep(self.lease_ttl + Duration::from_millis(500)).await; - - info!("scheduler failover: worker-b reclaim and ack"); - let mut pending_ids: HashSet = enqueued_ids.into_iter().collect(); - let reclaim_deadline = Instant::now() + Duration::from_secs(20); - - while !pending_ids.is_empty() && Instant::now() < reclaim_deadline { - let claim: ClaimResponse = node_b - .post( - "/jobs/claim", - &ClaimRequest { - worker_id: "worker-b".to_owned(), - max_jobs: pending_ids.len(), - }, - ) - .await?; - - if claim.jobs.is_empty() { - sleep(Duration::from_millis(200)).await; - continue; - } - - for job in claim.jobs { - if !pending_ids.remove(&job.id) { - return Err(format!("unexpected reclaimed job id {}", job.id).into()); - } - - let 
ack: OperationResponse = node_b - .post( - "/jobs/ack", - &AckRequest { - worker_id: "worker-b".to_owned(), - job_id: job.id, - }, - ) - .await?; - - if !ack.ok { - return Err(format!("failed to ack reclaimed job {}", job.id).into()); - } - } - } - - if !pending_ids.is_empty() { - return Err( - format!("scheduler failover left {} unacked jobs", pending_ids.len()).into(), - ); - } - - Ok(()) - } -} - -fn compute_interval(rate_per_sec: usize) -> Option { - if rate_per_sec == 0 { - return None; - } - - Some(Duration::from_millis((1000 / rate_per_sec as u64).max(1))) -} diff --git a/examples/scheduler/testing/workloads/src/lib.rs b/examples/scheduler/testing/workloads/src/lib.rs deleted file mode 100644 index 6df35b7..0000000 --- a/examples/scheduler/testing/workloads/src/lib.rs +++ /dev/null @@ -1,8 +0,0 @@ -mod drained; -mod lease_failover; - -pub use drained::SchedulerDrained; -pub use lease_failover::SchedulerLeaseFailoverWorkload; -pub use scheduler_runtime_ext::{ - SchedulerBuilderExt, SchedulerEnv, SchedulerScenarioBuilder, SchedulerTopology, -};