add basic simulator abstractions + tests

This commit is contained in:
gmega 2023-08-17 10:56:08 -03:00
parent 463d7d1926
commit 7dde9f3f7f
16 changed files with 380 additions and 73 deletions

View File

@ -0,0 +1,14 @@
import std/random
import std/math
type Distribution = proc(): float
proc unitUniform(): float =
## Uniform distribution on [0, 1]. Used as a building block for inverse transform
## samplers (https://en.wikipedia.org/wiki/Inverse_transform_sampling), as well as
## for the scaled uniform distribution.
rand(1.float)
proc Exp*(lambda: float, unitUniform = unitUniform): Distribution =
## Returns an exponential `Distribution` with parameter lambda.
proc(): float = -ln(1 - unitUniform()) / lambda

View File

@ -0,0 +1,50 @@
import std/options
import std/strformat
import ./types
import ./schedulableevent
type
AwaitableHandle = object of RootObj
schedulable: SchedulableEvent
engine: EventDrivenEngine
proc current_time*(self: EventDrivenEngine): uint64 {.inline.} = self.current_time
proc schedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): void =
if schedulable.time < self.current_time:
raise (ref Defect)(
msg: fmt"Cannot schedule an event in the past ({schedulable.time}) < ({self.current_time})")
self.queue.push(schedulable)
proc awaitableSchedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): AwaitableHandle =
self.schedule(schedulable)
AwaitableHandle(schedulable: schedulable, engine: self)
proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine, schedulables: seq[T]): void =
for schedulable in schedulables:
self.schedule(schedulable)
proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] =
if len(self.queue) == 0:
return none(SchedulableEvent)
let schedulable = self.queue.pop()
self.current_time = schedulable.time
schedulable.atScheduledTime(engine = self)
some(schedulable)
proc runUntil*(self: EventDrivenEngine, until: uint64): void =
while self.nextStep().isSome and self.current_time < until:
discard
proc run*(self: EventDrivenEngine): void =
self.runUntil(high(uint64))
proc doAwait*(self: AwaitableHandle): void =
self.engine.runUntil(self.schedulable.time)
export EventDrivenEngine
export AwaitableHandle
export options

View File

@ -0,0 +1,9 @@
import std/options
import ./types
proc new*(T: type Message, sender: Option[Peer] = none(Peer), receiver: Peer, messageType: string): Message =
Message(sender: sender, receiver: receiver, messageType: messageType)
export Message
export options

View File

@ -0,0 +1,50 @@
import std/options
import std/sets
import ./types
import ./peer
import ./eventdrivenengine
type
ScheduledMessage = ref object of SchedulableEvent
message: Message
proc new*(
T: type Network,
engine: EventDrivenEngine,
defaultLinkDelay: uint64 = 0
): T =
return Network(
engine: engine,
defaultLinkDelay: defaultLinkDelay,
peers: HashSet[Peer]()
)
proc add*(self: Network, peer: Peer): void =
# TODO: this can be very slow if the array keeps being resized, but for
# now I won't care much.
self.peers.incl(peer)
proc remove*(self: Network, peer: Peer) =
self.peers.excl(peer)
proc send*(self: Network, message: Message,
linkDelay: Option[uint64] = none(uint64)): AwaitableHandle =
let delay = linkDelay.get(self.defaultLinkDelay)
self.engine.awaitableSchedule(
ScheduledMessage(
time: self.engine.current_time + delay,
message: message
)
)
method atScheduledTime*(self: ScheduledMessage, engine: EventDrivenEngine) =
self.message.receiver.deliver(self.message)
export Network
export peer
export eventdrivenengine
export options
export sets

46
swarmsim/engine/peer.nim Normal file
View File

@ -0,0 +1,46 @@
import std/tables
import std/options
import std/random
import ./types
import ./protocol
## TODO a "readonly" pragma could probably be my first macro
proc `peerId=`*(self: Peer, id: int): void {.error: "Cannot assign to `peerId` property of `Peer`.".}
proc `protocols=`*(self: Peer, value: Table[string, Protocol]): void {.error: "Cannot assign to `protocols` property of `Peer`.".}
proc `protocols`*(self: Peer): Table[string, Protocol] {.error: "Cannot read from `protocols` property of `Peer`.".}
proc getProtocol*(self: Peer, protocolId: string): Option[Protocol] =
if self.protocols.hasKey(protocolId):
return self.protocols[protocolId].some
none(Protocol)
proc deliver*(self: Peer, message: Message): void =
self.getProtocol(message.messageType).map(
proc (p: Protocol): void = p.deliver(message))
proc initPeer(self: Peer, protocols: seq[Protocol]): Peer =
# XXX integer indexes or an enum would be better, but this is easier
for protocol in protocols:
self.protocols[protocol.messageType] = protocol
self
proc hash*(self: Peer): int = self.peerId
proc new*(
T: type Peer,
protocols: seq[Protocol],
peerId: Option[int] = none(int)
): Peer =
# XXX I can't have put this in the init proc as that would mean allowing public
# write access to every field in Peer. Not sure how to solve this in nim.
let peerId = peerId.get(rand(high(int)))
initPeer(Peer(protocols: initTable[string, Protocol](), peerId: peerId), protocols)
export Peer
export options
export tables

