Public release

This commit is contained in:
Daniel Sanchez Quiros 2022-10-20 11:47:50 +02:00
commit ad32d12019
25 changed files with 2503 additions and 0 deletions

74
.github/workflows/main.yml vendored Normal file
View File

@ -0,0 +1,74 @@
# copy of https://github.com/actions-rs/meta/blob/master/recipes/quickstart.md
on: [push, pull_request]
name: CI
jobs:
check:
name: Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: true
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- uses: actions-rs/cargo@v1
continue-on-error: false
with:
command: check
test:
name: Test Suite
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
with:
submodules: true
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- uses: actions-rs/cargo@v1
continue-on-error: false
with:
command: build
- uses: actions-rs/cargo@v1
continue-on-error: false
with:
command: test
lints:
name: Rust lints
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: true
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
components: rustfmt, clippy
- name: Run cargo fmt
uses: actions-rs/cargo@v1
continue-on-error: false
with:
command: fmt
args: --all -- --check
- name: Run cargo clippy
uses: actions-rs/cargo@v1
continue-on-error: false
with:
command: clippy
args: -- --deny warnings

398
Cargo.lock generated Normal file
View File

@ -0,0 +1,398 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "async-trait"
version = "0.1.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "bytes"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "const-str"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21077772762a1002bb421c3af42ac1725fa56066bfc53d9a55bb79905df2aaf3"
dependencies = [
"const-str-proc-macro",
]
[[package]]
name = "const-str-proc-macro"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e1e0fdd2e5d3041e530e1b21158aeeef8b5d0e306bc5c1e3d6cf0930d10e25a"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac"
[[package]]
name = "futures-executor"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
[[package]]
name = "futures-macro"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9"
[[package]]
name = "futures-task"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea"
[[package]]
name = "futures-util"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "heck"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]]
name = "libc"
version = "0.2.135"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68783febc7782c6c5cb401fbda4de5a9898be1762314da0bb2c10ced61f18b0c"
[[package]]
name = "memchr"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "num_cpus"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "once_cell"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e82dad04139b71a90c080c8463fe0dc7902db5192d939bd0950f074d014339e1"
[[package]]
name = "overwatch"
version = "0.1.0"
dependencies = [
"async-trait",
"const-str",
"futures",
"overwatch-derive",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
]
[[package]]
name = "overwatch-derive"
version = "0.1.0"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
"tracing",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro2"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179"
dependencies = [
"proc-macro2",
]
[[package]]
name = "slab"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
dependencies = [
"autocfg",
]
[[package]]
name = "syn"
version = "1.0.102"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fcd952facd492f9be3ef0d0b7032a6e442ee9b361d4acc2b1d0c4aaa5f613a1"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "thiserror"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio"
version = "1.21.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099"
dependencies = [
"autocfg",
"bytes",
"memchr",
"num_cpus",
"pin-project-lite",
"tokio-macros",
]
[[package]]
name = "tokio-macros"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio-stream"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-util"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tracing"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
dependencies = [
"once_cell",
]
[[package]]
name = "unicode-ident"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ceab39d59e4c9499d4e5a8ee0e2735b891bb7308ac83dfb4e80cad195c9f6f3"
[[package]]
name = "version_check"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"

10
Cargo.toml Normal file
View File

@ -0,0 +1,10 @@
[workspace]
members = [
"overwatch",
"overwatch-derive",
]
[profile.release-opt]
inherits = "release"
lto = true

25
LICENSE Normal file
View File

@ -0,0 +1,25 @@
Overwatch is licensed under the MIT License
Copyright (c) 2018 Status Research & Development GmbH
-----------------------------------------------------
The MIT License (MIT)
Copyright (c) 2022 Status Research & Development GmbH
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

45
README.md Normal file
View File

@ -0,0 +1,45 @@
# Overwatch
Overwatch is a framework to easily construct applications that requires of several independent
parts that needs communication between them.
Everything is self-contained, and it matches somewhat the advantages of microservices.
## Design Goals
- Modularity:
- Components should be self-contained (as possible)
- Communication relations between components should be specifically defined
- Components should be mockable. This is rather important for measurements and testing.
- Single responsibility:
- It is easier to isolate problems
- Minimal sharing when unavoidable
- Debuggeability
- Easy to track workflow
- Easy to test
- Easy to measure
- Asynchronous Communication
## Main components
- Overwatch: the main messenger relay component (internal communications). It is also be responsible of managing other components lifecycle and handling configuration updates.
- Services (handled by the *overwatch*)
## Project Structure
* `overwatch`: Services runner framework
* `overwatch-derive`: Overwatch macros
## Build & Test
Minimal Rust supported version: `1.63`
When in development, please, use `cargo clippy` to build the project. Any warning is promoted to an error in our CI.
* Use `cargo test` for executing tests, and `cargo test -- --nocapture` for seeing test outputs.
* Use `cargo run --exampel {example_name}` to run an example.
### Build Documentation
Simply run `cargo doc --open --no-deps` to build and access a copy of the generated documentation.

View File

@ -0,0 +1,19 @@
[package]
name = "overwatch-derive"
version = "0.1.0"
edition = "2021"
authors = [
"Daniel Sanchez Quiros <danielsq@status.im>"
]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
proc-macro = true
[dependencies]
heck = "0.4"
syn = "1.0"
quote = "1.0"
proc-macro2 = "1.0"
proc-macro-error = "1.0"
tracing = "0.1"

293
overwatch-derive/src/lib.rs Normal file
View File

