add cancellation support to scheduled events

This commit is contained in:
gmega 2023-08-17 19:16:03 -03:00
parent f4f265a90e
commit 1673dd806c
8 changed files with 62 additions and 27 deletions

View File

@ -9,15 +9,15 @@ export EventDrivenEngine
type type
AwaitableHandle* = object of RootObj AwaitableHandle* = object of RootObj
schedulable: SchedulableEvent schedulable*: SchedulableEvent
engine: EventDrivenEngine engine: EventDrivenEngine
proc current_time*(self: EventDrivenEngine): uint64 {.inline.} = self.current_time proc currentTime*(self: EventDrivenEngine): uint64 {.inline.} = self.currentTime
proc schedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): void = proc schedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): void =
if schedulable.time < self.current_time: if schedulable.time < self.currentTime:
raise (ref Defect)( raise (ref Defect)(
msg: fmt"Cannot schedule an event in the past ({schedulable.time}) < ({self.current_time})") msg: fmt"Cannot schedule an event in the past ({schedulable.time}) < ({self.currentTime})")
self.queue.push(schedulable) self.queue.push(schedulable)
proc awaitableSchedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): AwaitableHandle = proc awaitableSchedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): AwaitableHandle =
@ -29,17 +29,19 @@ proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine, schedulables: se
self.schedule(schedulable) self.schedule(schedulable)
proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] = proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] =
if len(self.queue) == 0:
return none(SchedulableEvent)
let schedulable = self.queue.pop() while len(self.queue) > 0:
self.current_time = schedulable.time let schedulable = self.queue.pop()
schedulable.atScheduledTime(engine = self) self.currentTime = schedulable.time
some(schedulable) if not schedulable.cancelled:
schedulable.atScheduledTime(engine = self)
return some(schedulable)
return none(SchedulableEvent)
proc runUntil*(self: EventDrivenEngine, until: uint64): void = proc runUntil*(self: EventDrivenEngine, until: uint64): void =
while self.nextStep().isSome and self.current_time < until: while self.nextStep().isSome and self.currentTime < until:
discard discard
proc run*(self: EventDrivenEngine): void = proc run*(self: EventDrivenEngine): void =

View File