View File

@ -0,0 +1,10 @@
import ./types
method uncheckedDeliver(self: Protocol, message: Message): void {.base.} =
raise newException(CatchableError, "Method without implementation override")
proc deliver*(self: Protocol, message: Message): void =
assert(self.messageType == message.messageType)
self.uncheckedDeliver(message)
export Protocol

View File

@ -1,8 +1,10 @@
import pkg/swarmsim/types
import ./types
func `<`*(self: SchedulableEvent, other: SchedulableEvent): bool =
return self.time < other.time
proc `time=`*(self: SchedulableEvent, value: float): void {.error: "Cannot assign to `time` property of `SchedulableEvent`.".}
method atScheduledTime*(self: SchedulableEvent, engine: EventDrivenEngine): void {.base.} =
## Callback invoked by the event engine indicating that this event is due for execution. By
## default, it does nothing.

49
swarmsim/engine/types.nim Normal file
View File

@ -0,0 +1,49 @@
import std/heapqueue
import std/tables
import std/sets
import std/options
type
SchedulableEvent* = ref object of RootObj
## A `SchedulableEvent` is an event that can be scheduled for execution in an `EventDrivenEngine`
## at a well-defined point in simuliation time.
##
time*: uint64
type
EventDrivenEngine* = ref object of RootObj
## An `EventDrivenEngine` is a simple simulation engine that executes events in the order of their
## scheduled time.
##
current_time*: uint64
queue*: HeapQueue[SchedulableEvent]
type
Protocol* = ref object of RootObj
## A `Protocol` defines a P2P protocol. It handles messages meant for it, keeps internal state,
## and may expose services to other `Protocol`s within the same `Peer`.
messageType*: string ## "Type" of the message accepted by this protocol.
type
Peer* = ref object of RootObj
peerId*: int
protocols*: Table[string, Protocol]
type
Message* = ref object of RootObj
## A `Message` is a piece of data that is sent over the network. Its meaning is typically
## protocol-specific.
messageType*: string
sender*: Option[Peer]
receiver*: Peer
type
Network* = ref object of RootObj
## A `Network` is a collection of `Peer`s that can communicate with each other.
##
engine*: EventDrivenEngine
defaultLinkDelay*: uint64
peers*: HashSet[Peer] # TODO: use an array
export heapqueue
export option

View File

@ -1,31 +0,0 @@
import std/options
import pkg/swarmsim/types
import pkg/swarmsim/schedulableevent
proc current_time*(self: EventDrivenEngine): uint64 {.inline.} = self.current_time
proc schedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): EventDrivenEngine =
self.queue.push(schedulable)
self
proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine, schedulables: seq[T]): void =
for schedulable in schedulables:
discard self.schedule(schedulable)
proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] =
if len(self.queue) == 0:
return none(SchedulableEvent)
let schedulable = self.queue.pop()
self.current_time = schedulable.time
schedulable.atScheduledTime(engine = self)
some(schedulable)
proc run*(self: EventDrivenEngine): void =
while self.nextStep().isSome:
discard
export EventDrivenEngine
export options

View File

@ -1,15 +0,0 @@
import std/heapqueue
type
SchedulableEvent* = ref object of RootObj
## A `SchedulableEvent` is an event that can be scheduled for execution in an `EventDrivenEngine`
## at a well-defined point in simuliation time.
##
time*: uint64
type
EventDrivenEngine* = ref object of RootObj
current_time*: uint64
queue*: HeapQueue[SchedulableEvent]
export heapqueue

View File

@ -1 +1,7 @@
import ./swarmsim/eventdrivenengine
import ./swarmsim/engine/eventdrivenengine
import ./swarmsim/engine/schedulableevent
import ./swarmsim/engine/network
import ./swarmsim/engine/peer
{.warning[UnusedImport]: off.}

View File