@ -0,0 +1,293 @@
mod utils;
use proc_macro_error::{abort_call_site, proc_macro_error};
use quote::{format_ident, quote};
use syn::{punctuated::Punctuated, token::Comma, Data, DeriveInput, Field};
#[proc_macro_derive(Services)]
#[proc_macro_error]
pub fn derive_services(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
let input: DeriveInput = syn::parse(input).expect("A syn parseable token stream");
let derived = impl_services(&input);
derived.into()
}
fn service_settings_identifier_from(
services_identifier: &proc_macro2::Ident,
) -> proc_macro2::Ident {
format_ident!("{}ServiceSettings", services_identifier)
}
fn service_settings_field_identifier_from(
field_identifier: &proc_macro2::Ident,
) -> proc_macro2::Ident {
format_ident!("{}_settings", field_identifier)
}
fn impl_services(input: &DeriveInput) -> proc_macro2::TokenStream {
use syn::DataStruct;
let struct_identifier = &input.ident;
let data = &input.data;
match data {
Data::Struct(DataStruct {
fields: syn::Fields::Named(fields),
..
}) => impl_services_for_struct(struct_identifier, &fields.named),
_ => {
abort_call_site!("Deriving Services is only supported for named Structs");
}
}
}
fn impl_services_for_struct(
identifier: &proc_macro2::Ident,
fields: &Punctuated<Field, Comma>,
) -> proc_macro2::TokenStream {
let settings = generate_services_settings(identifier, fields);
let unique_ids_check = generate_assert_unique_identifiers(identifier, fields);
let services_impl = generate_services_impl(identifier, fields);
quote! {
#unique_ids_check
#settings
#services_impl
}
}
fn generate_services_settings(
services_identifier: &proc_macro2::Ident,
fields: &Punctuated<Field, Comma>,
) -> proc_macro2::TokenStream {
let services_settings = fields.iter().map(|field| {
let service_name = field.ident.as_ref().expect("A named struct attribute");
let _type = utils::extract_type_from(&field.ty);
quote!(pub #service_name: <#_type as ::overwatch::services::ServiceData>::Settings)
});
let services_settings_identifier = service_settings_identifier_from(services_identifier);
quote! {
#[derive(::std::clone::Clone, ::std::fmt::Debug)]
pub struct #services_settings_identifier {
#( #services_settings ),*
}
}
}
fn generate_assert_unique_identifiers(
services_identifier: &proc_macro2::Ident,
fields: &Punctuated<Field, Comma>,
) -> proc_macro2::TokenStream {
let services_ids = fields.iter().map(|field| {
let _type = utils::extract_type_from(&field.ty);
quote! {
<#_type as ::overwatch::services::ServiceData>::SERVICE_ID
}
});
let services_ids_check = format_ident!(
"__{}__CONST_CHECK_UNIQUE_SERVICES_IDS",
services_identifier.to_string().to_uppercase()
);
quote! {
const #services_ids_check: () = assert!(::overwatch::utils::const_checks::unique_ids(&[#( #services_ids ),*]));
}
}
fn generate_services_impl(
services_identifier: &proc_macro2::Ident,
fields: &Punctuated<Field, Comma>,
) -> proc_macro2::TokenStream {
let services_settings_identifier = service_settings_identifier_from(services_identifier);
let impl_new = generate_new_impl(fields);
let impl_start_all = generate_start_all_impl(fields);
let impl_start = generate_start_impl(fields);
let impl_stop = generate_stop_impl(fields);
let impl_relay = generate_request_relay_impl(fields);
let impl_update_settings = generate_update_settings_impl(fields);
quote! {
impl ::overwatch::overwatch::Services for #services_identifier {
type Settings = #services_settings_identifier;
#impl_new
#impl_start_all
#impl_start
#impl_stop
#impl_relay
#impl_update_settings
}
}
}
fn generate_new_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStream {
let fields_settings = fields.iter().map(|field| {
let field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
let settings_field_identifier = service_settings_field_identifier_from(field_identifier);
quote! {
#field_identifier: #settings_field_identifier
}
});
let managers = fields.iter().map(|field| {
let field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
let service_type = utils::extract_type_from(&field.ty);
let settings_field_identifier = service_settings_field_identifier_from(field_identifier);
quote! {
#field_identifier: {
let (runtime, manager) =
::overwatch::services::handle::ServiceHandle::<#service_type>::new(
#settings_field_identifier, overwatch_handle.clone(),
);
runtimes.insert(<#service_type as ::overwatch::services::ServiceData>::SERVICE_ID, runtime);
manager
}
}
});
quote! {
fn new(settings: Self::Settings, overwatch_handle: ::overwatch::overwatch::handle::OverwatchHandle) -> (
std::collections::HashMap<::overwatch::services::ServiceId, ::std::option::Option<::tokio::runtime::Runtime>>,
Self
) {
let Self::Settings {
#( #fields_settings ),*
} = settings;
let mut runtimes = ::std::collections::HashMap::new();
let app = Self {
#( #managers ),*
};
(runtimes, app)
}
}
}
fn generate_start_all_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStream {
let call_start = fields.iter().map(|field| {
let field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
quote! {
self.#field_identifier.service_runner().run();
}
});
quote! {
#[::tracing::instrument(skip(self), err)]
fn start_all(&mut self) -> Result<(), ::overwatch::overwatch::Error> {
#( #call_start )*
Ok(())
}
}
}
fn generate_start_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStream {
let cases = fields.iter().map(|field| {
let field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
let type_id = utils::extract_type_from(&field.ty);
quote! {
<#type_id as ::overwatch::services::ServiceData>::SERVICE_ID => {
self.#field_identifier.service_runner().run();
Ok(())
}
}
});
quote! {
#[::tracing::instrument(skip(self), err)]
fn start(&mut self, service_id: ::overwatch::services::ServiceId) -> Result<(), ::overwatch::overwatch::Error> {
match service_id {
#( #cases ),*
service_id => Err(::overwatch::overwatch::Error::Unavailable { service_id })
}
}
}
}
fn generate_stop_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStream {
let cases = fields.iter().map(|field| {
let _field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
let type_id = utils::extract_type_from(&field.ty);
// TODO: actually stop them here once service lifecycle is implemented
quote! {
<#type_id as ::overwatch::services::ServiceData>::SERVICE_ID => { unimplemented!() }
}
});
quote! {
#[::tracing::instrument(skip(self), err)]
fn stop(&mut self, service_id: ::overwatch::services::ServiceId) -> Result<(), ::overwatch::overwatch::Error> {
match service_id {
#( #cases ),*
service_id => Err(::overwatch::overwatch::Error::Unavailable { service_id })
}
}
}
}
fn generate_request_relay_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStream {
let cases = fields.iter().map(|field| {
let field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
let type_id = utils::extract_type_from(&field.ty);
quote! {
<#type_id as ::overwatch::services::ServiceData>::SERVICE_ID => {
Ok(::std::boxed::Box::new(
self.#field_identifier
.relay_with()
.expect("An open relay to service is established")
) as ::overwatch::services::relay::AnyMessage)
}
}
});
quote! {
#[::tracing::instrument(skip(self), err)]
fn request_relay(&mut self, service_id: ::overwatch::services::ServiceId) -> ::overwatch::services::relay::RelayResult {
{
match service_id {
#( #cases )*
service_id => Err(::overwatch::services::relay::RelayError::Unavailable { service_id })
}
}
}
}
}
fn generate_update_settings_impl(fields: &Punctuated<Field, Comma>) -> proc_macro2::TokenStream {
let fields_settings = fields.iter().map(|field| {
let field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
let settings_field_identifier = service_settings_field_identifier_from(field_identifier);
quote! {
#field_identifier: #settings_field_identifier
}
});
let update_settings_call = fields.iter().map(|field| {
let field_identifier = field.ident.as_ref().expect("A struct attribute identifier");
let settings_field_identifier = service_settings_field_identifier_from(field_identifier);
quote! {
self.#field_identifier.update_settings(#settings_field_identifier);
}
});
quote! {
#[::tracing::instrument(skip(self, settings), err)]
fn update_settings(&mut self, settings: Self::Settings) -> Result<(), ::overwatch::overwatch::Error> {
let Self::Settings {
#( #fields_settings ),*
} = settings;
#( #update_settings_call )*
Ok(())
}
}
}

View File

@ -0,0 +1,34 @@
use proc_macro_error::abort_call_site;
use quote::ToTokens;
use syn::{GenericArgument, PathArguments, Type};
pub fn extract_type_from(ty: &Type) -> Type {
let stringify_type = ty.clone().into_token_stream().to_string();
match ty {
Type::Path(type_path) if type_path.qself.is_none() => {
// Get the first segment of the path:
let type_params = type_path
.path
.segments
.iter()
.next()
.cloned()
.unwrap()
.arguments;
// It should have only on angle-bracketed param ("<Foo>"):
let generic_arg = match type_params {
PathArguments::AngleBracketed(params) => {
params.args.iter().next().cloned().unwrap()
}
_ => abort_call_site!("Expected single type argument, found {}", stringify_type),
};
// This argument must be a type:
match generic_arg {
GenericArgument::Type(ty) => ty,
_ => abort_call_site!("Expected single type argument, found {}", stringify_type),
}
}
_ => abort_call_site!("Expected single type argument, found {}", stringify_type),
}
}

22
overwatch/Cargo.toml Normal file
View File