@ -13,6 +13,7 @@ export Network
type type
ScheduledMessage = ref object of SchedulableEvent ScheduledMessage = ref object of SchedulableEvent
network: Network
message: Message message: Message
proc new*( proc new*(
@ -41,10 +42,11 @@ proc send*(self: Network, message: Message,
self.engine.awaitableSchedule( self.engine.awaitableSchedule(
ScheduledMessage( ScheduledMessage(
time: self.engine.current_time + delay, time: self.engine.currentTime + delay,
message: message message: message,
network: self
) )
) )
method atScheduledTime*(self: ScheduledMessage, engine: EventDrivenEngine) = method atScheduledTime*(self: ScheduledMessage, engine: EventDrivenEngine) =
self.message.receiver.deliver(self.message) self.message.receiver.deliver(self.message, engine, self.network)

View File

@ -4,10 +4,12 @@ import std/random
import ./types import ./types
import ./protocol import ./protocol
import ./eventdrivenengine
export options export options
export tables export tables
export protocol export protocol
export eventdrivenengine
export Peer export Peer
proc getProtocol*(self: Peer, protocolId: string): Option[Protocol] = proc getProtocol*(self: Peer, protocolId: string): Option[Protocol] =
@ -16,9 +18,9 @@ proc getProtocol*(self: Peer, protocolId: string): Option[Protocol] =
none(Protocol) none(Protocol)
proc deliver*(self: Peer, message: Message): void = proc deliver*(self: Peer, message: Message, engine: EventDrivenEngine, network: Network): void =
self.getProtocol(message.messageType).map( self.getProtocol(message.messageType).map(
proc (p: Protocol): void = p.deliver(message)) proc (p: Protocol): void = p.deliver(message, engine, network))
proc initPeer(self: Peer, protocols: seq[Protocol]): Peer = proc initPeer(self: Peer, protocols: seq[Protocol]): Peer =
# XXX integer indexes or an enum would be better, but this is easier # XXX integer indexes or an enum would be better, but this is easier

View File

@ -1,12 +1,21 @@
import ./types import ./types
import ./message import ./message
import ./network
import ./eventdrivenengine
export message export message
export network
export eventdrivenengine
export Protocol export Protocol
method uncheckedDeliver(self: Protocol, message: Message): void {.base.} = method uncheckedDeliver(
self: Protocol,
message: Message,
engine: EventDrivenEngine,
network: Network
): void {.base.} =
raise newException(CatchableError, "Method without implementation override") raise newException(CatchableError, "Method without implementation override")
proc deliver*(self: Protocol, message: Message): void = proc deliver*(self: Protocol, message: Message, engine: EventDrivenEngine, network: Network): void =
assert(self.messageType == message.messageType) assert(self.messageType == message.messageType)
self.uncheckedDeliver(message) self.uncheckedDeliver(message, engine, network)

View File

@ -5,7 +5,10 @@ export SchedulableEvent
func `<`*(self: SchedulableEvent, other: SchedulableEvent): bool = func `<`*(self: SchedulableEvent, other: SchedulableEvent): bool =
return self.time < other.time return self.time < other.time
proc `time=`*(self: SchedulableEvent, value: float): void {.error: "Cannot assign to `time` property of `SchedulableEvent`.".} proc cancel*(self: SchedulableEvent) =
## Cancels this event.
##
self.cancelled = true
method atScheduledTime*(self: SchedulableEvent, engine: EventDrivenEngine): void {.base.} = method atScheduledTime*(self: SchedulableEvent, engine: EventDrivenEngine): void {.base.} =
## Callback invoked by the event engine indicating that this event is due for execution. By ## Callback invoked by the event engine indicating that this event is due for execution. By

View File

@ -2,9 +2,11 @@ import std/heapqueue
import std/tables import std/tables
import std/sets import std/sets
import std/options import std/options
import std/random
export heapqueue export heapqueue
export option export option
export random
type type
SchedulableEvent* = ref object of RootObj SchedulableEvent* = ref object of RootObj
@ -12,13 +14,14 @@ type
## at a well-defined point in simuliation time. ## at a well-defined point in simuliation time.
## ##
time*: uint64 time*: uint64
cancelled*: bool
type type
EventDrivenEngine* = ref object of RootObj EventDrivenEngine* = ref object of RootObj
## An `EventDrivenEngine` is a simple simulation engine that executes events in the order of their ## An `EventDrivenEngine` is a simple simulation engine that executes events in the order of their
## scheduled time. ## scheduled time.
## ##
current_time*: uint64 currentTime*: uint64
queue*: HeapQueue[SchedulableEvent] queue*: HeapQueue[SchedulableEvent]
type type

View File

@ -21,7 +21,7 @@ suite "event driven engine tests":
for time in times.sorted: for time in times.sorted:
let result = engine.nextStep().get() let result = engine.nextStep().get()
check(result.time == engine.current_time) check(result.time == engine.currentTime)
check(engine.nextStep().isNone) check(engine.nextStep().isNone)
@ -32,14 +32,13 @@ suite "event driven engine tests":
let engine = EventDrivenEngine() let engine = EventDrivenEngine()
let handles = schedulables.map(schedulable => engine.awaitableSchedule(schedulable)) let handles = schedulables.map(schedulable => engine.awaitableSchedule(schedulable))
check(engine.current_time == 0) check(engine.currentTime == 0)
handles[4].doAwait() handles[4].doAwait()
check(engine.current_time == 5) check(engine.currentTime == 5)
handles[7].doAwait() handles[7].doAwait()
check(engine.current_time == 8) check(engine.currentTime == 8)
test "should not allow schedulables to be scheduled in the past": test "should not allow schedulables to be scheduled in the past":
let e1 = SchedulableEvent(time: 10) let e1 = SchedulableEvent(time: 10)
@ -52,6 +51,20 @@ suite "event driven engine tests":
expect(Defect): expect(Defect):
engine.schedule(e2) engine.schedule(e2)
test "should allow clients to cancel scheduled events":
let e1 = SchedulableEvent(time: 8)
let e2 = SchedulableEvent(time: 10)
let engine = EventDrivenEngine()
let e1Handle = engine.awaitableSchedule(e1)
let e2Handle = engine.awaitableSchedule(e2)
e1Handle.schedulable.cancel()
e2Handle.doAwait()
check(engine.currentTime == 10)

View File

@ -10,7 +10,8 @@ type
FakeProtocol = ref object of Protocol FakeProtocol = ref object of Protocol
received: bool received: bool
method uncheckedDeliver(self: FakeProtocol, message: Message) = method uncheckedDeliver(self: FakeProtocol, message: Message,
engine: EventDrivenEngine, network: Network) =
self.received = true self.received = true
proc getFakeProtocol(peer: Peer, protocolId: string): FakeProtocol = proc getFakeProtocol(peer: Peer, protocolId: string): FakeProtocol =