@ -0,0 +1,58 @@
import unittest
import sequtils
import sugar
import std/algorithm
import std/tables
import pkg/swarmsim/engine/schedulableevent
import pkg/swarmsim/engine/eventdrivenengine
suite "event driven engine tests":
test "should run schedulables at the right time":
let times = @[1, 10, 5].map(time => uint64(time))
let schedulables = times.map(time => SchedulableEvent(time: time))
let engine = EventDrivenEngine()
engine.scheduleAll(schedulables)
for time in times.sorted:
let result = engine.nextStep().get()
check(result.time == engine.current_time)
check(engine.nextStep().isNone)
test "should allow clients to wait until a scheduled event happens":
let times = @[1, 2, 3, 4, 5, 6, 7, 8]
let schedulables = times.map(time => SchedulableEvent(time: uint64(time)))
let engine = EventDrivenEngine()
let handles = schedulables.map(schedulable => engine.awaitableSchedule(schedulable))
check(engine.current_time == 0)
handles[4].doAwait()
check(engine.current_time == 5)
handles[7].doAwait()
check(engine.current_time == 8)
test "should not allow schedulables to be scheduled in the past":
let e1 = SchedulableEvent(time: 10)
let e2 = SchedulableEvent(time: 8)
let engine = EventDrivenEngine()
engine.schedule(e1)
discard engine.nextStep()
expect(Defect):
engine.schedule(e2)

View File

@ -0,0 +1,53 @@
import unittest
import pkg/swarmsim/engine/eventdrivenengine
import pkg/swarmsim/engine/network
import pkg/swarmsim/engine/peer
import pkg/swarmsim/engine/message
import pkg/swarmsim/engine/protocol
type
FakeProtocol = ref object of Protocol
received: bool
method uncheckedDeliver(self: FakeProtocol, message: Message) =
self.received = true
proc getFakeProtocol(peer: Peer, protocolId: string): FakeProtocol =
let protocol = peer.getProtocol(protocolId)
check(protocol.isSome)
return FakeProtocol(protocol.get())
suite "network":
test "should dispatch message to the correct protocol within a peer":
let engine = EventDrivenEngine()
var protocols: seq[Protocol] = newSeq[Protocol]()
protocols.add(FakeProtocol(messageType: "protocol1", received: false))
protocols.add(FakeProtocol(messageType: "protocol2", received: false))
let peer = Peer.new(protocols = protocols)
let network = Network.new(engine = engine, defaultLinkDelay = 20)
network.add(peer)
let m1 = Message.new(receiver = peer, messageType = "protocol1")
let m2 = Message.new(receiver = peer, messageType = "protocol2")
let message2handle = network.send(m2, linkDelay = uint64(10).some)
let message1handle = network.send(m1, linkDelay = uint64(5).some)
check(not peer.getFakeProtocol("protocol1").received)
check(not peer.getFakeProtocol("protocol2").received)
message1Handle.doAwait()
check(peer.getFakeProtocol("protocol1").received)
check(not peer.getFakeProtocol("protocol2").received)
message2Handle.doAwait()
check(peer.getFakeProtocol("protocol1").received)
check(peer.getFakeProtocol("protocol2").received)

View File

@ -0,0 +1,20 @@
import std/unittest
import std/sets
import pkg/swarmsim/engine/peer
suite "peer":
test "should allow inclusion and membership tests on a HashSet":
var peerSet = HashSet[Peer]()
let p1 = Peer.new(protocols = @[], peerId = 1.some)
let p2 = Peer.new(protocols = @[], peerId = 2.some)
peerSet.incl(p1)
check(peerSet.contains(p1))
check(not peerSet.contains(p2))
peerSet.excl(p1)
check(not peerSet.contains(p1))

View File

@ -0,0 +1,11 @@
import unittest
import pkg/swarmsim/engine/schedulableevent
suite "schedulable event":
test "should be ordered by time":
let e1 = SchedulableEvent(time: 1)
let e2 = SchedulableEvent(time: 3)
check(e1 < e2)
check(not (e2 < e1))

View File

@ -1,25 +0,0 @@
import unittest
import sequtils
import sugar
import std/algorithm
import pkg/swarmsim/schedulableevent
import pkg/swarmsim/eventdrivenengine
suite "event driven engine tests":
test "should run schedulables at the right time":
let times = @[1, 10, 5].map(time => uint64(time))
let schedulables = times.map(time => SchedulableEvent(time: time))
let engine = EventDrivenEngine()
engine.scheduleAll(schedulables)
for time in times.sorted:
let result = engine.nextStep().get()
check(result.time == engine.current_time)
check(engine.nextStep().isNone)