@ -0,0 +1,22 @@
[package]
name = "overwatch"
version = "0.1.0"
edition = "2021"
authors = [
"Daniel Sanchez Quiros <danielsq@status.im>"
]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
overwatch-derive = { path = "../overwatch-derive" }
const-str = "0.3"
async-trait = "0.1"
futures = "0.3"
thiserror = "1.0"
tokio = { version = "1.17", features = ["rt-multi-thread", "sync", "time"] }
tokio-stream = {version ="0.1", features = ["sync"] }
tracing = "0.1"
[dev-dependencies]
tokio = { version = "1.17", features = ["rt-multi-thread", "sync", "time", "io-std", "io-util", "macros"] }

29
overwatch/src/lib.rs Normal file
View File

@ -0,0 +1,29 @@
//! Overwatch is a framework to easily construct applications that requires of several independent
//! parts that needs communication between them.
//! Everything is self contained and it matches somewhat the advantages of microservices.
//!
//! ## Design Goals
//!
//! - Modularity:
//! - Components should be self-contained (as possible)
//! - Communication relations between components should be specifically defined
//! - Components should be mockable. This is rather important for measurements and testing.
//!
//! - Single responsibility:
//! - It is easier to isolate problems
//! - Minimal sharing when unavoidable
//!
//! - Debuggeability
//! - Easy to track workflow
//! - Easy to test
//! - Easy to measure
//! - Asynchronous Communication
//!
//! ## Main components
//!
//! - Overwatch: the main messenger relay component (internal communications). It is also be responsible of managing other components lifecycle and handling configuration updates.
//! - Services (handled by the *overwatch*)
pub mod overwatch;
pub mod services;
pub mod utils;

View File

@ -0,0 +1,68 @@
// std
// crates
use crate::overwatch::AnySettings;
use tokio::sync::oneshot;
// internal
use crate::services::relay::RelayResult;
use crate::services::ServiceId;
#[derive(Debug)]
pub(crate) struct ReplyChannel<M>(pub(crate) oneshot::Sender<M>);
impl<M> From<oneshot::Sender<M>> for ReplyChannel<M> {
fn from(sender: oneshot::Sender<M>) -> Self {
Self(sender)
}
}
impl<M> ReplyChannel<M> {
pub async fn reply(self, message: M) -> Result<(), M> {
self.0.send(message)
}
}
/// Command for requesting communications with another service
#[derive(Debug)]
pub struct RelayCommand {
pub(crate) service_id: ServiceId,
pub(crate) reply_channel: ReplyChannel<RelayResult>,
}
/// Command for managing [`ServiceCore`](crate::services::ServiceCore) lifecycle
#[allow(unused)]
#[derive(Debug)]
pub struct ServiceLifeCycle<R> {
service_id: ServiceId,
reply_channel: ReplyChannel<R>,
}
/// [`ServiceCore`](crate::services::ServiceCore) lifecycle related commands
#[derive(Debug)]
pub enum ServiceLifeCycleCommand {
Shutdown(ServiceLifeCycle<()>),
Kill(ServiceLifeCycle<()>),
Start(ServiceLifeCycle<()>),
Stop(ServiceLifeCycle<()>),
}
/// [`Overwatch`](crate::overwatch::Overwatch) lifecycle related commands
#[derive(Debug)]
pub enum OverwatchLifeCycleCommand {
Shutdown,
Kill,
}
/// [`Overwatch`](crate::overwatch::Overwatch) settings update command
#[derive(Debug)]
pub struct SettingsCommand(pub(crate) AnySettings);
/// [`Overwatch`](crate::overwatch::Overwatch) tasks related commands
#[derive(Debug)]
pub enum OverwatchCommand {
Relay(RelayCommand),
ServiceLifeCycle(ServiceLifeCycleCommand),
OverwatchLifeCycle(OverwatchLifeCycleCommand),
Settings(SettingsCommand),
}

View File

@ -0,0 +1,88 @@
// std
// crates
use crate::overwatch::commands::{OverwatchCommand, OverwatchLifeCycleCommand, SettingsCommand};
use crate::overwatch::Services;
use tokio::runtime::Handle;
use tokio::sync::mpsc::Sender;
use tracing::{error, info, instrument};
// internal
use crate::services::relay::Relay;
use crate::services::ServiceCore;
/// Handler object over the main Overwatch runner
/// It handles communications to the main Overwatch runner.
#[derive(Clone, Debug)]
pub struct OverwatchHandle {
#[allow(unused)]
runtime_handle: Handle,
sender: Sender<OverwatchCommand>,
}
impl OverwatchHandle {
pub fn new(runtime_handle: Handle, sender: Sender<OverwatchCommand>) -> Self {
Self {
runtime_handle,
sender,
}
}
/// Request for a relay to an specific service by type
pub fn relay<S: ServiceCore>(&self) -> Relay<S> {
Relay::new(self.clone())
}
/// Send a shutdown signal to the overwatch runner
pub async fn shutdown(&mut self) {
info!("Shutting down Overwatch");
if let Err(e) = self
.sender
.send(OverwatchCommand::OverwatchLifeCycle(
OverwatchLifeCycleCommand::Shutdown,
))
.await
{
dbg!(e);
}
}
/// Send a kill signal to the overwatch runner
pub async fn kill(&mut self) {
info!("Killing Overwatch");
if let Err(e) = self
.sender
.send(OverwatchCommand::OverwatchLifeCycle(
OverwatchLifeCycleCommand::Kill,
))
.await
{
dbg!(e);
}
}
/// Send an overwatch command to the overwatch runner
#[instrument(name = "overwatch-command-send", skip(self))]
pub async fn send(&mut self, command: OverwatchCommand) {
if let Err(e) = self.sender.send(command).await {
error!(error=?e, "Error sending overwatch command");
}
}
#[instrument(skip(self))]
pub async fn update_settings<S: Services>(&mut self, settings: S::Settings) {
if let Err(e) = self
.sender
.send(OverwatchCommand::Settings(SettingsCommand(Box::new(
settings,
))))
.await
{
error!(error=?e, "Error updating settings")
}
}
pub fn runtime(&self) -> &Handle {
&self.runtime_handle
}
}

View File

