From 59395d9bbd84d1372573b4475736eea3c236dd02 Mon Sep 17 00:00:00 2001 From: Mark Spanbroek Date: Tue, 14 Feb 2023 13:41:53 +0100 Subject: [PATCH] [statemachine] adds a statemachine for async workflows Allows events to be scheduled synchronously. See https://github.com/status-im/nim-codex/pull/344 Co-Authored-By: Ben Bierens Co-Authored-By: Eric Mastro --- codex/utils/asyncstatemachine.nim | 54 +++++++++++++ tests/codex/testutils.nim | 1 + tests/codex/utils/testasyncstatemachine.nim | 84 +++++++++++++++++++++ 3 files changed, 139 insertions(+) create mode 100644 codex/utils/asyncstatemachine.nim create mode 100644 tests/codex/utils/testasyncstatemachine.nim diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim new file mode 100644 index 00000000..ec82b9c0 --- /dev/null +++ b/codex/utils/asyncstatemachine.nim @@ -0,0 +1,54 @@ +import pkg/questionable +import pkg/chronos +import pkg/upraises + +type + Machine* = ref object of RootObj + state: State + running: Future[void] + scheduled: AsyncQueue[Event] + scheduling: Future[void] + State* = ref object of RootObj + Event = proc(state: State): ?State {.gcsafe, upraises:[].} + +proc transition(_: type Event, previous, next: State): Event = + return proc (state: State): ?State = + if state == previous: + return some next + +proc schedule*(machine: Machine, event: Event) = + machine.scheduled.putNoWait(event) + +method run*(state: State): Future[?State] {.base, upraises:[].} = + discard + +proc run(machine: Machine, state: State) {.async.} = + try: + if next =? await state.run(): + machine.schedule(Event.transition(state, next)) + except CancelledError: + discard + +proc scheduler(machine: Machine) {.async.} = + try: + while true: + let event = await machine.scheduled.get() + if next =? event(machine.state): + if not machine.running.isNil: + await machine.running.cancelAndWait() + machine.state = next + machine.running = machine.run(machine.state) + asyncSpawn machine.running + except CancelledError: + discard + +proc start*(machine: Machine, initialState: State) = + machine.scheduling = machine.scheduler() + machine.schedule(Event.transition(machine.state, initialState)) + +proc stop*(machine: Machine) = + machine.scheduling.cancel() + machine.running.cancel() + +proc new*(_: type Machine): Machine = + Machine(scheduled: newAsyncQueue[Event]()) diff --git a/tests/codex/testutils.nim b/tests/codex/testutils.nim index e174d07f..72a6bcc7 100644 --- a/tests/codex/testutils.nim +++ b/tests/codex/testutils.nim @@ -2,5 +2,6 @@ import ./utils/teststatemachine import ./utils/teststatemachineasync import ./utils/testoptionalcast import ./utils/testkeyutils +import ./utils/testasyncstatemachine {.warning[UnusedImport]: off.} diff --git a/tests/codex/utils/testasyncstatemachine.nim b/tests/codex/utils/testasyncstatemachine.nim new file mode 100644 index 00000000..d7331822 --- /dev/null +++ b/tests/codex/utils/testasyncstatemachine.nim @@ -0,0 +1,84 @@ +import pkg/asynctest +import pkg/questionable +import pkg/chronos +import pkg/upraises +import codex/utils/asyncstatemachine +import ../helpers/eventually + +type + State1 = ref object of State + State2 = ref object of State + State3 = ref object of State + +var runs, cancellations = [0, 0, 0] + +method onMoveToNextStateEvent*(state: State): ?State {.base, upraises:[].} = + discard + +method run(state: State1): Future[?State] {.async.} = + inc runs[0] + return some State(State2.new()) + +method run(state: State2): Future[?State] {.async.} = + inc runs[1] + try: + await sleepAsync(1.hours) + except CancelledError: + inc cancellations[1] + raise + +method onMoveToNextStateEvent(state: State2): ?State = + some State(State3.new()) + +method run(state: State3): Future[?State] {.async.} = + inc runs[2] + +method onMoveToNextStateEvent(state: State3): ?State = + some State(State1.new()) + +suite "async state machines": + var machine: Machine + var state1, state2: State + + proc moveToNextStateEvent(state: State): ?State = + state.onMoveToNextStateEvent() + + setup: + runs = [0, 0, 0] + cancellations = [0, 0, 0] + machine = Machine.new() + state1 = State1.new() + state2 = State2.new() + + test "should call run on start state": + machine.start(state1) + check eventually runs[0] == 1 + + test "moves to next state when run completes": + machine.start(state1) + check eventually runs == [1, 1, 0] + + test "state2 moves to state3 on event": + machine.start(state2) + machine.schedule(moveToNextStateEvent) + check eventually runs == [0, 1, 1] + + test "state transition will cancel the running state": + machine.start(state2) + machine.schedule(moveToNextStateEvent) + check eventually cancellations == [0, 1, 0] + + test "scheduled events are handled one after the other": + machine.start(state2) + machine.schedule(moveToNextStateEvent) + machine.schedule(moveToNextStateEvent) + check eventually runs == [1, 2, 1] + + test "stops scheduling and current state": + machine.start(state2) + await sleepAsync(1.millis) + machine.stop() + machine.schedule(moveToNextStateEvent) + await sleepAsync(1.millis) + check runs == [0, 1, 0] + check cancellations == [0, 1, 0]