From ad32d12019bafa71b7336eca58b2742d98edab95 Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Quiros Date: Thu, 20 Oct 2022 11:47:50 +0200 Subject: [PATCH] Public release --- .github/workflows/main.yml | 74 +++++ Cargo.lock | 398 +++++++++++++++++++++++++++ Cargo.toml | 10 + LICENSE | 25 ++ README.md | 45 +++ overwatch-derive/Cargo.toml | 19 ++ overwatch-derive/src/lib.rs | 293 ++++++++++++++++++++ overwatch-derive/src/utils.rs | 34 +++ overwatch/Cargo.toml | 22 ++ overwatch/src/lib.rs | 29 ++ overwatch/src/overwatch/commands.rs | 68 +++++ overwatch/src/overwatch/handle.rs | 88 ++++++ overwatch/src/overwatch/mod.rs | 302 ++++++++++++++++++++ overwatch/src/services/handle.rs | 168 +++++++++++ overwatch/src/services/life_cycle.rs | 1 + overwatch/src/services/mod.rs | 89 ++++++ overwatch/src/services/relay.rs | 196 +++++++++++++ overwatch/src/services/settings.rs | 90 ++++++ overwatch/src/services/state.rs | 204 ++++++++++++++ overwatch/src/utils/const_checks.rs | 35 +++ overwatch/src/utils/mod.rs | 2 + overwatch/src/utils/runtime.rs | 9 + overwatch/tests/print_service.rs | 116 ++++++++ overwatch/tests/settings_update.rs | 85 ++++++ overwatch/tests/state_handling.rs | 101 +++++++ 25 files changed, 2503 insertions(+) create mode 100644 .github/workflows/main.yml create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 overwatch-derive/Cargo.toml create mode 100644 overwatch-derive/src/lib.rs create mode 100644 overwatch-derive/src/utils.rs create mode 100644 overwatch/Cargo.toml create mode 100644 overwatch/src/lib.rs create mode 100644 overwatch/src/overwatch/commands.rs create mode 100644 overwatch/src/overwatch/handle.rs create mode 100644 overwatch/src/overwatch/mod.rs create mode 100644 overwatch/src/services/handle.rs create mode 100644 overwatch/src/services/life_cycle.rs create mode 100644 overwatch/src/services/mod.rs create mode 100644 overwatch/src/services/relay.rs create mode 100644 overwatch/src/services/settings.rs create mode 100644 overwatch/src/services/state.rs create mode 100644 overwatch/src/utils/const_checks.rs create mode 100644 overwatch/src/utils/mod.rs create mode 100644 overwatch/src/utils/runtime.rs create mode 100644 overwatch/tests/print_service.rs create mode 100644 overwatch/tests/settings_update.rs create mode 100644 overwatch/tests/state_handling.rs diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 0000000..51596f9 --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,74 @@ +# copy of https://github.com/actions-rs/meta/blob/master/recipes/quickstart.md +on: [push, pull_request] + +name: CI + +jobs: + check: + name: Check + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + submodules: true + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + - uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: check + + test: + name: Test Suite + strategy: + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v2 + with: + submodules: true + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + - uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: build + - uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: test + + lints: + name: Rust lints + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + submodules: true + - uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + components: rustfmt, clippy + + - name: Run cargo fmt + uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: fmt + args: --all -- --check + + - name: Run cargo clippy + uses: actions-rs/cargo@v1 + continue-on-error: false + with: + command: clippy + args: -- --deny warnings \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..cc42289 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,398 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "async-trait" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "bytes" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "const-str" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21077772762a1002bb421c3af42ac1725fa56066bfc53d9a55bb79905df2aaf3" +dependencies = [ + "const-str-proc-macro", +] + +[[package]] +name = "const-str-proc-macro" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e1e0fdd2e5d3041e530e1b21158aeeef8b5d0e306bc5c1e3d6cf0930d10e25a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" + +[[package]] +name = "futures-executor" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" + +[[package]] +name = "futures-macro" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" + +[[package]] +name = "futures-task" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" + +[[package]] +name = "futures-util" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + +[[package]] +name = "hermit-abi" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" +dependencies = [ + "libc", +] + +[[package]] +name = "libc" +version = "0.2.135" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68783febc7782c6c5cb401fbda4de5a9898be1762314da0bb2c10ced61f18b0c" + +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + +[[package]] +name = "num_cpus" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e82dad04139b71a90c080c8463fe0dc7902db5192d939bd0950f074d014339e1" + +[[package]] +name = "overwatch" +version = "0.1.0" +dependencies = [ + "async-trait", + "const-str", + "futures", + "overwatch-derive", + "thiserror", + "tokio", + "tokio-stream", + "tracing", +] + +[[package]] +name = "overwatch-derive" +version = "0.1.0" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", + "tracing", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + +[[package]] +name = "proc-macro2" +version = "1.0.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "slab" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" +dependencies = [ + "autocfg", +] + +[[package]] +name = "syn" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fcd952facd492f9be3ef0d0b7032a6e442ee9b361d4acc2b1d0c4aaa5f613a1" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thiserror" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio" +version = "1.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" +dependencies = [ + "autocfg", + "bytes", + "memchr", + "num_cpus", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-stream" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "tokio-util" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tracing" +version = "0.1.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" +dependencies = [ + "once_cell", +] + +[[package]] +name = "unicode-ident" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3" + +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..ea6aa8d --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,10 @@ +[workspace] + +members = [ + "overwatch", + "overwatch-derive", +] + +[profile.release-opt] +inherits = "release" +lto = true \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..b71beea --- /dev/null +++ b/LICENSE @@ -0,0 +1,25 @@ +Overwatch is licensed under the MIT License +Copyright (c) 2018 Status Research & Development GmbH +----------------------------------------------------- + +The MIT License (MIT) + +Copyright (c) 2022 Status Research & Development GmbH + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..d71538f --- /dev/null +++ b/README.md @@ -0,0 +1,45 @@ +# Overwatch + +Overwatch is a framework to easily construct applications that requires of several independent +parts that needs communication between them. +Everything is self-contained, and it matches somewhat the advantages of microservices. + +## Design Goals + +- Modularity: + - Components should be self-contained (as possible) + - Communication relations between components should be specifically defined + - Components should be mockable. This is rather important for measurements and testing. + +- Single responsibility: + - It is easier to isolate problems + - Minimal sharing when unavoidable + +- Debuggeability + - Easy to track workflow + - Easy to test + - Easy to measure + - Asynchronous Communication + +## Main components + +- Overwatch: the main messenger relay component (internal communications). It is also be responsible of managing other components lifecycle and handling configuration updates. +- Services (handled by the *overwatch*) + +## Project Structure + +* `overwatch`: Services runner framework +* `overwatch-derive`: Overwatch macros + +## Build & Test + +Minimal Rust supported version: `1.63` + +When in development, please, use `cargo clippy` to build the project. Any warning is promoted to an error in our CI. + +* Use `cargo test` for executing tests, and `cargo test -- --nocapture` for seeing test outputs. +* Use `cargo run --exampel {example_name}` to run an example. + +### Build Documentation + +Simply run `cargo doc --open --no-deps` to build and access a copy of the generated documentation. diff --git a/overwatch-derive/Cargo.toml b/overwatch-derive/Cargo.toml new file mode 100644 index 0000000..243f890 --- /dev/null +++ b/overwatch-derive/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "overwatch-derive" +version = "0.1.0" +edition = "2021" +authors = [ + "Daniel Sanchez Quiros " +] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +proc-macro = true + +[dependencies] +heck = "0.4" +syn = "1.0" +quote = "1.0" +proc-macro2 = "1.0" +proc-macro-error = "1.0" +tracing = "0.1" diff --git a/overwatch-derive/src/lib.rs b/overwatch-derive/src/lib.rs new file mode 100644 index 0000000..69335e4 --- /dev/null +++ b/overwatch-derive/src/lib.rs @@ -0,0 +1,293 @@ +mod utils; + +use proc_macro_error::{abort_call_site, proc_macro_error}; +use quote::{format_ident, quote}; +use syn::{punctuated::Punctuated, token::Comma, Data, DeriveInput, Field}; + +#[proc_macro_derive(Services)] +#[proc_macro_error] +pub fn derive_services(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let input: DeriveInput = syn::parse(input).expect("A syn parseable token stream"); + let derived = impl_services(&input); + derived.into() +} + +fn service_settings_identifier_from( + services_identifier: &proc_macro2::Ident, +) -> proc_macro2::Ident { + format_ident!("{}ServiceSettings", services_identifier) +} + +fn service_settings_field_identifier_from( + field_identifier: &proc_macro2::Ident, +) -> proc_macro2::Ident { + format_ident!("{}_settings", field_identifier) +} + +fn impl_services(input: &DeriveInput) -> proc_macro2::TokenStream { + use syn::DataStruct; + + let struct_identifier = &input.ident; + let data = &input.data; + match data { + Data::Struct(DataStruct { + fields: syn::Fields::Named(fields), + .. + }) => impl_services_for_struct(struct_identifier, &fields.named), + _ => { + abort_call_site!("Deriving Services is only supported for named Structs"); + } + } +} + +fn impl_services_for_struct( + identifier: &proc_macro2::Ident, + fields: &Punctuated, +) -> proc_macro2::TokenStream { + let settings = generate_services_settings(identifier, fields); + let unique_ids_check = generate_assert_unique_identifiers(identifier, fields); + let services_impl = generate_services_impl(identifier, fields); + + quote! { + #unique_ids_check + + #settings + + #services_impl + } +} + +fn generate_services_settings( + services_identifier: &proc_macro2::Ident, + fields: &Punctuated, +) -> proc_macro2::TokenStream { + let services_settings = fields.iter().map(|field| { + let service_name = field.ident.as_ref().expect("A named struct attribute"); + let _type = utils::extract_type_from(&field.ty); + + quote!(pub #service_name: <#_type as ::overwatch::services::ServiceData>::Settings) + }); + let services_settings_identifier = service_settings_identifier_from(services_identifier); + quote! { + #[derive(::std::clone::Clone, ::std::fmt::Debug)] + pub struct #services_settings_identifier { + #( #services_settings ),* + } + } +} + +fn generate_assert_unique_identifiers( + services_identifier: &proc_macro2::Ident, + fields: &Punctuated, +) -> proc_macro2::TokenStream { + let services_ids = fields.iter().map(|field| { + let _type = utils::extract_type_from(&field.ty); + quote! { + <#_type as ::overwatch::services::ServiceData>::SERVICE_ID + } + }); + let services_ids_check = format_ident!( + "__{}__CONST_CHECK_UNIQUE_SERVICES_IDS", + services_identifier.to_string().to_uppercase() + ); + + quote! { + const #services_ids_check: () = assert!(::overwatch::utils::const_checks::unique_ids(&[#( #services_ids ),*])); + } +} + +fn generate_services_impl( + services_identifier: &proc_macro2::Ident, + fields: &Punctuated, +) -> proc_macro2::TokenStream { + let services_settings_identifier = service_settings_identifier_from(services_identifier); + let impl_new = generate_new_impl(fields); + let impl_start_all = generate_start_all_impl(fields); + let impl_start = generate_start_impl(fields); + let impl_stop = generate_stop_impl(fields); + let impl_relay = generate_request_relay_impl(fields); + let impl_update_settings = generate_update_settings_impl(fields); + + quote! { + impl ::overwatch::overwatch::Services for #services_identifier { + type Settings = #services_settings_identifier; + + #impl_new + + #impl_start_all + + #impl_start + + #impl_stop + + #impl_relay + + #impl_update_settings + } + } +} + +fn generate_new_impl(fields: &Punctuated) -> proc_macro2::TokenStream { + let fields_settings = fields.iter().map(|field| { + let field_identifier = field.ident.as_ref().expect("A struct attribute identifier"); + let settings_field_identifier = service_settings_field_identifier_from(field_identifier); + quote! { + #field_identifier: #settings_field_identifier + } + }); + + let managers = fields.iter().map(|field| { + let field_identifier = field.ident.as_ref().expect("A struct attribute identifier"); + let service_type = utils::extract_type_from(&field.ty); + let settings_field_identifier = service_settings_field_identifier_from(field_identifier); + quote! { + #field_identifier: { + let (runtime, manager) = + ::overwatch::services::handle::ServiceHandle::<#service_type>::new( + #settings_field_identifier, overwatch_handle.clone(), + ); + runtimes.insert(<#service_type as ::overwatch::services::ServiceData>::SERVICE_ID, runtime); + manager + } + } + }); + + quote! { + fn new(settings: Self::Settings, overwatch_handle: ::overwatch::overwatch::handle::OverwatchHandle) -> ( + std::collections::HashMap<::overwatch::services::ServiceId, ::std::option::Option<::tokio::runtime::Runtime>>, + Self + ) { + let Self::Settings { + #( #fields_settings ),* + } = settings; + + let mut runtimes = ::std::collections::HashMap::new(); + + let app = Self { + #( #managers ),* + }; + + (runtimes, app) + } + } +} + +fn generate_start_all_impl(fields: &Punctuated) -> proc_macro2::TokenStream { + let call_start = fields.iter().map(|field| { + let field_identifier = field.ident.as_ref().expect("A struct attribute identifier"); + quote! { + self.#field_identifier.service_runner().run(); + } + }); + + quote! { + #[::tracing::instrument(skip(self), err)] + fn start_all(&mut self) -> Result<(), ::overwatch::overwatch::Error> { + #( #call_start )* + Ok(()) + } + } +} + +fn generate_start_impl(fields: &Punctuated) -> proc_macro2::TokenStream { + let cases = fields.iter().map(|field| { + let field_identifier = field.ident.as_ref().expect("A struct attribute identifier"); + let type_id = utils::extract_type_from(&field.ty); + quote! { + <#type_id as ::overwatch::services::ServiceData>::SERVICE_ID => { + self.#field_identifier.service_runner().run(); + Ok(()) + } + } + }); + + quote! { + #[::tracing::instrument(skip(self), err)] + fn start(&mut self, service_id: ::overwatch::services::ServiceId) -> Result<(), ::overwatch::overwatch::Error> { + match service_id { + #( #cases ),* + service_id => Err(::overwatch::overwatch::Error::Unavailable { service_id }) + } + } + } +} + +fn generate_stop_impl(fields: &Punctuated) -> proc_macro2::TokenStream { + let cases = fields.iter().map(|field| { + let _field_identifier = field.ident.as_ref().expect("A struct attribute identifier"); + let type_id = utils::extract_type_from(&field.ty); + // TODO: actually stop them here once service lifecycle is implemented + quote! { + <#type_id as ::overwatch::services::ServiceData>::SERVICE_ID => { unimplemented!() } + } + }); + + quote! { + #[::tracing::instrument(skip(self), err)] + fn stop(&mut self, service_id: ::overwatch::services::ServiceId) -> Result<(), ::overwatch::overwatch::Error> { + match service_id { + #( #cases ),* + service_id => Err(::overwatch::overwatch::Error::Unavailable { service_id }) + } + } + } +} + +fn generate_request_relay_impl(fields: &Punctuated) -> proc_macro2::TokenStream { + let cases = fields.iter().map(|field| { + let field_identifier = field.ident.as_ref().expect("A struct attribute identifier"); + let type_id = utils::extract_type_from(&field.ty); + quote! { + <#type_id as ::overwatch::services::ServiceData>::SERVICE_ID => { + Ok(::std::boxed::Box::new( + self.#field_identifier + .relay_with() + .expect("An open relay to service is established") + ) as ::overwatch::services::relay::AnyMessage) + } + } + }); + + quote! { + #[::tracing::instrument(skip(self), err)] + fn request_relay(&mut self, service_id: ::overwatch::services::ServiceId) -> ::overwatch::services::relay::RelayResult { + { + match service_id { + #( #cases )* + service_id => Err(::overwatch::services::relay::RelayError::Unavailable { service_id }) + } + } + } + } +} + +fn generate_update_settings_impl(fields: &Punctuated) -> proc_macro2::TokenStream { + let fields_settings = fields.iter().map(|field| { + let field_identifier = field.ident.as_ref().expect("A struct attribute identifier"); + let settings_field_identifier = service_settings_field_identifier_from(field_identifier); + quote! { + #field_identifier: #settings_field_identifier + } + }); + + let update_settings_call = fields.iter().map(|field| { + let field_identifier = field.ident.as_ref().expect("A struct attribute identifier"); + let settings_field_identifier = service_settings_field_identifier_from(field_identifier); + quote! { + self.#field_identifier.update_settings(#settings_field_identifier); + } + }); + + quote! { + #[::tracing::instrument(skip(self, settings), err)] + fn update_settings(&mut self, settings: Self::Settings) -> Result<(), ::overwatch::overwatch::Error> { + let Self::Settings { + #( #fields_settings ),* + } = settings; + + #( #update_settings_call )* + + Ok(()) + } + } +} diff --git a/overwatch-derive/src/utils.rs b/overwatch-derive/src/utils.rs new file mode 100644 index 0000000..0601a83 --- /dev/null +++ b/overwatch-derive/src/utils.rs @@ -0,0 +1,34 @@ +use proc_macro_error::abort_call_site; +use quote::ToTokens; +use syn::{GenericArgument, PathArguments, Type}; + +pub fn extract_type_from(ty: &Type) -> Type { + let stringify_type = ty.clone().into_token_stream().to_string(); + + match ty { + Type::Path(type_path) if type_path.qself.is_none() => { + // Get the first segment of the path: + let type_params = type_path + .path + .segments + .iter() + .next() + .cloned() + .unwrap() + .arguments; + // It should have only on angle-bracketed param (""): + let generic_arg = match type_params { + PathArguments::AngleBracketed(params) => { + params.args.iter().next().cloned().unwrap() + } + _ => abort_call_site!("Expected single type argument, found {}", stringify_type), + }; + // This argument must be a type: + match generic_arg { + GenericArgument::Type(ty) => ty, + _ => abort_call_site!("Expected single type argument, found {}", stringify_type), + } + } + _ => abort_call_site!("Expected single type argument, found {}", stringify_type), + } +} diff --git a/overwatch/Cargo.toml b/overwatch/Cargo.toml new file mode 100644 index 0000000..aeb1da5 --- /dev/null +++ b/overwatch/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "overwatch" +version = "0.1.0" +edition = "2021" +authors = [ + "Daniel Sanchez Quiros " +] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +overwatch-derive = { path = "../overwatch-derive" } +const-str = "0.3" +async-trait = "0.1" +futures = "0.3" +thiserror = "1.0" +tokio = { version = "1.17", features = ["rt-multi-thread", "sync", "time"] } +tokio-stream = {version ="0.1", features = ["sync"] } +tracing = "0.1" + +[dev-dependencies] +tokio = { version = "1.17", features = ["rt-multi-thread", "sync", "time", "io-std", "io-util", "macros"] } diff --git a/overwatch/src/lib.rs b/overwatch/src/lib.rs new file mode 100644 index 0000000..713f3b6 --- /dev/null +++ b/overwatch/src/lib.rs @@ -0,0 +1,29 @@ +//! Overwatch is a framework to easily construct applications that requires of several independent +//! parts that needs communication between them. +//! Everything is self contained and it matches somewhat the advantages of microservices. +//! +//! ## Design Goals +//! +//! - Modularity: +//! - Components should be self-contained (as possible) +//! - Communication relations between components should be specifically defined +//! - Components should be mockable. This is rather important for measurements and testing. +//! +//! - Single responsibility: +//! - It is easier to isolate problems +//! - Minimal sharing when unavoidable +//! +//! - Debuggeability +//! - Easy to track workflow +//! - Easy to test +//! - Easy to measure +//! - Asynchronous Communication +//! +//! ## Main components +//! +//! - Overwatch: the main messenger relay component (internal communications). It is also be responsible of managing other components lifecycle and handling configuration updates. +//! - Services (handled by the *overwatch*) + +pub mod overwatch; +pub mod services; +pub mod utils; diff --git a/overwatch/src/overwatch/commands.rs b/overwatch/src/overwatch/commands.rs new file mode 100644 index 0000000..a5989f8 --- /dev/null +++ b/overwatch/src/overwatch/commands.rs @@ -0,0 +1,68 @@ +// std + +// crates +use crate::overwatch::AnySettings; +use tokio::sync::oneshot; + +// internal +use crate::services::relay::RelayResult; +use crate::services::ServiceId; + +#[derive(Debug)] +pub(crate) struct ReplyChannel(pub(crate) oneshot::Sender); + +impl From> for ReplyChannel { + fn from(sender: oneshot::Sender) -> Self { + Self(sender) + } +} + +impl ReplyChannel { + pub async fn reply(self, message: M) -> Result<(), M> { + self.0.send(message) + } +} + +/// Command for requesting communications with another service +#[derive(Debug)] +pub struct RelayCommand { + pub(crate) service_id: ServiceId, + pub(crate) reply_channel: ReplyChannel, +} + +/// Command for managing [`ServiceCore`](crate::services::ServiceCore) lifecycle +#[allow(unused)] +#[derive(Debug)] +pub struct ServiceLifeCycle { + service_id: ServiceId, + reply_channel: ReplyChannel, +} + +/// [`ServiceCore`](crate::services::ServiceCore) lifecycle related commands +#[derive(Debug)] +pub enum ServiceLifeCycleCommand { + Shutdown(ServiceLifeCycle<()>), + Kill(ServiceLifeCycle<()>), + Start(ServiceLifeCycle<()>), + Stop(ServiceLifeCycle<()>), +} + +/// [`Overwatch`](crate::overwatch::Overwatch) lifecycle related commands +#[derive(Debug)] +pub enum OverwatchLifeCycleCommand { + Shutdown, + Kill, +} + +/// [`Overwatch`](crate::overwatch::Overwatch) settings update command +#[derive(Debug)] +pub struct SettingsCommand(pub(crate) AnySettings); + +/// [`Overwatch`](crate::overwatch::Overwatch) tasks related commands +#[derive(Debug)] +pub enum OverwatchCommand { + Relay(RelayCommand), + ServiceLifeCycle(ServiceLifeCycleCommand), + OverwatchLifeCycle(OverwatchLifeCycleCommand), + Settings(SettingsCommand), +} diff --git a/overwatch/src/overwatch/handle.rs b/overwatch/src/overwatch/handle.rs new file mode 100644 index 0000000..f4213ef --- /dev/null +++ b/overwatch/src/overwatch/handle.rs @@ -0,0 +1,88 @@ +// std + +// crates +use crate::overwatch::commands::{OverwatchCommand, OverwatchLifeCycleCommand, SettingsCommand}; +use crate::overwatch::Services; +use tokio::runtime::Handle; +use tokio::sync::mpsc::Sender; +use tracing::{error, info, instrument}; + +// internal +use crate::services::relay::Relay; +use crate::services::ServiceCore; + +/// Handler object over the main Overwatch runner +/// It handles communications to the main Overwatch runner. +#[derive(Clone, Debug)] +pub struct OverwatchHandle { + #[allow(unused)] + runtime_handle: Handle, + sender: Sender, +} + +impl OverwatchHandle { + pub fn new(runtime_handle: Handle, sender: Sender) -> Self { + Self { + runtime_handle, + sender, + } + } + + /// Request for a relay to an specific service by type + pub fn relay(&self) -> Relay { + Relay::new(self.clone()) + } + + /// Send a shutdown signal to the overwatch runner + pub async fn shutdown(&mut self) { + info!("Shutting down Overwatch"); + if let Err(e) = self + .sender + .send(OverwatchCommand::OverwatchLifeCycle( + OverwatchLifeCycleCommand::Shutdown, + )) + .await + { + dbg!(e); + } + } + + /// Send a kill signal to the overwatch runner + pub async fn kill(&mut self) { + info!("Killing Overwatch"); + if let Err(e) = self + .sender + .send(OverwatchCommand::OverwatchLifeCycle( + OverwatchLifeCycleCommand::Kill, + )) + .await + { + dbg!(e); + } + } + + /// Send an overwatch command to the overwatch runner + #[instrument(name = "overwatch-command-send", skip(self))] + pub async fn send(&mut self, command: OverwatchCommand) { + if let Err(e) = self.sender.send(command).await { + error!(error=?e, "Error sending overwatch command"); + } + } + + #[instrument(skip(self))] + pub async fn update_settings(&mut self, settings: S::Settings) { + if let Err(e) = self + .sender + .send(OverwatchCommand::Settings(SettingsCommand(Box::new( + settings, + )))) + .await + { + error!(error=?e, "Error updating settings") + } + } + + pub fn runtime(&self) -> &Handle { + &self.runtime_handle + } +} diff --git a/overwatch/src/overwatch/mod.rs b/overwatch/src/overwatch/mod.rs new file mode 100644 index 0000000..9737f06 --- /dev/null +++ b/overwatch/src/overwatch/mod.rs @@ -0,0 +1,302 @@ +pub mod commands; +pub mod handle; +// std + +use std::any::Any; +use std::collections::HashMap; +use std::fmt::Debug; +use std::future::Future; + +// crates + +use async_trait::async_trait; +use thiserror::Error; +use tokio::runtime::{Handle, Runtime}; +use tokio::sync::mpsc::Receiver; +use tokio::sync::oneshot; +use tokio::task::JoinHandle; +use tracing::{info, instrument}; + +// internal + +use crate::overwatch::commands::{ + OverwatchCommand, OverwatchLifeCycleCommand, RelayCommand, SettingsCommand, +}; +use crate::overwatch::handle::OverwatchHandle; +use crate::services::relay::RelayResult; +use crate::services::{ServiceError, ServiceId}; +use crate::utils::runtime::default_multithread_runtime; + +/// Overwatch base error type +#[derive(Error, Debug)] +pub enum Error { + #[error(transparent)] + Relay(#[from] ServiceError), + + #[error("Service {service_id} is unavailable")] + Unavailable { service_id: ServiceId }, +} + +/// Signal sent so overwatch finish execution +type FinishOverwatchSignal = (); + +/// Marker trait for settings related elements +pub type AnySettings = Box; + +/// An overwatch run anything that implements this trait +/// An implementor of this trait would have to handle the inner [`ServiceCore`](crate::services::ServiceCore) +#[async_trait] +pub trait Services: Send + Sync { + /// Inner [`ServiceCore::Settings`](crate::services::ServiceCore) grouping type. + /// Normally this will be a settings object that group all the inner services settings. + type Settings: Debug + Send + 'static; + + /// Spawn a new instance of the Services object + /// It returns a `(ServiceId, Runtime)` where Runtime is the `tokio::runtime::Runtime` attached for each + /// service. + /// It also returns an instance of the implementing type. + fn new( + settings: Self::Settings, + overwatch_handle: OverwatchHandle, + ) -> (HashMap>, Self); + + /// Start a services attached to the trait implementer + fn start(&mut self, service_id: ServiceId) -> Result<(), Error>; + + // TODO: this probably will be removed once the services lifecycle is implemented + /// Start all services attached to the trait implementer + fn start_all(&mut self) -> Result<(), Error>; + + /// Stop a service attached to the trait implementer + fn stop(&mut self, service_id: ServiceId) -> Result<(), Error>; + + /// Request communication relay to one of the services + fn request_relay(&mut self, service_id: ServiceId) -> RelayResult; + + /// Update service settings + fn update_settings(&mut self, settings: Self::Settings) -> Result<(), Error>; +} + +/// `OverwatchRunner` is the entity that handles a running overwatch +/// it is usually one-shot. It contains what it is needed just to be run as a main loop +/// and a system to be able to stop it running. Meaning that it i responsible of the Overwatch +/// application lifecycle. +pub struct OverwatchRunner { + services: S, + #[allow(unused)] + handle: OverwatchHandle, + finish_signal_sender: oneshot::Sender<()>, +} + +/// Overwatch thread identifier +/// it is used when creating the `tokio::runtime::Runtime` that Overwatch uses internally +pub const OVERWATCH_THREAD_NAME: &str = "Overwatch"; + +impl OverwatchRunner +where + S: Services + 'static, +{ + /// Start the Overwatch runner process + /// It creates the `tokio::runtime::Runtime`, initialize the [`Services`] and start listening for + /// Overwatch related tasks. + /// Returns the [`Overwatch`] instance that handles this runner. + pub fn run(settings: S::Settings, runtime: Option) -> Overwatch { + let runtime = runtime.unwrap_or_else(default_multithread_runtime); + + let (finish_signal_sender, finish_runner_signal) = tokio::sync::oneshot::channel(); + let (commands_sender, commands_receiver) = tokio::sync::mpsc::channel(16); + let handle = OverwatchHandle::new(runtime.handle().clone(), commands_sender); + let (services_runtimes, services) = S::new(settings, handle.clone()); + let runner = OverwatchRunner { + services, + handle: handle.clone(), + finish_signal_sender, + }; + runtime.spawn(async move { runner.run_(commands_receiver).await }); + Overwatch { + runtime, + services_runtimes, + handle, + finish_runner_signal, + } + } + + #[instrument(name = "overwatch-run", skip_all)] + async fn run_(self, mut receiver: Receiver) { + let Self { + mut services, + handle: _, + finish_signal_sender, + } = self; + // TODO: this probably need to be manually done, or at least handled by a flag + services.start_all().expect("Services to start running"); + while let Some(command) = receiver.recv().await { + info!(command = ?command, "Overwatch command received"); + match command { + OverwatchCommand::Relay(relay_command) => { + Self::handle_relay(&mut services, relay_command).await; + } + OverwatchCommand::ServiceLifeCycle(_) => { + unimplemented!("Services life cycle is still not supported!"); + } + OverwatchCommand::OverwatchLifeCycle(command) => { + if matches!( + command, + OverwatchLifeCycleCommand::Kill | OverwatchLifeCycleCommand::Shutdown + ) { + break; + } + } + OverwatchCommand::Settings(settings) => { + Self::handle_settings_update(&mut services, settings).await; + } + } + } + // signal that we finished execution + finish_signal_sender + .send(()) + .expect("Overwatch run finish signal to be sent properly"); + } + + async fn handle_relay(services: &mut S, command: RelayCommand) { + let RelayCommand { + service_id, + reply_channel, + } = command; + // send requested rely channel result to requesting service + if let Err(Err(e)) = reply_channel + .reply(services.request_relay(service_id)) + .await + { + info!(error=?e, "Error requesting relay for service {}", service_id) + } + } + + async fn handle_settings_update(services: &mut S, command: SettingsCommand) { + let SettingsCommand(settings) = command; + if let Ok(settings) = settings.downcast::() { + if let Err(e) = services.update_settings(*settings) { + // TODO: add proper logging + dbg!(e); + } + } else { + unreachable!("Statically should always be of the correct type"); + } + } +} + +/// Main Overwatch entity +/// It manages the overwatch runtime and handle +pub struct Overwatch { + runtime: Runtime, + #[allow(unused)] + services_runtimes: HashMap>, + handle: OverwatchHandle, + finish_runner_signal: oneshot::Receiver, +} + +impl Overwatch { + /// Get the overwatch handle + /// [`OverwatchHandle`](crate::overwatch::handle::OverwatchHandle) is cloneable, so it can be done on demand + pub fn handle(&self) -> &OverwatchHandle { + &self.handle + } + + /// Get the underllaying tokio runtime handle + pub fn runtime(&self) -> &Handle { + self.runtime.handle() + } + + /// Spawn a new task within the Overwatch runtime + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.runtime.spawn(future) + } + + /// Block until Overwatch finish its execution + pub fn wait_finished(self) { + let Self { + runtime, + finish_runner_signal, + .. + } = self; + runtime.block_on(async move { + let signal_result = finish_runner_signal.await; + signal_result.expect("A finished signal arrived"); + }); + } +} + +#[cfg(test)] +mod test { + use crate::overwatch::handle::OverwatchHandle; + use crate::overwatch::{Error, OverwatchRunner, Services}; + use crate::services::relay::{RelayError, RelayResult}; + use crate::services::ServiceId; + use std::collections::HashMap; + use std::time::Duration; + use tokio::runtime::Runtime; + use tokio::time::sleep; + + struct EmptyServices; + + impl Services for EmptyServices { + type Settings = (); + + fn new( + _settings: Self::Settings, + _overwatch_handle: OverwatchHandle, + ) -> (HashMap>, Self) { + (HashMap::new(), EmptyServices) + } + + fn start(&mut self, service_id: ServiceId) -> Result<(), Error> { + Err(Error::Unavailable { service_id }) + } + + fn start_all(&mut self) -> Result<(), Error> { + Ok(()) + } + + fn stop(&mut self, service_id: ServiceId) -> Result<(), Error> { + Err(Error::Unavailable { service_id }) + } + + fn request_relay(&mut self, service_id: ServiceId) -> RelayResult { + Err(RelayError::InvalidRequest { to: service_id }) + } + + fn update_settings(&mut self, _settings: Self::Settings) -> Result<(), Error> { + Ok(()) + } + } + + #[test] + fn run_overwatch_then_stop() { + let overwatch = OverwatchRunner::::run((), None); + let mut handle = overwatch.handle().clone(); + + overwatch.spawn(async move { + sleep(Duration::from_millis(500)).await; + handle.shutdown().await; + }); + + overwatch.wait_finished(); + } + + #[test] + fn run_overwatch_then_kill() { + let overwatch = OverwatchRunner::::run((), None); + let mut handle = overwatch.handle().clone(); + + overwatch.spawn(async move { + sleep(Duration::from_millis(500)).await; + handle.kill().await; + }); + + overwatch.wait_finished(); + } +} diff --git a/overwatch/src/services/handle.rs b/overwatch/src/services/handle.rs new file mode 100644 index 0000000..aca71f3 --- /dev/null +++ b/overwatch/src/services/handle.rs @@ -0,0 +1,168 @@ +// std +use std::marker::PhantomData; +// crates +use futures::future::{abortable, AbortHandle}; +use tokio::runtime::{Handle, Runtime}; +use tracing::instrument; +// internal +use crate::overwatch::handle::OverwatchHandle; +use crate::services::relay::{relay, InboundRelay, OutboundRelay}; +use crate::services::settings::{SettingsNotifier, SettingsUpdater}; +use crate::services::state::{StateHandle, StateOperator, StateUpdater}; +use crate::services::{ServiceCore, ServiceId, ServiceState}; + +// TODO: Abstract handle over state, to diferentiate when the service is running and when it is not +// that way we can expose a better API depending on what is happenning. Would get rid of the probably +// unnecessary Option and cloning. +/// Service handle +/// This is used to access different parts of the service +pub struct ServiceHandle { + /// Service id (must match ``) + id: ServiceId, + /// Service runtime handle + runtime: Handle, + /// Message channel relay + /// Would be None if service is not running + /// Will contain the channel if service is running + outbound_relay: Option>, + /// Handle to overwatch + overwatch_handle: OverwatchHandle, + settings: SettingsUpdater, + initial_state: S::State, + _marker: PhantomData, +} + +/// Service core resources +/// It contains whatever is necessary to start a new service runner +pub struct ServiceStateHandle { + /// Service runtime handler + pub runtime: Handle, + /// Relay channel to communicate with the service runner + pub inbound_relay: InboundRelay, + /// Overwatch handle + pub overwatch_handle: OverwatchHandle, + pub settings_reader: SettingsNotifier, + pub state_updater: StateUpdater, + pub _lifecycle_handler: (), +} + +/// Main service executor +/// It is the object that hold the necessary information for the service to run +pub struct ServiceRunner { + #[allow(unused)] + overwatch_handle: OverwatchHandle, + service_state: ServiceStateHandle, + state_handle: StateHandle, +} + +impl ServiceHandle { + pub fn new( + settings: S::Settings, + overwatch_handle: OverwatchHandle, + ) -> (Option, Self) { + let id = S::SERVICE_ID; + let runtime = S::service_runtime(&settings, overwatch_handle.runtime()); + + let initial_state: S::State = S::State::from_settings(&settings); + + let handle = runtime.handle(); + let settings = SettingsUpdater::new(settings); + + ( + runtime.runtime(), + Self { + id, + runtime: handle, + outbound_relay: None, + settings, + initial_state, + overwatch_handle, + _marker: PhantomData::default(), + }, + ) + } + + pub fn id(&self) -> ServiceId { + self.id + } + + /// Service runtime getter + /// it is easily cloneable and can be done on demand + pub fn runtime(&self) -> &Handle { + &self.runtime + } + + /// Overwatch handle + /// it is easily cloneable and can be done on demand + pub fn overwatch_handle(&self) -> &OverwatchHandle { + &self.overwatch_handle + } + + /// Request a relay with this service + pub fn relay_with(&self) -> Option> { + self.outbound_relay.clone() + } + + /// Update settings + pub fn update_settings(&self, settings: S::Settings) { + self.settings.update(settings) + } + + /// Build a runner for this service + pub fn service_runner(&mut self) -> ServiceRunner { + // TODO: add proper status handling here, a service should be able to produce a runner if it is already running. + let (inbound_relay, outbound_relay) = relay::(S::SERVICE_RELAY_BUFFER_SIZE); + let settings_reader = self.settings.notifier(); + // add relay channel to handle + self.outbound_relay = Some(outbound_relay); + let settings = self.settings.notifier().get_updated_settings(); + let operator = S::StateOperator::from_settings::(settings); + let (state_handle, state_updater) = + StateHandle::::new(self.initial_state.clone(), operator); + + let service_state = ServiceStateHandle { + runtime: self.runtime.clone(), + inbound_relay, + overwatch_handle: self.overwatch_handle.clone(), + state_updater, + settings_reader, + _lifecycle_handler: (), + }; + + ServiceRunner { + overwatch_handle: self.overwatch_handle().clone(), + service_state, + state_handle, + } + } +} + +impl ServiceStateHandle { + pub fn id(&self) -> ServiceId { + S::SERVICE_ID + } +} + +impl ServiceRunner { + /// Spawn the service main loop and handle it lifecycle + /// Return a handle to abort execution manually + #[instrument(skip(self), fields(service_id=S::SERVICE_ID))] + pub fn run(self) -> AbortHandle { + let ServiceRunner { + service_state, + state_handle, + .. + } = self; + + let runtime = service_state.runtime.clone(); + let service = S::init(service_state); + let (runner, abortable_handle) = abortable(service.run()); + + runtime.spawn(runner); + runtime.spawn(state_handle.run()); + + // TODO: Handle service lifecycle + // TODO: this handle should not scape this scope, it should actually be handled in the lifecycle part mentioned above + abortable_handle + } +} diff --git a/overwatch/src/services/life_cycle.rs b/overwatch/src/services/life_cycle.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/overwatch/src/services/life_cycle.rs @@ -0,0 +1 @@ + diff --git a/overwatch/src/services/mod.rs b/overwatch/src/services/mod.rs new file mode 100644 index 0000000..b69b4a2 --- /dev/null +++ b/overwatch/src/services/mod.rs @@ -0,0 +1,89 @@ +pub mod handle; +pub mod life_cycle; +pub mod relay; +pub mod settings; +pub mod state; + +// std +use std::fmt::Debug; +// crates +use async_trait::async_trait; +use thiserror::Error; +use tokio::runtime; + +// internal +use crate::services::relay::RelayError; +use crate::services::state::StateOperator; +use handle::ServiceStateHandle; +use relay::RelayMessage; +use state::ServiceState; + +// TODO: Make this type unique for each service? +/// Services identification type +pub type ServiceId = &'static str; + +/// The core data a service needs to handle +/// Holds the necessary information of a service +pub trait ServiceData { + /// Service identification tag + const SERVICE_ID: ServiceId; + /// Service relay buffer size + const SERVICE_RELAY_BUFFER_SIZE: usize = 16; + /// Service settings object + type Settings: Clone; + /// Service state object + type State: ServiceState + Clone; + /// State operator + type StateOperator: StateOperator + Clone; + /// Service messages that the service itself understands and can react to + type Message: RelayMessage + Debug + Send + Sync; +} + +/// Main trait for Services initialization and main loop hook +#[async_trait] +pub trait ServiceCore: ServiceData + Send + Sized + 'static { + /// Initialize the service with the given state + fn init(service_state: ServiceStateHandle) -> Self; + + /// Service default runtime + fn service_runtime(_settings: &Self::Settings, _parent: &runtime::Handle) -> ServiceRuntime { + let id = Self::SERVICE_ID; + ServiceRuntime::Custom( + runtime::Builder::new_multi_thread() + .enable_all() + .thread_name(id) + .build() + .unwrap_or_else(|_| panic!("Async runtime for service {id} didn't build properly")), + ) + } + + /// Service main loop + async fn run(mut self); +} + +#[derive(Error, Debug)] +pub enum ServiceError { + #[error(transparent)] + RelayError(#[from] RelayError), +} + +pub enum ServiceRuntime { + FromParent(runtime::Handle), + Custom(runtime::Runtime), +} + +impl ServiceRuntime { + pub fn handle(&self) -> runtime::Handle { + match self { + ServiceRuntime::FromParent(handle) => handle.clone(), + ServiceRuntime::Custom(runtime) => runtime.handle().clone(), + } + } + + pub fn runtime(self) -> Option { + match self { + ServiceRuntime::Custom(runtime) => Some(runtime), + _ => None, + } + } +} diff --git a/overwatch/src/services/relay.rs b/overwatch/src/services/relay.rs new file mode 100644 index 0000000..29bd54c --- /dev/null +++ b/overwatch/src/services/relay.rs @@ -0,0 +1,196 @@ +// std +use std::any::Any; +use std::fmt::Debug; +// crates +use thiserror::Error; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::oneshot; +use tracing::{error, instrument}; +// internal +use crate::overwatch::commands::{OverwatchCommand, RelayCommand, ReplyChannel}; +use crate::overwatch::handle::OverwatchHandle; +use crate::services::{ServiceCore, ServiceId}; + +#[derive(Error, Debug)] +pub enum RelayError { + #[error("error requesting relay to {to} service")] + InvalidRequest { to: ServiceId }, + #[error("couldn't relay message")] + Send, + #[error("relay is already connected")] + AlreadyConnected, + #[error("service relay is disconnected")] + Disconnected, + #[error("service {service_id} is not available")] + Unavailable { service_id: ServiceId }, + #[error("invalid message with type id [{type_id}] for service {service_id}")] + InvalidMessage { + type_id: String, + service_id: &'static str, + }, + #[error("receiver failed due to {0:?}")] + Receiver(Box), +} + +/// Message wrapper type +pub type AnyMessage = Box; + +#[derive(Debug, Clone)] +pub struct NoMessage; + +impl RelayMessage for NoMessage {} + +/// Result type when creating a relay connection +pub type RelayResult = Result; + +/// Marker type for relay messages +/// Notice that it is bound to 'static. +pub trait RelayMessage: 'static {} + +enum RelayState { + Disconnected, + Connected(OutboundRelay), +} + +impl Clone for RelayState { + fn clone(&self) -> Self { + match self { + RelayState::Disconnected => RelayState::Disconnected, + RelayState::Connected(outbound) => RelayState::Connected(outbound.clone()), + } + } +} + +/// Channel receiver of a relay connection +pub struct InboundRelay { + receiver: Receiver, + _stats: (), // placeholder +} + +/// Channel sender of a relay connection +pub struct OutboundRelay { + sender: Sender, + _stats: (), // placeholder +} + +pub struct Relay { + state: RelayState, + overwatch_handle: OverwatchHandle, +} + +impl Clone for Relay { + fn clone(&self) -> Self { + Self { + state: self.state.clone(), + overwatch_handle: self.overwatch_handle.clone(), + } + } +} + +// TODO: make buffer_size const? +/// Relay channel builder +pub fn relay(buffer_size: usize) -> (InboundRelay, OutboundRelay) { + let (sender, receiver) = channel(buffer_size); + ( + InboundRelay { + receiver, + _stats: (), + }, + OutboundRelay { sender, _stats: () }, + ) +} + +impl InboundRelay { + /// Receive a message from the relay connections + pub async fn recv(&mut self) -> Option { + self.receiver.recv().await + } +} + +impl OutboundRelay { + /// Send a message to the relay connection + pub async fn send(&mut self, message: M) -> Result<(), (RelayError, M)> { + self.sender + .send(message) + .await + .map_err(|e| (RelayError::Send, e.0)) + } +} + +impl Clone for OutboundRelay { + fn clone(&self) -> Self { + Self { + sender: self.sender.clone(), + _stats: (), + } + } +} + +impl Relay { + pub fn new(overwatch_handle: OverwatchHandle) -> Self { + Self { + state: RelayState::Disconnected, + overwatch_handle, + } + } + + #[instrument(skip(self), err(Debug))] + pub async fn connect(&mut self) -> Result<(), RelayError> { + if let RelayState::Disconnected = self.state { + let (reply, receiver) = oneshot::channel(); + self.request_relay(reply).await; + self.handle_relay_response(receiver).await + } else { + Err(RelayError::AlreadyConnected) + } + } + + #[instrument(skip(self), err(Debug))] + pub fn disconnect(&mut self) -> Result<(), RelayError> { + self.state = RelayState::Disconnected; + Ok(()) + } + + #[instrument(skip_all, err(Debug))] + pub async fn send(&mut self, message: S::Message) -> Result<(), RelayError> { + // TODO: we could make a retry system and/or add timeouts + if let RelayState::Connected(outbound_relay) = &mut self.state { + outbound_relay + .send(message) + .await + .map_err(|(e, _message)| e) + } else { + Err(RelayError::Disconnected) + } + } + + async fn request_relay(&mut self, reply: oneshot::Sender) { + let relay_command = OverwatchCommand::Relay(RelayCommand { + service_id: S::SERVICE_ID, + reply_channel: ReplyChannel(reply), + }); + self.overwatch_handle.send(relay_command).await; + } + + #[instrument(skip_all, err(Debug))] + async fn handle_relay_response( + &mut self, + receiver: oneshot::Receiver, + ) -> Result<(), RelayError> { + let response = receiver.await; + match response { + Ok(Ok(message)) => match message.downcast::>() { + Ok(channel) => { + self.state = RelayState::Connected(*channel); + Ok(()) + } + Err(m) => Err(RelayError::InvalidMessage { + type_id: format!("{:?}", m.type_id()), + service_id: S::SERVICE_ID, + }), + }, + Ok(Err(e)) => Err(e), + Err(e) => Err(RelayError::Receiver(Box::new(e))), + } + } +} diff --git a/overwatch/src/services/settings.rs b/overwatch/src/services/settings.rs new file mode 100644 index 0000000..0cf6a85 --- /dev/null +++ b/overwatch/src/services/settings.rs @@ -0,0 +1,90 @@ +//std +//crates +use tokio::sync::watch::{channel, Receiver, Sender}; +use tracing::{error, instrument}; +//internal + +/// Wrapper around [`tokio::sync::watch::Receiver`] +pub struct SettingsNotifier { + notifier_channel: Receiver, +} + +impl SettingsNotifier { + pub fn new(notifier_channel: Receiver) -> Self { + Self { notifier_channel } + } + + /// Get latest settings, it is guaranteed that at least an initial value is present + /// This returns a cloned version of the referenced settings. It simplifies the API + /// at the expense of some efficiency. + // TODO: Maybe we can consider returning the Ref<> from the borrowing. But in doing would be + // be blocking the updating channel so this responsibility would be dumped into the end user + // of the method. Another option would be to spawn a task that updates a settings local value + // each time an updated settings is received. This could not be so easy to do, since it will + // need to hold a &mut to the holder (or needed to use a Cell/RefCell). + pub fn get_updated_settings(&mut self) -> S { + self.notifier_channel.borrow().clone() + } +} + +/// Settings update notification sender +pub struct SettingsUpdater { + sender: Sender, + receiver: Receiver, +} + +impl SettingsUpdater { + pub fn new(settings: S) -> Self { + let (sender, receiver) = channel(settings); + + Self { sender, receiver } + } + + /// Send a new settings update notification to the watcher end + #[instrument(skip_all)] + pub fn update(&self, settings: S) { + self.sender.send(settings).unwrap_or_else(|_e| { + error!("Error sending settings update for service"); + }); + } + + /// Get a new notifier channel, used to get latest settings changes updates + pub fn notifier(&self) -> SettingsNotifier { + SettingsNotifier { + notifier_channel: self.receiver.clone(), + } + } +} + +#[cfg(test)] +mod test { + use crate::services::settings::SettingsUpdater; + use std::collections::HashSet; + use std::time::Duration; + use tokio::time::sleep; + use tokio::time::timeout; + + #[tokio::test] + async fn settings_updater_sequence() { + let updater = SettingsUpdater::new(10usize); + let mut notifier = updater.notifier(); + let values = [10, 0usize]; + let mut seq = HashSet::from(values); + let handle = tokio::spawn(timeout(Duration::from_secs(3), async move { + while !seq.is_empty() { + let new_value = notifier.get_updated_settings(); + seq.remove(&new_value); + sleep(Duration::from_millis(50)).await; + } + true + })); + sleep(Duration::from_millis(100)).await; + for v in &values[1..] { + updater.update(*v); + sleep(Duration::from_millis(100)).await; + } + // all values updates have been seen + let success: Result = handle.await.unwrap(); + assert!(success.unwrap()); + } +} diff --git a/overwatch/src/services/state.rs b/overwatch/src/services/state.rs new file mode 100644 index 0000000..3469433 --- /dev/null +++ b/overwatch/src/services/state.rs @@ -0,0 +1,204 @@ +// std +use std::marker::PhantomData; +use std::sync::Arc; + +// crates +use async_trait::async_trait; +use futures::StreamExt; +use tokio::sync::watch::{channel, Receiver, Ref, Sender}; +use tokio_stream::wrappers::WatchStream; +use tracing::error; +// internal + +// TODO: Constrain this, probably with needed serialize/deserialize options. +/// Service state initialization traits +/// It defines what is needed for a service state to be initialized. +/// Need what set of settings information is required for it to be initialized [`ServiceState::Settings`] +/// which usually is bound to the service itself [`crate::services::ServiceData::Settings`] +pub trait ServiceState: Send + Sync + 'static { + /// Settings object that the state can be initialized from + type Settings; + /// Initialize a stage upon the provided settings + fn from_settings(settings: &Self::Settings) -> Self; +} + +/// A state operator is an entity that can handle a state in a point of time +/// to perform any operation based on it. +#[async_trait] +pub trait StateOperator: Send { + /// The type of state that the operator can handle + type StateInput: ServiceState; + /// Operator initialization method. Can be implemented over some subset of settings + fn from_settings(settings: Settings) -> Self; + /// Asynchronously perform an operation for a given state + async fn run(&mut self, state: Self::StateInput); +} + +/// Operator that doesn't perform any operation upon state update +#[derive(Clone, Copy)] +pub struct NoOperator(PhantomData); + +#[async_trait] +impl StateOperator for NoOperator { + type StateInput = StateInput; + + fn from_settings(_settings: Settings) -> Self { + NoOperator(PhantomData::default()) + } + + async fn run(&mut self, _state: Self::StateInput) {} +} + +/// Empty state +#[derive(Clone, Copy)] +pub struct NoState(PhantomData); + +impl ServiceState for NoState { + type Settings = Settings; + + fn from_settings(_settings: &Self::Settings) -> Self { + Self(Default::default()) + } +} + +/// Receiver part of the state handling mechanism. +/// A state handle watches a stream of incoming states and triggers the attached operator handling +/// method over it. +#[derive(Clone)] +pub struct StateHandle> { + watcher: StateWatcher, + operator: Operator, +} + +/// Sender part of the state handling mechanism. +/// Update the current state and notifies the [`StateHandle`]. +#[derive(Clone)] +pub struct StateUpdater { + sender: Arc>, +} + +/// Wrapper over [`tokio::sync::watch::Receiver`] +#[derive(Clone)] +pub struct StateWatcher { + receiver: Receiver, +} + +impl StateUpdater { + /// Send a new state and notify the [`StateWatcher`] + pub fn update(&mut self, new_state: S) { + self.sender.send(new_state).unwrap_or_else(|_e| { + error!("Error updating state"); + }); + } +} + +impl StateWatcher +where + S: ServiceState + Clone, +{ + /// Get a copy of the most updated state + pub fn state_cloned(&self) -> S { + self.receiver.borrow().clone() + } +} + +impl StateWatcher +where + S: ServiceState, +{ + /// Get a [`Ref`](tokio::sync::watch::Ref) to the last state, this blocks incoming updates until + /// the `Ref` is dropped. Use with caution. + pub fn state_ref(&self) -> Ref { + self.receiver.borrow() + } +} + +impl StateHandle +where + S: ServiceState + Clone, + Operator: StateOperator, +{ + pub fn new(initial_state: S, operator: Operator) -> (Self, StateUpdater) { + let (sender, receiver) = channel(initial_state); + let watcher = StateWatcher { receiver }; + let updater = StateUpdater { + sender: Arc::new(sender), + }; + + (Self { watcher, operator }, updater) + } + + /// Wait for new state updates and run the operator handling method + pub async fn run(self) { + let Self { + watcher, + mut operator, + } = self; + let mut state_stream = WatchStream::new(watcher.receiver); + while let Some(state) = state_stream.next().await { + operator.run(state).await; + } + } +} + +#[cfg(test)] +mod test { + use crate::services::state::{ServiceState, StateHandle, StateOperator, StateUpdater}; + use async_trait::async_trait; + use std::time::Duration; + use tokio::io; + use tokio::io::AsyncWriteExt; + use tokio::time::sleep; + + #[derive(Clone)] + struct UsizeCounter(usize); + + impl ServiceState for UsizeCounter { + type Settings = (); + + fn from_settings(_settings: &Self::Settings) -> Self { + Self(0) + } + } + + struct PanicOnGreaterThanTen; + + #[async_trait] + impl StateOperator for PanicOnGreaterThanTen { + type StateInput = UsizeCounter; + + fn from_settings(_settings: Settings) -> Self { + Self + } + + async fn run(&mut self, state: Self::StateInput) { + let mut stdout = io::stdout(); + let UsizeCounter(value) = state; + stdout + .write_all(format!("{value}\n").as_bytes()) + .await + .expect("stop Output wrote"); + assert!(value < 10); + } + } + + #[tokio::test] + #[should_panic] + async fn state_stream_collects() { + let (handle, mut updater): ( + StateHandle, + StateUpdater, + ) = StateHandle::new( + UsizeCounter::from_settings(&()), + PanicOnGreaterThanTen::from_settings(()), + ); + tokio::task::spawn(async move { + sleep(Duration::from_millis(50)).await; + for i in 0..15 { + updater.update(UsizeCounter(i)); + sleep(Duration::from_millis(50)).await; + } + }); + handle.run().await; + } +} diff --git a/overwatch/src/utils/const_checks.rs b/overwatch/src/utils/const_checks.rs new file mode 100644 index 0000000..2871f07 --- /dev/null +++ b/overwatch/src/utils/const_checks.rs @@ -0,0 +1,35 @@ +use crate::services::ServiceId; + +pub const fn unique_ids(to_check: &[ServiceId]) -> bool { + if to_check.is_empty() { + return true; + } + let mut i: usize = 0; + let mut j: usize = 1; + while i < to_check.len() - 1 { + if const_str::equal!(to_check[i], to_check[j]) { + return false; + } + j += 1; + if j >= to_check.len() { + i += 1; + j = i + 1; + } + } + true +} + +#[cfg(test)] +mod test { + use crate::utils::const_checks::unique_ids; + + #[test] + fn test_unique_ids() { + // this shouldn't even compile if checks fails + const _: () = assert!(unique_ids(&["A", "B"])); + const _: () = assert!(!unique_ids(&["A", "A"])); + + const _: () = assert!(unique_ids(&[])); + const _: () = assert!(unique_ids(&["A"])); + } +} diff --git a/overwatch/src/utils/mod.rs b/overwatch/src/utils/mod.rs new file mode 100644 index 0000000..6b1ad9c --- /dev/null +++ b/overwatch/src/utils/mod.rs @@ -0,0 +1,2 @@ +pub mod const_checks; +pub mod runtime; diff --git a/overwatch/src/utils/runtime.rs b/overwatch/src/utils/runtime.rs new file mode 100644 index 0000000..932460a --- /dev/null +++ b/overwatch/src/utils/runtime.rs @@ -0,0 +1,9 @@ +use crate::overwatch::OVERWATCH_THREAD_NAME; + +pub fn default_multithread_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name(OVERWATCH_THREAD_NAME) + .build() + .expect("Async runtime to build properly") +} diff --git a/overwatch/tests/print_service.rs b/overwatch/tests/print_service.rs new file mode 100644 index 0000000..c6ddab3 --- /dev/null +++ b/overwatch/tests/print_service.rs @@ -0,0 +1,116 @@ +use async_trait::async_trait; +use futures::future::select; +use overwatch::overwatch::OverwatchRunner; +use overwatch::services::handle::{ServiceHandle, ServiceStateHandle}; +use overwatch::services::relay::RelayMessage; +use overwatch::services::state::{NoOperator, NoState}; +use overwatch::services::{ServiceCore, ServiceData, ServiceId}; +use overwatch_derive::Services; +use std::time::Duration; +use tokio::time::sleep; + +pub struct PrintService { + state: ServiceStateHandle, +} + +#[derive(Clone, Debug)] +pub struct PrintServiceMessage(String); + +impl RelayMessage for PrintServiceMessage {} + +impl ServiceData for PrintService { + const SERVICE_ID: ServiceId = "FooService"; + type Settings = (); + type State = NoState; + type StateOperator = NoOperator; + type Message = PrintServiceMessage; +} + +#[async_trait] +impl ServiceCore for PrintService { + fn init(state: ServiceStateHandle) -> Self { + Self { state } + } + + async fn run(mut self) { + use tokio::io::{self, AsyncWriteExt}; + + let Self { + state: ServiceStateHandle { + mut inbound_relay, .. + }, + } = self; + + let print = async move { + let mut stdout = io::stdout(); + while let Some(message) = inbound_relay.recv().await { + match message.0.as_ref() { + "stop" => { + stdout + .write_all(b"Printing service stopping\n") + .await + .expect("stop Output wrote"); + break; + } + m => { + stdout + .write_all(format!("{m}\n").as_bytes()) + .await + .expect("Message output wrote"); + } + } + } + }; + + let idle = async move { + let mut stdout = io::stdout(); + loop { + stdout + .write_all(b"Waiting for print process to finish...\n") + .await + .expect("Message output wrote"); + sleep(Duration::from_millis(50)).await; + } + }; + + select(Box::pin(print), Box::pin(idle)).await; + } +} + +#[derive(Services)] +struct TestApp { + print_service: ServiceHandle, +} + +#[test] +fn derive_print_service() { + let settings: TestAppServiceSettings = TestAppServiceSettings { print_service: () }; + let overwatch = OverwatchRunner::::run(settings, None); + let mut handle = overwatch.handle().clone(); + let mut print_service_relay = handle.relay::(); + + overwatch.spawn(async move { + print_service_relay + .connect() + .await + .expect("A connection to the print service is established"); + + for _ in 0..3 { + print_service_relay + .send(PrintServiceMessage("Hey oh let's go!".to_string())) + .await + .expect("Message is sent"); + } + sleep(Duration::from_millis(50)).await; + print_service_relay + .send(PrintServiceMessage("stop".to_string())) + .await + .expect("stop message to be sent"); + }); + + overwatch.spawn(async move { + sleep(Duration::from_secs(1)).await; + handle.shutdown().await; + }); + overwatch.wait_finished(); +} diff --git a/overwatch/tests/settings_update.rs b/overwatch/tests/settings_update.rs new file mode 100644 index 0000000..f8d8e23 --- /dev/null +++ b/overwatch/tests/settings_update.rs @@ -0,0 +1,85 @@ +use async_trait::async_trait; +use overwatch::overwatch::OverwatchRunner; +use overwatch::services::handle::{ServiceHandle, ServiceStateHandle}; +use overwatch::services::relay::RelayMessage; +use overwatch::services::state::{NoOperator, NoState}; +use overwatch::services::{ServiceCore, ServiceData, ServiceId}; +use overwatch_derive::Services; +use std::time::Duration; +use tokio::time::sleep; + +pub struct SettingsService { + state: ServiceStateHandle, +} + +type SettingsServiceSettings = String; + +#[derive(Clone, Debug)] +pub struct SettingsMsg; + +impl RelayMessage for SettingsMsg {} + +impl ServiceData for SettingsService { + const SERVICE_ID: ServiceId = "FooService"; + type Settings = SettingsServiceSettings; + type State = NoState; + type StateOperator = NoOperator; + type Message = SettingsMsg; +} + +#[async_trait] +impl ServiceCore for SettingsService { + fn init(state: ServiceStateHandle) -> Self { + Self { state } + } + + async fn run(mut self) { + let Self { + state: + ServiceStateHandle { + mut settings_reader, + .. + }, + } = self; + + let print = async move { + let mut asserted = false; + for _ in 0..10 { + let new_settings = settings_reader.get_updated_settings(); + if new_settings.as_str() == "New settings" { + asserted = true; + } + sleep(Duration::from_millis(50)).await; + } + // TODO: when [this](https://github.com/ockam-network/ockam/issues/2479) + // or (https://github.com/tokio-rs/tokio/issues/2002) lands + // update so this panic is not just a print and the test get actually aborted + assert!(asserted); + }; + print.await; + } +} + +#[derive(Services)] +struct TestApp { + settings_service: ServiceHandle, +} + +#[test] +fn settings_service_update_settings() { + let mut settings: TestAppServiceSettings = TestAppServiceSettings { + settings_service: SettingsServiceSettings::default(), + }; + let overwatch = OverwatchRunner::::run(settings.clone(), None); + let handle = overwatch.handle().clone(); + let mut handle2 = handle.clone(); + settings.settings_service = "New settings".to_string(); + overwatch.spawn(async move { handle.clone().update_settings::(settings).await }); + + overwatch.spawn(async move { + sleep(Duration::from_secs(1)).await; + handle2.shutdown().await; + }); + + overwatch.wait_finished(); +} diff --git a/overwatch/tests/state_handling.rs b/overwatch/tests/state_handling.rs new file mode 100644 index 0000000..08289bc --- /dev/null +++ b/overwatch/tests/state_handling.rs @@ -0,0 +1,101 @@ +use async_trait::async_trait; +use overwatch::overwatch::OverwatchRunner; +use overwatch::services::handle::{ServiceHandle, ServiceStateHandle}; +use overwatch::services::relay::RelayMessage; +use overwatch::services::state::{ServiceState, StateOperator}; +use overwatch::services::{ServiceCore, ServiceData, ServiceId}; +use overwatch_derive::Services; +use std::time::Duration; +use tokio::io::{self, AsyncWriteExt}; +use tokio::time::sleep; + +pub struct UpdateStateService { + state: ServiceStateHandle, +} + +#[derive(Clone, Debug)] +pub struct UpdateStateServiceMessage(String); + +impl RelayMessage for UpdateStateServiceMessage {} + +#[derive(Clone)] +pub struct CounterState { + value: usize, +} + +impl ServiceState for CounterState { + type Settings = (); + + fn from_settings(_settings: &Self::Settings) -> Self { + Self { value: 0 } + } +} + +#[derive(Clone)] +pub struct CounterStateOperator; + +#[async_trait] +impl StateOperator for CounterStateOperator { + type StateInput = CounterState; + + fn from_settings(_settings: Settings) -> Self { + CounterStateOperator + } + + async fn run(&mut self, state: Self::StateInput) { + let value = state.value; + let mut stdout = io::stdout(); + stdout + .write_all(format!("Updated state value received: {value}\n").as_bytes()) + .await + .expect("stop Output wrote"); + assert!(value < 10); + } +} + +impl ServiceData for UpdateStateService { + const SERVICE_ID: ServiceId = "FooService"; + type Settings = (); + type State = CounterState; + type StateOperator = CounterStateOperator; + type Message = UpdateStateServiceMessage; +} + +#[async_trait] +impl ServiceCore for UpdateStateService { + fn init(state: ServiceStateHandle) -> Self { + Self { state } + } + + async fn run(mut self) { + let Self { + state: ServiceStateHandle { + mut state_updater, .. + }, + } = self; + for value in 0..10 { + state_updater.update(CounterState { value }); + sleep(Duration::from_millis(50)).await; + } + } +} + +#[derive(Services)] +struct TestApp { + update_state_service: ServiceHandle, +} + +#[test] +fn state_update_service() { + let settings: TestAppServiceSettings = TestAppServiceSettings { + update_state_service: (), + }; + let overwatch = OverwatchRunner::::run(settings, None); + let mut handle = overwatch.handle().clone(); + + overwatch.spawn(async move { + sleep(Duration::from_secs(1)).await; + handle.shutdown().await; + }); + overwatch.wait_finished(); +}