@ -0,0 +1,302 @@
pub mod commands;
pub mod handle;
// std
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::future::Future;
// crates
use async_trait::async_trait;
use thiserror::Error;
use tokio::runtime::{Handle, Runtime};
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::{info, instrument};
// internal
use crate::overwatch::commands::{
OverwatchCommand, OverwatchLifeCycleCommand, RelayCommand, SettingsCommand,
};
use crate::overwatch::handle::OverwatchHandle;
use crate::services::relay::RelayResult;
use crate::services::{ServiceError, ServiceId};
use crate::utils::runtime::default_multithread_runtime;
/// Overwatch base error type
#[derive(Error, Debug)]
pub enum Error {
#[error(transparent)]
Relay(#[from] ServiceError),
#[error("Service {service_id} is unavailable")]
Unavailable { service_id: ServiceId },
}
/// Signal sent so overwatch finish execution
type FinishOverwatchSignal = ();
/// Marker trait for settings related elements
pub type AnySettings = Box<dyn Any + Send + 'static>;
/// An overwatch run anything that implements this trait
/// An implementor of this trait would have to handle the inner [`ServiceCore`](crate::services::ServiceCore)
#[async_trait]
pub trait Services: Send + Sync {
/// Inner [`ServiceCore::Settings`](crate::services::ServiceCore) grouping type.
/// Normally this will be a settings object that group all the inner services settings.
type Settings: Debug + Send + 'static;
/// Spawn a new instance of the Services object
/// It returns a `(ServiceId, Runtime)` where Runtime is the `tokio::runtime::Runtime` attached for each
/// service.
/// It also returns an instance of the implementing type.
fn new(
settings: Self::Settings,
overwatch_handle: OverwatchHandle,
) -> (HashMap<ServiceId, Option<Runtime>>, Self);
/// Start a services attached to the trait implementer
fn start(&mut self, service_id: ServiceId) -> Result<(), Error>;
// TODO: this probably will be removed once the services lifecycle is implemented
/// Start all services attached to the trait implementer
fn start_all(&mut self) -> Result<(), Error>;
/// Stop a service attached to the trait implementer
fn stop(&mut self, service_id: ServiceId) -> Result<(), Error>;
/// Request communication relay to one of the services
fn request_relay(&mut self, service_id: ServiceId) -> RelayResult;
/// Update service settings
fn update_settings(&mut self, settings: Self::Settings) -> Result<(), Error>;
}
/// `OverwatchRunner` is the entity that handles a running overwatch
/// it is usually one-shot. It contains what it is needed just to be run as a main loop
/// and a system to be able to stop it running. Meaning that it i responsible of the Overwatch
/// application lifecycle.
pub struct OverwatchRunner<S: Services> {
services: S,
#[allow(unused)]
handle: OverwatchHandle,
finish_signal_sender: oneshot::Sender<()>,
}
/// Overwatch thread identifier
/// it is used when creating the `tokio::runtime::Runtime` that Overwatch uses internally
pub const OVERWATCH_THREAD_NAME: &str = "Overwatch";
impl<S> OverwatchRunner<S>
where
S: Services + 'static,
{
/// Start the Overwatch runner process
/// It creates the `tokio::runtime::Runtime`, initialize the [`Services`] and start listening for
/// Overwatch related tasks.
/// Returns the [`Overwatch`] instance that handles this runner.
pub fn run(settings: S::Settings, runtime: Option<Runtime>) -> Overwatch {
let runtime = runtime.unwrap_or_else(default_multithread_runtime);
let (finish_signal_sender, finish_runner_signal) = tokio::sync::oneshot::channel();
let (commands_sender, commands_receiver) = tokio::sync::mpsc::channel(16);
let handle = OverwatchHandle::new(runtime.handle().clone(), commands_sender);
let (services_runtimes, services) = S::new(settings, handle.clone());
let runner = OverwatchRunner {
services,
handle: handle.clone(),
finish_signal_sender,
};
runtime.spawn(async move { runner.run_(commands_receiver).await });
Overwatch {
runtime,
services_runtimes,
handle,
finish_runner_signal,
}
}
#[instrument(name = "overwatch-run", skip_all)]
async fn run_(self, mut receiver: Receiver<OverwatchCommand>) {
let Self {
mut services,
handle: _,
finish_signal_sender,
} = self;
// TODO: this probably need to be manually done, or at least handled by a flag
services.start_all().expect("Services to start running");
while let Some(command) = receiver.recv().await {
info!(command = ?command, "Overwatch command received");
match command {
OverwatchCommand::Relay(relay_command) => {
Self::handle_relay(&mut services, relay_command).await;
}
OverwatchCommand::ServiceLifeCycle(_) => {
unimplemented!("Services life cycle is still not supported!");
}
OverwatchCommand::OverwatchLifeCycle(command) => {
if matches!(
command,
OverwatchLifeCycleCommand::Kill | OverwatchLifeCycleCommand::Shutdown
) {
break;
}
}
OverwatchCommand::Settings(settings) => {
Self::handle_settings_update(&mut services, settings).await;
}
}
}
// signal that we finished execution
finish_signal_sender
.send(())
.expect("Overwatch run finish signal to be sent properly");
}
async fn handle_relay(services: &mut S, command: RelayCommand) {
let RelayCommand {
service_id,
reply_channel,
} = command;
// send requested rely channel result to requesting service
if let Err(Err(e)) = reply_channel
.reply(services.request_relay(service_id))
.await
{
info!(error=?e, "Error requesting relay for service {}", service_id)
}
}
async fn handle_settings_update(services: &mut S, command: SettingsCommand) {
let SettingsCommand(settings) = command;
if let Ok(settings) = settings.downcast::<S::Settings>() {
if let Err(e) = services.update_settings(*settings) {
// TODO: add proper logging
dbg!(e);
}
} else {
unreachable!("Statically should always be of the correct type");
}
}
}
/// Main Overwatch entity
/// It manages the overwatch runtime and handle
pub struct Overwatch {
runtime: Runtime,
#[allow(unused)]
services_runtimes: HashMap<ServiceId, Option<Runtime>>,
handle: OverwatchHandle,
finish_runner_signal: oneshot::Receiver<FinishOverwatchSignal>,
}
impl Overwatch {
/// Get the overwatch handle
/// [`OverwatchHandle`](crate::overwatch::handle::OverwatchHandle) is cloneable, so it can be done on demand
pub fn handle(&self) -> &OverwatchHandle {
&self.handle
}
/// Get the underllaying tokio runtime handle
pub fn runtime(&self) -> &Handle {
self.runtime.handle()
}
/// Spawn a new task within the Overwatch runtime
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.runtime.spawn(future)
}
/// Block until Overwatch finish its execution
pub fn wait_finished(self) {
let Self {
runtime,
finish_runner_signal,
..
} = self;
runtime.block_on(async move {
let signal_result = finish_runner_signal.await;
signal_result.expect("A finished signal arrived");
});
}
}
#[cfg(test)]
mod test {
use crate::overwatch::handle::OverwatchHandle;
use crate::overwatch::{Error, OverwatchRunner, Services};
use crate::services::relay::{RelayError, RelayResult};
use crate::services::ServiceId;
use std::collections::HashMap;
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::time::sleep;
struct EmptyServices;
impl Services for EmptyServices {
type Settings = ();
fn new(
_settings: Self::Settings,
_overwatch_handle: OverwatchHandle,
) -> (HashMap<ServiceId, Option<Runtime>>, Self) {
(HashMap::new(), EmptyServices)
}
fn start(&mut self, service_id: ServiceId) -> Result<(), Error> {
Err(Error::Unavailable { service_id })
}
fn start_all(&mut self) -> Result<(), Error> {
Ok(())
}
fn stop(&mut self, service_id: ServiceId) -> Result<(), Error> {
Err(Error::Unavailable { service_id })
}
fn request_relay(&mut self, service_id: ServiceId) -> RelayResult {
Err(RelayError::InvalidRequest { to: service_id })
}
fn update_settings(&mut self, _settings: Self::Settings) -> Result<(), Error> {
Ok(())
}
}
#[test]
fn run_overwatch_then_stop() {
let overwatch = OverwatchRunner::<EmptyServices>::run((), None);
let mut handle = overwatch.handle().clone();
overwatch.spawn(async move {
sleep(Duration::from_millis(500)).await;
handle.shutdown().await;
});
overwatch.wait_finished();
}
#[test]
fn run_overwatch_then_kill() {
let overwatch = OverwatchRunner::<EmptyServices>::run((), None);
let mut handle = overwatch.handle().clone();
overwatch.spawn(async move {
sleep(Duration::from_millis(500)).await;
handle.kill().await;
});
overwatch.wait_finished();
}
}

View File

