From 1673dd806cc1df02f306101e438cd7dc0ce60887 Mon Sep 17 00:00:00 2001 From: gmega Date: Thu, 17 Aug 2023 19:16:03 -0300 Subject: [PATCH] add cancellation support to scheduled events --- swarmsim/engine/eventdrivenengine.nim | 24 +++++++++++---------- swarmsim/engine/network.nim | 8 ++++--- swarmsim/engine/peer.nim | 6 ++++-- swarmsim/engine/protocol.nim | 15 ++++++++++--- swarmsim/engine/schedulableevent.nim | 5 ++++- swarmsim/engine/types.nim | 5 ++++- tests/swarmsim/engine/eventdrivenengine.nim | 23 +++++++++++++++----- tests/swarmsim/engine/network.nim | 3 ++- 8 files changed, 62 insertions(+), 27 deletions(-) diff --git a/swarmsim/engine/eventdrivenengine.nim b/swarmsim/engine/eventdrivenengine.nim index 8b5757d..4797f33 100644 --- a/swarmsim/engine/eventdrivenengine.nim +++ b/swarmsim/engine/eventdrivenengine.nim @@ -9,15 +9,15 @@ export EventDrivenEngine type AwaitableHandle* = object of RootObj - schedulable: SchedulableEvent + schedulable*: SchedulableEvent engine: EventDrivenEngine -proc current_time*(self: EventDrivenEngine): uint64 {.inline.} = self.current_time +proc currentTime*(self: EventDrivenEngine): uint64 {.inline.} = self.currentTime proc schedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): void = - if schedulable.time < self.current_time: + if schedulable.time < self.currentTime: raise (ref Defect)( - msg: fmt"Cannot schedule an event in the past ({schedulable.time}) < ({self.current_time})") + msg: fmt"Cannot schedule an event in the past ({schedulable.time}) < ({self.currentTime})") self.queue.push(schedulable) proc awaitableSchedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): AwaitableHandle = @@ -29,17 +29,19 @@ proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine, schedulables: se self.schedule(schedulable) proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] = - if len(self.queue) == 0: - return none(SchedulableEvent) - let schedulable = self.queue.pop() - self.current_time = schedulable.time - schedulable.atScheduledTime(engine = self) + while len(self.queue) > 0: + let schedulable = self.queue.pop() + self.currentTime = schedulable.time - some(schedulable) + if not schedulable.cancelled: + schedulable.atScheduledTime(engine = self) + return some(schedulable) + + return none(SchedulableEvent) proc runUntil*(self: EventDrivenEngine, until: uint64): void = - while self.nextStep().isSome and self.current_time < until: + while self.nextStep().isSome and self.currentTime < until: discard proc run*(self: EventDrivenEngine): void = diff --git a/swarmsim/engine/network.nim b/swarmsim/engine/network.nim index 6b70ffe..949f438 100644 --- a/swarmsim/engine/network.nim +++ b/swarmsim/engine/network.nim @@ -13,6 +13,7 @@ export Network type ScheduledMessage = ref object of SchedulableEvent + network: Network message: Message proc new*( @@ -41,10 +42,11 @@ proc send*(self: Network, message: Message, self.engine.awaitableSchedule( ScheduledMessage( - time: self.engine.current_time + delay, - message: message + time: self.engine.currentTime + delay, + message: message, + network: self ) ) method atScheduledTime*(self: ScheduledMessage, engine: EventDrivenEngine) = - self.message.receiver.deliver(self.message) + self.message.receiver.deliver(self.message, engine, self.network) diff --git a/swarmsim/engine/peer.nim b/swarmsim/engine/peer.nim index 1df9dc2..4de5fdd 100644 --- a/swarmsim/engine/peer.nim +++ b/swarmsim/engine/peer.nim @@ -4,10 +4,12 @@ import std/random import ./types import ./protocol +import ./eventdrivenengine export options export tables export protocol +export eventdrivenengine export Peer proc getProtocol*(self: Peer, protocolId: string): Option[Protocol] = @@ -16,9 +18,9 @@ proc getProtocol*(self: Peer, protocolId: string): Option[Protocol] = none(Protocol) -proc deliver*(self: Peer, message: Message): void = +proc deliver*(self: Peer, message: Message, engine: EventDrivenEngine, network: Network): void = self.getProtocol(message.messageType).map( - proc (p: Protocol): void = p.deliver(message)) + proc (p: Protocol): void = p.deliver(message, engine, network)) proc initPeer(self: Peer, protocols: seq[Protocol]): Peer = # XXX integer indexes or an enum would be better, but this is easier diff --git a/swarmsim/engine/protocol.nim b/swarmsim/engine/protocol.nim index 1427cfe..91bcecb 100644 --- a/swarmsim/engine/protocol.nim +++ b/swarmsim/engine/protocol.nim @@ -1,12 +1,21 @@ import ./types import ./message +import ./network +import ./eventdrivenengine export message +export network +export eventdrivenengine export Protocol -method uncheckedDeliver(self: Protocol, message: Message): void {.base.} = +method uncheckedDeliver( + self: Protocol, + message: Message, + engine: EventDrivenEngine, + network: Network +): void {.base.} = raise newException(CatchableError, "Method without implementation override") -proc deliver*(self: Protocol, message: Message): void = +proc deliver*(self: Protocol, message: Message, engine: EventDrivenEngine, network: Network): void = assert(self.messageType == message.messageType) - self.uncheckedDeliver(message) + self.uncheckedDeliver(message, engine, network) diff --git a/swarmsim/engine/schedulableevent.nim b/swarmsim/engine/schedulableevent.nim index fea42b9..4026f5f 100644 --- a/swarmsim/engine/schedulableevent.nim +++ b/swarmsim/engine/schedulableevent.nim @@ -5,7 +5,10 @@ export SchedulableEvent func `<`*(self: SchedulableEvent, other: SchedulableEvent): bool = return self.time < other.time -proc `time=`*(self: SchedulableEvent, value: float): void {.error: "Cannot assign to `time` property of `SchedulableEvent`.".} +proc cancel*(self: SchedulableEvent) = + ## Cancels this event. + ## + self.cancelled = true method atScheduledTime*(self: SchedulableEvent, engine: EventDrivenEngine): void {.base.} = ## Callback invoked by the event engine indicating that this event is due for execution. By diff --git a/swarmsim/engine/types.nim b/swarmsim/engine/types.nim index 30b2723..e0131e7 100644 --- a/swarmsim/engine/types.nim +++ b/swarmsim/engine/types.nim @@ -2,9 +2,11 @@ import std/heapqueue import std/tables import std/sets import std/options +import std/random export heapqueue export option +export random type SchedulableEvent* = ref object of RootObj @@ -12,13 +14,14 @@ type ## at a well-defined point in simuliation time. ## time*: uint64 + cancelled*: bool type EventDrivenEngine* = ref object of RootObj ## An `EventDrivenEngine` is a simple simulation engine that executes events in the order of their ## scheduled time. ## - current_time*: uint64 + currentTime*: uint64 queue*: HeapQueue[SchedulableEvent] type diff --git a/tests/swarmsim/engine/eventdrivenengine.nim b/tests/swarmsim/engine/eventdrivenengine.nim index 1021eac..bfbd8da 100644 --- a/tests/swarmsim/engine/eventdrivenengine.nim +++ b/tests/swarmsim/engine/eventdrivenengine.nim @@ -21,7 +21,7 @@ suite "event driven engine tests": for time in times.sorted: let result = engine.nextStep().get() - check(result.time == engine.current_time) + check(result.time == engine.currentTime) check(engine.nextStep().isNone) @@ -32,14 +32,13 @@ suite "event driven engine tests": let engine = EventDrivenEngine() let handles = schedulables.map(schedulable => engine.awaitableSchedule(schedulable)) - check(engine.current_time == 0) + check(engine.currentTime == 0) handles[4].doAwait() - check(engine.current_time == 5) + check(engine.currentTime == 5) handles[7].doAwait() - check(engine.current_time == 8) - + check(engine.currentTime == 8) test "should not allow schedulables to be scheduled in the past": let e1 = SchedulableEvent(time: 10) @@ -52,6 +51,20 @@ suite "event driven engine tests": expect(Defect): engine.schedule(e2) + test "should allow clients to cancel scheduled events": + let e1 = SchedulableEvent(time: 8) + let e2 = SchedulableEvent(time: 10) + + let engine = EventDrivenEngine() + + let e1Handle = engine.awaitableSchedule(e1) + let e2Handle = engine.awaitableSchedule(e2) + + e1Handle.schedulable.cancel() + e2Handle.doAwait() + + check(engine.currentTime == 10) + diff --git a/tests/swarmsim/engine/network.nim b/tests/swarmsim/engine/network.nim index 6f83989..8e65ba0 100644 --- a/tests/swarmsim/engine/network.nim +++ b/tests/swarmsim/engine/network.nim @@ -10,7 +10,8 @@ type FakeProtocol = ref object of Protocol received: bool -method uncheckedDeliver(self: FakeProtocol, message: Message) = +method uncheckedDeliver(self: FakeProtocol, message: Message, + engine: EventDrivenEngine, network: Network) = self.received = true proc getFakeProtocol(peer: Peer, protocolId: string): FakeProtocol =