[statemachine] adds a statemachine for async workflows
Allows events to be scheduled synchronously. See https://github.com/status-im/nim-codex/pull/344 Co-Authored-By: Ben Bierens <thatbenbierens@gmail.com> Co-Authored-By: Eric Mastro <eric.mastro@gmail.com>
This commit is contained in:
parent
47da5c1625
commit
59395d9bbd
|
@ -0,0 +1,54 @@
|
|||
import pkg/questionable
|
||||
import pkg/chronos
|
||||
import pkg/upraises
|
||||
|
||||
type
|
||||
Machine* = ref object of RootObj
|
||||
state: State
|
||||
running: Future[void]
|
||||
scheduled: AsyncQueue[Event]
|
||||
scheduling: Future[void]
|
||||
State* = ref object of RootObj
|
||||
Event = proc(state: State): ?State {.gcsafe, upraises:[].}
|
||||
|
||||
proc transition(_: type Event, previous, next: State): Event =
|
||||
return proc (state: State): ?State =
|
||||
if state == previous:
|
||||
return some next
|
||||
|
||||
proc schedule*(machine: Machine, event: Event) =
|
||||
machine.scheduled.putNoWait(event)
|
||||
|
||||
method run*(state: State): Future[?State] {.base, upraises:[].} =
|
||||
discard
|
||||
|
||||
proc run(machine: Machine, state: State) {.async.} =
|
||||
try:
|
||||
if next =? await state.run():
|
||||
machine.schedule(Event.transition(state, next))
|
||||
except CancelledError:
|
||||
discard
|
||||
|
||||
proc scheduler(machine: Machine) {.async.} =
|
||||
try:
|
||||
while true:
|
||||
let event = await machine.scheduled.get()
|
||||
if next =? event(machine.state):
|
||||
if not machine.running.isNil:
|
||||
await machine.running.cancelAndWait()
|
||||
machine.state = next
|
||||
machine.running = machine.run(machine.state)
|
||||
asyncSpawn machine.running
|
||||
except CancelledError:
|
||||
discard
|
||||
|
||||
proc start*(machine: Machine, initialState: State) =
|
||||
machine.scheduling = machine.scheduler()
|
||||
machine.schedule(Event.transition(machine.state, initialState))
|
||||
|
||||
proc stop*(machine: Machine) =
|
||||
machine.scheduling.cancel()
|
||||
machine.running.cancel()
|
||||
|
||||
proc new*(_: type Machine): Machine =
|
||||
Machine(scheduled: newAsyncQueue[Event]())
|
|
@ -2,5 +2,6 @@ import ./utils/teststatemachine
|
|||
import ./utils/teststatemachineasync
|
||||
import ./utils/testoptionalcast
|
||||
import ./utils/testkeyutils
|
||||
import ./utils/testasyncstatemachine
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
import pkg/asynctest
|
||||
import pkg/questionable
|
||||
import pkg/chronos
|
||||
import pkg/upraises
|
||||
import codex/utils/asyncstatemachine
|
||||
import ../helpers/eventually
|
||||
|
||||
type
|
||||
State1 = ref object of State
|
||||
State2 = ref object of State
|
||||
State3 = ref object of State
|
||||
|
||||
var runs, cancellations = [0, 0, 0]
|
||||
|
||||
method onMoveToNextStateEvent*(state: State): ?State {.base, upraises:[].} =
|
||||
discard
|
||||
|
||||
method run(state: State1): Future[?State] {.async.} =
|
||||
inc runs[0]
|
||||
return some State(State2.new())
|
||||
|
||||
method run(state: State2): Future[?State] {.async.} =
|
||||
inc runs[1]
|
||||
try:
|
||||
await sleepAsync(1.hours)
|
||||
except CancelledError:
|
||||
inc cancellations[1]
|
||||
raise
|
||||
|
||||
method onMoveToNextStateEvent(state: State2): ?State =
|
||||
some State(State3.new())
|
||||
|
||||
method run(state: State3): Future[?State] {.async.} =
|
||||
inc runs[2]
|
||||
|
||||
method onMoveToNextStateEvent(state: State3): ?State =
|
||||
some State(State1.new())
|
||||
|
||||
suite "async state machines":
|
||||
var machine: Machine
|
||||
var state1, state2: State
|
||||
|
||||
proc moveToNextStateEvent(state: State): ?State =
|
||||
state.onMoveToNextStateEvent()
|
||||
|
||||
setup:
|
||||
runs = [0, 0, 0]
|
||||
cancellations = [0, 0, 0]
|
||||
machine = Machine.new()
|
||||
state1 = State1.new()
|
||||
state2 = State2.new()
|
||||
|
||||
test "should call run on start state":
|
||||
machine.start(state1)
|
||||
check eventually runs[0] == 1
|
||||
|
||||
test "moves to next state when run completes":
|
||||
machine.start(state1)
|
||||
check eventually runs == [1, 1, 0]
|
||||
|
||||
test "state2 moves to state3 on event":
|
||||
machine.start(state2)
|
||||
machine.schedule(moveToNextStateEvent)
|
||||
check eventually runs == [0, 1, 1]
|
||||
|
||||
test "state transition will cancel the running state":
|
||||
machine.start(state2)
|
||||
machine.schedule(moveToNextStateEvent)
|
||||
check eventually cancellations == [0, 1, 0]
|
||||
|
||||
test "scheduled events are handled one after the other":
|
||||
machine.start(state2)
|
||||
machine.schedule(moveToNextStateEvent)
|
||||
machine.schedule(moveToNextStateEvent)
|
||||
check eventually runs == [1, 2, 1]
|
||||
|
||||
test "stops scheduling and current state":
|
||||
machine.start(state2)
|
||||
await sleepAsync(1.millis)
|
||||
machine.stop()
|
||||
machine.schedule(moveToNextStateEvent)
|
||||
await sleepAsync(1.millis)
|
||||
check runs == [0, 1, 0]
|
||||
check cancellations == [0, 1, 0]
|
Loading…
Reference in New Issue