@ -0,0 +1,168 @@
// std
use std::marker::PhantomData;
// crates
use futures::future::{abortable, AbortHandle};
use tokio::runtime::{Handle, Runtime};
use tracing::instrument;
// internal
use crate::overwatch::handle::OverwatchHandle;
use crate::services::relay::{relay, InboundRelay, OutboundRelay};
use crate::services::settings::{SettingsNotifier, SettingsUpdater};
use crate::services::state::{StateHandle, StateOperator, StateUpdater};
use crate::services::{ServiceCore, ServiceId, ServiceState};
// TODO: Abstract handle over state, to diferentiate when the service is running and when it is not
// that way we can expose a better API depending on what is happenning. Would get rid of the probably
// unnecessary Option and cloning.
/// Service handle
/// This is used to access different parts of the service
pub struct ServiceHandle<S: ServiceCore> {
/// Service id (must match `<ServiceCore::ServiceId>`)
id: ServiceId,
/// Service runtime handle
runtime: Handle,
/// Message channel relay
/// Would be None if service is not running
/// Will contain the channel if service is running
outbound_relay: Option<OutboundRelay<S::Message>>,
/// Handle to overwatch
overwatch_handle: OverwatchHandle,
settings: SettingsUpdater<S::Settings>,
initial_state: S::State,
_marker: PhantomData<S>,
}
/// Service core resources
/// It contains whatever is necessary to start a new service runner
pub struct ServiceStateHandle<S: ServiceCore> {
/// Service runtime handler
pub runtime: Handle,
/// Relay channel to communicate with the service runner
pub inbound_relay: InboundRelay<S::Message>,
/// Overwatch handle
pub overwatch_handle: OverwatchHandle,
pub settings_reader: SettingsNotifier<S::Settings>,
pub state_updater: StateUpdater<S::State>,
pub _lifecycle_handler: (),
}
/// Main service executor
/// It is the object that hold the necessary information for the service to run
pub struct ServiceRunner<S: ServiceCore> {
#[allow(unused)]
overwatch_handle: OverwatchHandle,
service_state: ServiceStateHandle<S>,
state_handle: StateHandle<S::State, S::StateOperator>,
}
impl<S: ServiceCore> ServiceHandle<S> {
pub fn new(
settings: S::Settings,
overwatch_handle: OverwatchHandle,
) -> (Option<Runtime>, Self) {
let id = S::SERVICE_ID;
let runtime = S::service_runtime(&settings, overwatch_handle.runtime());
let initial_state: S::State = S::State::from_settings(&settings);
let handle = runtime.handle();
let settings = SettingsUpdater::new(settings);
(
runtime.runtime(),
Self {
id,
runtime: handle,
outbound_relay: None,
settings,
initial_state,
overwatch_handle,
_marker: PhantomData::default(),
},
)
}
pub fn id(&self) -> ServiceId {
self.id
}
/// Service runtime getter
/// it is easily cloneable and can be done on demand
pub fn runtime(&self) -> &Handle {
&self.runtime
}
/// Overwatch handle
/// it is easily cloneable and can be done on demand
pub fn overwatch_handle(&self) -> &OverwatchHandle {
&self.overwatch_handle
}
/// Request a relay with this service
pub fn relay_with(&self) -> Option<OutboundRelay<S::Message>> {
self.outbound_relay.clone()
}
/// Update settings
pub fn update_settings(&self, settings: S::Settings) {
self.settings.update(settings)
}
/// Build a runner for this service
pub fn service_runner(&mut self) -> ServiceRunner<S> {
// TODO: add proper status handling here, a service should be able to produce a runner if it is already running.
let (inbound_relay, outbound_relay) = relay::<S::Message>(S::SERVICE_RELAY_BUFFER_SIZE);
let settings_reader = self.settings.notifier();
// add relay channel to handle
self.outbound_relay = Some(outbound_relay);
let settings = self.settings.notifier().get_updated_settings();
let operator = S::StateOperator::from_settings::<S::Settings>(settings);
let (state_handle, state_updater) =
StateHandle::<S::State, S::StateOperator>::new(self.initial_state.clone(), operator);
let service_state = ServiceStateHandle {
runtime: self.runtime.clone(),
inbound_relay,
overwatch_handle: self.overwatch_handle.clone(),
state_updater,
settings_reader,
_lifecycle_handler: (),
};
ServiceRunner {
overwatch_handle: self.overwatch_handle().clone(),
service_state,
state_handle,
}
}
}
impl<S: ServiceCore> ServiceStateHandle<S> {
pub fn id(&self) -> ServiceId {
S::SERVICE_ID
}
}
impl<S: ServiceCore> ServiceRunner<S> {
/// Spawn the service main loop and handle it lifecycle
/// Return a handle to abort execution manually
#[instrument(skip(self), fields(service_id=S::SERVICE_ID))]
pub fn run(self) -> AbortHandle {
let ServiceRunner {
service_state,
state_handle,
..
} = self;
let runtime = service_state.runtime.clone();
let service = S::init(service_state);
let (runner, abortable_handle) = abortable(service.run());
runtime.spawn(runner);
runtime.spawn(state_handle.run());
// TODO: Handle service lifecycle
// TODO: this handle should not scape this scope, it should actually be handled in the lifecycle part mentioned above
abortable_handle
}
}

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,89 @@
pub mod handle;
pub mod life_cycle;
pub mod relay;
pub mod settings;
pub mod state;
// std
use std::fmt::Debug;
// crates
use async_trait::async_trait;
use thiserror::Error;
use tokio::runtime;
// internal
use crate::services::relay::RelayError;
use crate::services::state::StateOperator;
use handle::ServiceStateHandle;
use relay::RelayMessage;
use state::ServiceState;
// TODO: Make this type unique for each service?
/// Services identification type
pub type ServiceId = &'static str;
/// The core data a service needs to handle
/// Holds the necessary information of a service
pub trait ServiceData {
/// Service identification tag
const SERVICE_ID: ServiceId;
/// Service relay buffer size
const SERVICE_RELAY_BUFFER_SIZE: usize = 16;
/// Service settings object
type Settings: Clone;
/// Service state object
type State: ServiceState<Settings = Self::Settings> + Clone;
/// State operator
type StateOperator: StateOperator<StateInput = Self::State> + Clone;
/// Service messages that the service itself understands and can react to
type Message: RelayMessage + Debug + Send + Sync;
}
/// Main trait for Services initialization and main loop hook
#[async_trait]
pub trait ServiceCore: ServiceData + Send + Sized + 'static {
/// Initialize the service with the given state
fn init(service_state: ServiceStateHandle<Self>) -> Self;
/// Service default runtime
fn service_runtime(_settings: &Self::Settings, _parent: &runtime::Handle) -> ServiceRuntime {
let id = Self::SERVICE_ID;
ServiceRuntime::Custom(
runtime::Builder::new_multi_thread()
.enable_all()
.thread_name(id)
.build()
.unwrap_or_else(|_| panic!("Async runtime for service {id} didn't build properly")),
)
}
/// Service main loop
async fn run(mut self);
}
#[derive(Error, Debug)]
pub enum ServiceError {
#[error(transparent)]
RelayError(#[from] RelayError),
}
pub enum ServiceRuntime {
FromParent(runtime::Handle),
Custom(runtime::Runtime),
}
impl ServiceRuntime {
pub fn handle(&self) -> runtime::Handle {
match self {
ServiceRuntime::FromParent(handle) => handle.clone(),
ServiceRuntime::Custom(runtime) => runtime.handle().clone(),
}
}
pub fn runtime(self) -> Option<runtime::Runtime> {
match self {
ServiceRuntime::Custom(runtime) => Some(runtime),
_ => None,
}
}
}

View File

@ -0,0 +1,196 @@
// std
use std::any::Any;
use std::fmt::Debug;
// crates
use thiserror::Error;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot;
use tracing::{error, instrument};
// internal
use crate::overwatch::commands::{OverwatchCommand, RelayCommand, ReplyChannel};
use crate::overwatch::handle::OverwatchHandle;
use crate::services::{ServiceCore, ServiceId};
#[derive(Error, Debug)]
pub enum RelayError {
#[error("error requesting relay to {to} service")]
InvalidRequest { to: ServiceId },
#[error("couldn't relay message")]
Send,
#[error("relay is already connected")]
AlreadyConnected,
#[error("service relay is disconnected")]
Disconnected,
#[error("service {service_id} is not available")]
Unavailable { service_id: ServiceId },
#[error("invalid message with type id [{type_id}] for service {service_id}")]
InvalidMessage {
type_id: String,
service_id: &'static str,
},
#[error("receiver failed due to {0:?}")]
Receiver(Box<dyn Debug + Send + Sync>),
}
/// Message wrapper type
pub type AnyMessage = Box<dyn Any + Send + 'static>;
#[derive(Debug, Clone)]
pub struct NoMessage;
impl RelayMessage for NoMessage {}
/// Result type when creating a relay connection
pub type RelayResult = Result<AnyMessage, RelayError>;
/// Marker type for relay messages
/// Notice that it is bound to 'static.
pub trait RelayMessage: 'static {}
enum RelayState<M> {
Disconnected,
Connected(OutboundRelay<M>),
}
impl<M> Clone for RelayState<M> {
fn clone(&self) -> Self {
match self {
RelayState::Disconnected => RelayState::Disconnected,
RelayState::Connected(outbound) => RelayState::Connected(outbound.clone()),
}
}
}
/// Channel receiver of a relay connection
pub struct InboundRelay<M> {
receiver: Receiver<M>,
_stats: (), // placeholder
}
/// Channel sender of a relay connection
pub struct OutboundRelay<M> {
sender: Sender<M>,
_stats: (), // placeholder
}
pub struct Relay<S: ServiceCore> {
state: RelayState<S::Message>,
overwatch_handle: OverwatchHandle,
}
impl<S: ServiceCore> Clone for Relay<S> {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
overwatch_handle: self.overwatch_handle.clone(),
}
}
}
// TODO: make buffer_size const?
/// Relay channel builder
pub fn relay<M>(buffer_size: usize) -> (InboundRelay<M>, OutboundRelay<M>) {
let (sender, receiver) = channel(buffer_size);
(
InboundRelay {
receiver,
_stats: (),
},
OutboundRelay { sender, _stats: () },
)
}
impl<M> InboundRelay<M> {
/// Receive a message from the relay connections
pub async fn recv(&mut self) -> Option<M> {
self.receiver.recv().await
}
}
impl<M> OutboundRelay<M> {
/// Send a message to the relay connection
pub async fn send(&mut self, message: M) -> Result<(), (RelayError, M)> {
self.sender
.send(message)
.await
.map_err(|e| (RelayError::Send, e.0))
}
}
impl<M> Clone for OutboundRelay<M> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
_stats: (),
}
}
}
impl<S: ServiceCore> Relay<S> {
pub fn new(overwatch_handle: OverwatchHandle) -> Self {
Self {
state: RelayState::Disconnected,
overwatch_handle,
}
}
#[instrument(skip(self), err(Debug))]
pub async fn connect(&mut self) -> Result<(), RelayError> {
if let RelayState::Disconnected = self.state {
let (reply, receiver) = oneshot::channel();
self.request_relay(reply).await;
self.handle_relay_response(receiver).await
} else {
Err(RelayError::AlreadyConnected)
}
}
#[instrument(skip(self), err(Debug))]
pub fn disconnect(&mut self) -> Result<(), RelayError> {
self.state = RelayState::Disconnected;
Ok(())
}
#[instrument(skip_all, err(Debug))]
pub async fn send(&mut self, message: S::Message) -> Result<(), RelayError> {
// TODO: we could make a retry system and/or add timeouts
if let RelayState::Connected(outbound_relay) = &mut self.state {
outbound_relay
.send(message)
.await
.map_err(|(e, _message)| e)
} else {
Err(RelayError::Disconnected)
}
}
async fn request_relay(&mut self, reply: oneshot::Sender<RelayResult>) {
let relay_command = OverwatchCommand::Relay(RelayCommand {
service_id: S::SERVICE_ID,
reply_channel: ReplyChannel(reply),
});
self.overwatch_handle.send(relay_command).await;
}
#[instrument(skip_all, err(Debug))]
async fn handle_relay_response(
&mut self,
receiver: oneshot::Receiver<RelayResult>,
) -> Result<(), RelayError> {
let response = receiver.await;
match response {
Ok(Ok(message)) => match message.downcast::<OutboundRelay<S::Message>>() {
Ok(channel) => {
self.state = RelayState::Connected(*channel);
Ok(())
}
Err(m) => Err(RelayError::InvalidMessage {
type_id: format!("{:?}", m.type_id()),
service_id: S::SERVICE_ID,
}),
},
Ok(Err(e)) => Err(e),
Err(e) => Err(RelayError::Receiver(Box::new(e))),
}
}
}

View File

@ -0,0 +1,90 @@
//std
//crates
use tokio::sync::watch::{channel, Receiver, Sender};
use tracing::{error, instrument};
//internal
/// Wrapper around [`tokio::sync::watch::Receiver`]
pub struct SettingsNotifier<S> {
notifier_channel: Receiver<S>,
}
impl<S: Clone> SettingsNotifier<S> {
pub fn new(notifier_channel: Receiver<S>) -> Self {
Self { notifier_channel }
}
/// Get latest settings, it is guaranteed that at least an initial value is present
/// This returns a cloned version of the referenced settings. It simplifies the API
/// at the expense of some efficiency.
// TODO: Maybe we can consider returning the Ref<> from the borrowing. But in doing would be
// be blocking the updating channel so this responsibility would be dumped into the end user
// of the method. Another option would be to spawn a task that updates a settings local value
// each time an updated settings is received. This could not be so easy to do, since it will
// need to hold a &mut to the holder (or needed to use a Cell/RefCell).
pub fn get_updated_settings(&mut self) -> S {
self.notifier_channel.borrow().clone()
}
}
/// Settings update notification sender
pub struct SettingsUpdater<S> {
sender: Sender<S>,
receiver: Receiver<S>,
}
impl<S> SettingsUpdater<S> {
pub fn new(settings: S) -> Self {
let (sender, receiver) = channel(settings);
Self { sender, receiver }
}
/// Send a new settings update notification to the watcher end
#[instrument(skip_all)]
pub fn update(&self, settings: S) {
self.sender.send(settings).unwrap_or_else(|_e| {
error!("Error sending settings update for service");
});
}
/// Get a new notifier channel, used to get latest settings changes updates
pub fn notifier(&self) -> SettingsNotifier<S> {
SettingsNotifier {
notifier_channel: self.receiver.clone(),
}
}
}
#[cfg(test)]
mod test {
use crate::services::settings::SettingsUpdater;
use std::collections::HashSet;
use std::time::Duration;
use tokio::time::sleep;
use tokio::time::timeout;
#[tokio::test]
async fn settings_updater_sequence() {
let updater = SettingsUpdater::new(10usize);
let mut notifier = updater.notifier();
let values = [10, 0usize];
let mut seq = HashSet::from(values);
let handle = tokio::spawn(timeout(Duration::from_secs(3), async move {
while !seq.is_empty() {
let new_value = notifier.get_updated_settings();
seq.remove(&new_value);
sleep(Duration::from_millis(50)).await;
}
true
}));
sleep(Duration::from_millis(100)).await;
for v in &values[1..] {
updater.update(*v);
sleep(Duration::from_millis(100)).await;
}
// all values updates have been seen
let success: Result<bool, _> = handle.await.unwrap();
assert!(success.unwrap());
}
}

View File

@ -0,0 +1,204 @@
// std
use std::marker::PhantomData;
use std::sync::Arc;
// crates
use async_trait::async_trait;
use futures::StreamExt;
use tokio::sync::watch::{channel, Receiver, Ref, Sender};
use tokio_stream::wrappers::WatchStream;
use tracing::error;
// internal
// TODO: Constrain this, probably with needed serialize/deserialize options.
/// Service state initialization traits
/// It defines what is needed for a service state to be initialized.
/// Need what set of settings information is required for it to be initialized [`ServiceState::Settings`]
/// which usually is bound to the service itself [`crate::services::ServiceData::Settings`]
pub trait ServiceState: Send + Sync + 'static {
/// Settings object that the state can be initialized from
type Settings;
/// Initialize a stage upon the provided settings
fn from_settings(settings: &Self::Settings) -> Self;
}
/// A state operator is an entity that can handle a state in a point of time
/// to perform any operation based on it.
#[async_trait]
pub trait StateOperator: Send {
/// The type of state that the operator can handle
type StateInput: ServiceState;
/// Operator initialization method. Can be implemented over some subset of settings
fn from_settings<Settings>(settings: Settings) -> Self;
/// Asynchronously perform an operation for a given state
async fn run(&mut self, state: Self::StateInput);
}
/// Operator that doesn't perform any operation upon state update
#[derive(Clone, Copy)]
pub struct NoOperator<StateInput>(PhantomData<StateInput>);
#[async_trait]
impl<StateInput: ServiceState> StateOperator for NoOperator<StateInput> {
type StateInput = StateInput;
fn from_settings<Settings>(_settings: Settings) -> Self {
NoOperator(PhantomData::default())
}
async fn run(&mut self, _state: Self::StateInput) {}
}
/// Empty state
#[derive(Clone, Copy)]
pub struct NoState<Settings>(PhantomData<Settings>);
impl<Settings: Send + Sync + 'static> ServiceState for NoState<Settings> {
type Settings = Settings;
fn from_settings(_settings: &Self::Settings) -> Self {
Self(Default::default())
}
}
/// Receiver part of the state handling mechanism.
/// A state handle watches a stream of incoming states and triggers the attached operator handling
/// method over it.
#[derive(Clone)]
pub struct StateHandle<S: ServiceState, Operator: StateOperator<StateInput = S>> {
watcher: StateWatcher<S>,
operator: Operator,
}
/// Sender part of the state handling mechanism.
/// Update the current state and notifies the [`StateHandle`].
#[derive(Clone)]
pub struct StateUpdater<S> {
sender: Arc<Sender<S>>,
}
/// Wrapper over [`tokio::sync::watch::Receiver`]
#[derive(Clone)]
pub struct StateWatcher<S> {
receiver: Receiver<S>,
}
impl<S: ServiceState> StateUpdater<S> {
/// Send a new state and notify the [`StateWatcher`]
pub fn update(&mut self, new_state: S) {
self.sender.send(new_state).unwrap_or_else(|_e| {
error!("Error updating state");
});
}
}
impl<S> StateWatcher<S>
where
S: ServiceState + Clone,
{
/// Get a copy of the most updated state
pub fn state_cloned(&self) -> S {
self.receiver.borrow().clone()
}
}
impl<S> StateWatcher<S>
where
S: ServiceState,
{
/// Get a [`Ref`](tokio::sync::watch::Ref) to the last state, this blocks incoming updates until
/// the `Ref` is dropped. Use with caution.
pub fn state_ref(&self) -> Ref<S> {
self.receiver.borrow()
}
}
impl<S, Operator> StateHandle<S, Operator>
where
S: ServiceState + Clone,
Operator: StateOperator<StateInput = S>,
{
pub fn new(initial_state: S, operator: Operator) -> (Self, StateUpdater<S>) {
let (sender, receiver) = channel(initial_state);
let watcher = StateWatcher { receiver };
let updater = StateUpdater {
sender: Arc::new(sender),
};
(Self { watcher, operator }, updater)
}
/// Wait for new state updates and run the operator handling method
pub async fn run(self) {
let Self {
watcher,
mut operator,
} = self;
let mut state_stream = WatchStream::new(watcher.receiver);
while let Some(state) = state_stream.next().await {
operator.run(state).await;
}
}
}
#[cfg(test)]
mod test {
use crate::services::state::{ServiceState, StateHandle, StateOperator, StateUpdater};
use async_trait::async_trait;
use std::time::Duration;
use tokio::io;
use tokio::io::AsyncWriteExt;
use tokio::time::sleep;
#[derive(Clone)]
struct UsizeCounter(usize);
impl ServiceState for UsizeCounter {
type Settings = ();
fn from_settings(_settings: &Self::Settings) -> Self {
Self(0)
}
}
struct PanicOnGreaterThanTen;
#[async_trait]
impl StateOperator for PanicOnGreaterThanTen {
type StateInput = UsizeCounter;
fn from_settings<Settings>(_settings: Settings) -> Self {
Self
}
async fn run(&mut self, state: Self::StateInput) {
let mut stdout = io::stdout();
let UsizeCounter(value) = state;
stdout
.write_all(format!("{value}\n").as_bytes())
.await
.expect("stop Output wrote");
assert!(value < 10);
}
}
#[tokio::test]
#[should_panic]
async fn state_stream_collects() {
let (handle, mut updater): (
StateHandle<UsizeCounter, PanicOnGreaterThanTen>,
StateUpdater<UsizeCounter>,
) = StateHandle::new(
UsizeCounter::from_settings(&()),
PanicOnGreaterThanTen::from_settings(()),
);
tokio::task::spawn(async move {
sleep(Duration::from_millis(50)).await;
for i in 0..15 {
updater.update(UsizeCounter(i));
sleep(Duration::from_millis(50)).await;
}
});
handle.run().await;
}
}

View File

@ -0,0 +1,35 @@
use crate::services::ServiceId;
pub const fn unique_ids(to_check: &[ServiceId]) -> bool {
if to_check.is_empty() {
return true;
}
let mut i: usize = 0;
let mut j: usize = 1;
while i < to_check.len() - 1 {
if const_str::equal!(to_check[i], to_check[j]) {
return false;
}
j += 1;
if j >= to_check.len() {
i += 1;
j = i + 1;
}
}
true
}
#[cfg(test)]
mod test {
use crate::utils::const_checks::unique_ids;
#[test]
fn test_unique_ids() {
// this shouldn't even compile if checks fails
const _: () = assert!(unique_ids(&["A", "B"]));
const _: () = assert!(!unique_ids(&["A", "A"]));
const _: () = assert!(unique_ids(&[]));
const _: () = assert!(unique_ids(&["A"]));
}
}

View File

@ -0,0 +1,2 @@
pub mod const_checks;
pub mod runtime;

View File

@ -0,0 +1,9 @@
use crate::overwatch::OVERWATCH_THREAD_NAME;
pub fn default_multithread_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name(OVERWATCH_THREAD_NAME)
.build()
.expect("Async runtime to build properly")
}

View File

@ -0,0 +1,116 @@
use async_trait::async_trait;
use futures::future::select;
use overwatch::overwatch::OverwatchRunner;
use overwatch::services::handle::{ServiceHandle, ServiceStateHandle};
use overwatch::services::relay::RelayMessage;
use overwatch::services::state::{NoOperator, NoState};
use overwatch::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_derive::Services;
use std::time::Duration;
use tokio::time::sleep;
pub struct PrintService {
state: ServiceStateHandle<Self>,
}
#[derive(Clone, Debug)]
pub struct PrintServiceMessage(String);
impl RelayMessage for PrintServiceMessage {}
impl ServiceData for PrintService {
const SERVICE_ID: ServiceId = "FooService";
type Settings = ();
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = PrintServiceMessage;
}
#[async_trait]
impl ServiceCore for PrintService {
fn init(state: ServiceStateHandle<Self>) -> Self {
Self { state }
}
async fn run(mut self) {
use tokio::io::{self, AsyncWriteExt};
let Self {
state: ServiceStateHandle {
mut inbound_relay, ..
},
} = self;
let print = async move {
let mut stdout = io::stdout();
while let Some(message) = inbound_relay.recv().await {
match message.0.as_ref() {
"stop" => {
stdout
.write_all(b"Printing service stopping\n")
.await
.expect("stop Output wrote");
break;
}
m => {
stdout
.write_all(format!("{m}\n").as_bytes())
.await
.expect("Message output wrote");
}
}
}
};
let idle = async move {
let mut stdout = io::stdout();
loop {
stdout
.write_all(b"Waiting for print process to finish...\n")
.await
.expect("Message output wrote");
sleep(Duration::from_millis(50)).await;
}
};
select(Box::pin(print), Box::pin(idle)).await;
}
}
#[derive(Services)]
struct TestApp {
print_service: ServiceHandle<PrintService>,
}
#[test]
fn derive_print_service() {
let settings: TestAppServiceSettings = TestAppServiceSettings { print_service: () };
let overwatch = OverwatchRunner::<TestApp>::run(settings, None);
let mut handle = overwatch.handle().clone();
let mut print_service_relay = handle.relay::<PrintService>();
overwatch.spawn(async move {
print_service_relay
.connect()
.await
.expect("A connection to the print service is established");
for _ in 0..3 {
print_service_relay
.send(PrintServiceMessage("Hey oh let's go!".to_string()))
.await
.expect("Message is sent");
}
sleep(Duration::from_millis(50)).await;
print_service_relay
.send(PrintServiceMessage("stop".to_string()))
.await
.expect("stop message to be sent");
});
overwatch.spawn(async move {
sleep(Duration::from_secs(1)).await;
handle.shutdown().await;
});
overwatch.wait_finished();
}

View File

@ -0,0 +1,85 @@
use async_trait::async_trait;
use overwatch::overwatch::OverwatchRunner;
use overwatch::services::handle::{ServiceHandle, ServiceStateHandle};
use overwatch::services::relay::RelayMessage;
use overwatch::services::state::{NoOperator, NoState};
use overwatch::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_derive::Services;
use std::time::Duration;
use tokio::time::sleep;
pub struct SettingsService {
state: ServiceStateHandle<Self>,
}
type SettingsServiceSettings = String;
#[derive(Clone, Debug)]
pub struct SettingsMsg;
impl RelayMessage for SettingsMsg {}
impl ServiceData for SettingsService {
const SERVICE_ID: ServiceId = "FooService";
type Settings = SettingsServiceSettings;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = SettingsMsg;
}
#[async_trait]
impl ServiceCore for SettingsService {
fn init(state: ServiceStateHandle<Self>) -> Self {
Self { state }
}
async fn run(mut self) {
let Self {
state:
ServiceStateHandle {
mut settings_reader,
..
},
} = self;
let print = async move {
let mut asserted = false;
for _ in 0..10 {
let new_settings = settings_reader.get_updated_settings();
if new_settings.as_str() == "New settings" {
asserted = true;
}
sleep(Duration::from_millis(50)).await;
}
// TODO: when [this](https://github.com/ockam-network/ockam/issues/2479)
// or (https://github.com/tokio-rs/tokio/issues/2002) lands
// update so this panic is not just a print and the test get actually aborted
assert!(asserted);
};
print.await;
}
}
#[derive(Services)]
struct TestApp {
settings_service: ServiceHandle<SettingsService>,
}
#[test]
fn settings_service_update_settings() {
let mut settings: TestAppServiceSettings = TestAppServiceSettings {
settings_service: SettingsServiceSettings::default(),
};
let overwatch = OverwatchRunner::<TestApp>::run(settings.clone(), None);
let handle = overwatch.handle().clone();
let mut handle2 = handle.clone();
settings.settings_service = "New settings".to_string();
overwatch.spawn(async move { handle.clone().update_settings::<TestApp>(settings).await });
overwatch.spawn(async move {
sleep(Duration::from_secs(1)).await;
handle2.shutdown().await;
});
overwatch.wait_finished();
}

View File

@ -0,0 +1,101 @@
use async_trait::async_trait;
use overwatch::overwatch::OverwatchRunner;
use overwatch::services::handle::{ServiceHandle, ServiceStateHandle};
use overwatch::services::relay::RelayMessage;
use overwatch::services::state::{ServiceState, StateOperator};
use overwatch::services::{ServiceCore, ServiceData, ServiceId};
use overwatch_derive::Services;
use std::time::Duration;
use tokio::io::{self, AsyncWriteExt};
use tokio::time::sleep;
pub struct UpdateStateService {
state: ServiceStateHandle<Self>,
}
#[derive(Clone, Debug)]
pub struct UpdateStateServiceMessage(String);
impl RelayMessage for UpdateStateServiceMessage {}
#[derive(Clone)]
pub struct CounterState {
value: usize,
}
impl ServiceState for CounterState {
type Settings = ();
fn from_settings(_settings: &Self::Settings) -> Self {
Self { value: 0 }
}
}
#[derive(Clone)]
pub struct CounterStateOperator;
#[async_trait]
impl StateOperator for CounterStateOperator {
type StateInput = CounterState;
fn from_settings<Settings>(_settings: Settings) -> Self {
CounterStateOperator
}
async fn run(&mut self, state: Self::StateInput) {
let value = state.value;
let mut stdout = io::stdout();
stdout
.write_all(format!("Updated state value received: {value}\n").as_bytes())
.await
.expect("stop Output wrote");
assert!(value < 10);
}
}
impl ServiceData for UpdateStateService {
const SERVICE_ID: ServiceId = "FooService";
type Settings = ();
type State = CounterState;
type StateOperator = CounterStateOperator;
type Message = UpdateStateServiceMessage;
}
#[async_trait]
impl ServiceCore for UpdateStateService {
fn init(state: ServiceStateHandle<Self>) -> Self {
Self { state }
}
async fn run(mut self) {
let Self {
state: ServiceStateHandle {
mut state_updater, ..
},
} = self;
for value in 0..10 {
state_updater.update(CounterState { value });
sleep(Duration::from_millis(50)).await;
}
}
}
#[derive(Services)]
struct TestApp {
update_state_service: ServiceHandle<UpdateStateService>,
}
#[test]
fn state_update_service() {
let settings: TestAppServiceSettings = TestAppServiceSettings {
update_state_service: (),
};
let overwatch = OverwatchRunner::<TestApp>::run(settings, None);
let mut handle = overwatch.handle().clone();
overwatch.spawn(async move {
sleep(Duration::from_secs(1)).await;
handle.shutdown().await;
});
overwatch.wait_finished();
}