diff --git a/swarmsim/distributions.nim b/swarmsim/distributions.nim new file mode 100644 index 0000000..bf39343 --- /dev/null +++ b/swarmsim/distributions.nim @@ -0,0 +1,14 @@ +import std/random +import std/math + +type Distribution = proc(): float + +proc unitUniform(): float = + ## Uniform distribution on [0, 1]. Used as a building block for inverse transform + ## samplers (https://en.wikipedia.org/wiki/Inverse_transform_sampling), as well as + ## for the scaled uniform distribution. + rand(1.float) + +proc Exp*(lambda: float, unitUniform = unitUniform): Distribution = + ## Returns an exponential `Distribution` with parameter lambda. + proc(): float = -ln(1 - unitUniform()) / lambda diff --git a/swarmsim/engine/eventdrivenengine.nim b/swarmsim/engine/eventdrivenengine.nim new file mode 100644 index 0000000..1295f0c --- /dev/null +++ b/swarmsim/engine/eventdrivenengine.nim @@ -0,0 +1,50 @@ +import std/options +import std/strformat + +import ./types +import ./schedulableevent + +type + AwaitableHandle = object of RootObj + schedulable: SchedulableEvent + engine: EventDrivenEngine + +proc current_time*(self: EventDrivenEngine): uint64 {.inline.} = self.current_time + +proc schedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): void = + if schedulable.time < self.current_time: + raise (ref Defect)( + msg: fmt"Cannot schedule an event in the past ({schedulable.time}) < ({self.current_time})") + self.queue.push(schedulable) + +proc awaitableSchedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): AwaitableHandle = + self.schedule(schedulable) + AwaitableHandle(schedulable: schedulable, engine: self) + +proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine, schedulables: seq[T]): void = + for schedulable in schedulables: + self.schedule(schedulable) + +proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] = + if len(self.queue) == 0: + return none(SchedulableEvent) + + let schedulable = self.queue.pop() + self.current_time = schedulable.time + schedulable.atScheduledTime(engine = self) + + some(schedulable) + +proc runUntil*(self: EventDrivenEngine, until: uint64): void = + while self.nextStep().isSome and self.current_time < until: + discard + +proc run*(self: EventDrivenEngine): void = + self.runUntil(high(uint64)) + +proc doAwait*(self: AwaitableHandle): void = + self.engine.runUntil(self.schedulable.time) + +export EventDrivenEngine +export AwaitableHandle +export options diff --git a/swarmsim/engine/message.nim b/swarmsim/engine/message.nim new file mode 100644 index 0000000..43f52b5 --- /dev/null +++ b/swarmsim/engine/message.nim @@ -0,0 +1,9 @@ +import std/options + +import ./types + +proc new*(T: type Message, sender: Option[Peer] = none(Peer), receiver: Peer, messageType: string): Message = + Message(sender: sender, receiver: receiver, messageType: messageType) + +export Message +export options diff --git a/swarmsim/engine/network.nim b/swarmsim/engine/network.nim new file mode 100644 index 0000000..d113737 --- /dev/null +++ b/swarmsim/engine/network.nim @@ -0,0 +1,50 @@ +import std/options +import std/sets + +import ./types +import ./peer +import ./eventdrivenengine + +type + ScheduledMessage = ref object of SchedulableEvent + message: Message + +proc new*( + T: type Network, + engine: EventDrivenEngine, + defaultLinkDelay: uint64 = 0 +): T = + return Network( + engine: engine, + defaultLinkDelay: defaultLinkDelay, + peers: HashSet[Peer]() + ) + +proc add*(self: Network, peer: Peer): void = + # TODO: this can be very slow if the array keeps being resized, but for + # now I won't care much. + self.peers.incl(peer) + +proc remove*(self: Network, peer: Peer) = + self.peers.excl(peer) + +proc send*(self: Network, message: Message, + linkDelay: Option[uint64] = none(uint64)): AwaitableHandle = + + let delay = linkDelay.get(self.defaultLinkDelay) + + self.engine.awaitableSchedule( + ScheduledMessage( + time: self.engine.current_time + delay, + message: message + ) + ) + +method atScheduledTime*(self: ScheduledMessage, engine: EventDrivenEngine) = + self.message.receiver.deliver(self.message) + +export Network +export peer +export eventdrivenengine +export options +export sets diff --git a/swarmsim/engine/peer.nim b/swarmsim/engine/peer.nim new file mode 100644 index 0000000..35c2ba6 --- /dev/null +++ b/swarmsim/engine/peer.nim @@ -0,0 +1,46 @@ +import std/tables +import std/options +import std/random + +import ./types +import ./protocol + +## TODO a "readonly" pragma could probably be my first macro +proc `peerId=`*(self: Peer, id: int): void {.error: "Cannot assign to `peerId` property of `Peer`.".} + +proc `protocols=`*(self: Peer, value: Table[string, Protocol]): void {.error: "Cannot assign to `protocols` property of `Peer`.".} + +proc `protocols`*(self: Peer): Table[string, Protocol] {.error: "Cannot read from `protocols` property of `Peer`.".} + +proc getProtocol*(self: Peer, protocolId: string): Option[Protocol] = + if self.protocols.hasKey(protocolId): + return self.protocols[protocolId].some + + none(Protocol) + +proc deliver*(self: Peer, message: Message): void = + self.getProtocol(message.messageType).map( + proc (p: Protocol): void = p.deliver(message)) + +proc initPeer(self: Peer, protocols: seq[Protocol]): Peer = + # XXX integer indexes or an enum would be better, but this is easier + for protocol in protocols: + self.protocols[protocol.messageType] = protocol + + self + +proc hash*(self: Peer): int = self.peerId + +proc new*( + T: type Peer, + protocols: seq[Protocol], + peerId: Option[int] = none(int) +): Peer = + # XXX I can't have put this in the init proc as that would mean allowing public + # write access to every field in Peer. Not sure how to solve this in nim. + let peerId = peerId.get(rand(high(int))) + initPeer(Peer(protocols: initTable[string, Protocol](), peerId: peerId), protocols) + +export Peer +export options +export tables diff --git a/swarmsim/engine/protocol.nim b/swarmsim/engine/protocol.nim new file mode 100644 index 0000000..825309e --- /dev/null +++ b/swarmsim/engine/protocol.nim @@ -0,0 +1,10 @@ +import ./types + +method uncheckedDeliver(self: Protocol, message: Message): void {.base.} = + raise newException(CatchableError, "Method without implementation override") + +proc deliver*(self: Protocol, message: Message): void = + assert(self.messageType == message.messageType) + self.uncheckedDeliver(message) + +export Protocol diff --git a/swarmsim/schedulableevent.nim b/swarmsim/engine/schedulableevent.nim similarity index 71% rename from swarmsim/schedulableevent.nim rename to swarmsim/engine/schedulableevent.nim index c1fc844..739e659 100644 --- a/swarmsim/schedulableevent.nim +++ b/swarmsim/engine/schedulableevent.nim @@ -1,8 +1,10 @@ -import pkg/swarmsim/types +import ./types func `<`*(self: SchedulableEvent, other: SchedulableEvent): bool = return self.time < other.time +proc `time=`*(self: SchedulableEvent, value: float): void {.error: "Cannot assign to `time` property of `SchedulableEvent`.".} + method atScheduledTime*(self: SchedulableEvent, engine: EventDrivenEngine): void {.base.} = ## Callback invoked by the event engine indicating that this event is due for execution. By ## default, it does nothing. diff --git a/swarmsim/engine/types.nim b/swarmsim/engine/types.nim new file mode 100644 index 0000000..6fc3466 --- /dev/null +++ b/swarmsim/engine/types.nim @@ -0,0 +1,49 @@ +import std/heapqueue +import std/tables +import std/sets +import std/options + +type + SchedulableEvent* = ref object of RootObj + ## A `SchedulableEvent` is an event that can be scheduled for execution in an `EventDrivenEngine` + ## at a well-defined point in simuliation time. + ## + time*: uint64 + +type + EventDrivenEngine* = ref object of RootObj + ## An `EventDrivenEngine` is a simple simulation engine that executes events in the order of their + ## scheduled time. + ## + current_time*: uint64 + queue*: HeapQueue[SchedulableEvent] + +type + Protocol* = ref object of RootObj + ## A `Protocol` defines a P2P protocol. It handles messages meant for it, keeps internal state, + ## and may expose services to other `Protocol`s within the same `Peer`. + messageType*: string ## "Type" of the message accepted by this protocol. + +type + Peer* = ref object of RootObj + peerId*: int + protocols*: Table[string, Protocol] + +type + Message* = ref object of RootObj + ## A `Message` is a piece of data that is sent over the network. Its meaning is typically + ## protocol-specific. + messageType*: string + sender*: Option[Peer] + receiver*: Peer + +type + Network* = ref object of RootObj + ## A `Network` is a collection of `Peer`s that can communicate with each other. + ## + engine*: EventDrivenEngine + defaultLinkDelay*: uint64 + peers*: HashSet[Peer] # TODO: use an array + +export heapqueue +export option diff --git a/swarmsim/eventdrivenengine.nim b/swarmsim/eventdrivenengine.nim deleted file mode 100644 index 07c912e..0000000 --- a/swarmsim/eventdrivenengine.nim +++ /dev/null @@ -1,31 +0,0 @@ -import std/options - -import pkg/swarmsim/types -import pkg/swarmsim/schedulableevent - -proc current_time*(self: EventDrivenEngine): uint64 {.inline.} = self.current_time - -proc schedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): EventDrivenEngine = - self.queue.push(schedulable) - self - -proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine, schedulables: seq[T]): void = - for schedulable in schedulables: - discard self.schedule(schedulable) - -proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] = - if len(self.queue) == 0: - return none(SchedulableEvent) - - let schedulable = self.queue.pop() - self.current_time = schedulable.time - schedulable.atScheduledTime(engine = self) - - some(schedulable) - -proc run*(self: EventDrivenEngine): void = - while self.nextStep().isSome: - discard - -export EventDrivenEngine -export options diff --git a/swarmsim/types.nim b/swarmsim/types.nim deleted file mode 100644 index 1db5e2c..0000000 --- a/swarmsim/types.nim +++ /dev/null @@ -1,15 +0,0 @@ -import std/heapqueue - -type - SchedulableEvent* = ref object of RootObj - ## A `SchedulableEvent` is an event that can be scheduled for execution in an `EventDrivenEngine` - ## at a well-defined point in simuliation time. - ## - time*: uint64 - -type - EventDrivenEngine* = ref object of RootObj - current_time*: uint64 - queue*: HeapQueue[SchedulableEvent] - -export heapqueue diff --git a/tests/all_tests.nim b/tests/all_tests.nim index e5a8080..4e79ee0 100644 --- a/tests/all_tests.nim +++ b/tests/all_tests.nim @@ -1 +1,7 @@ -import ./swarmsim/eventdrivenengine \ No newline at end of file +import ./swarmsim/engine/eventdrivenengine +import ./swarmsim/engine/schedulableevent +import ./swarmsim/engine/network +import ./swarmsim/engine/peer + +{.warning[UnusedImport]: off.} + diff --git a/tests/swarmsim/engine/eventdrivenengine.nim b/tests/swarmsim/engine/eventdrivenengine.nim new file mode 100644 index 0000000..1021eac --- /dev/null +++ b/tests/swarmsim/engine/eventdrivenengine.nim @@ -0,0 +1,58 @@ +import unittest +import sequtils +import sugar + +import std/algorithm +import std/tables + +import pkg/swarmsim/engine/schedulableevent +import pkg/swarmsim/engine/eventdrivenengine + +suite "event driven engine tests": + + test "should run schedulables at the right time": + + let times = @[1, 10, 5].map(time => uint64(time)) + let schedulables = times.map(time => SchedulableEvent(time: time)) + + let engine = EventDrivenEngine() + + engine.scheduleAll(schedulables) + + for time in times.sorted: + let result = engine.nextStep().get() + check(result.time == engine.current_time) + + check(engine.nextStep().isNone) + + test "should allow clients to wait until a scheduled event happens": + let times = @[1, 2, 3, 4, 5, 6, 7, 8] + let schedulables = times.map(time => SchedulableEvent(time: uint64(time))) + + let engine = EventDrivenEngine() + let handles = schedulables.map(schedulable => engine.awaitableSchedule(schedulable)) + + check(engine.current_time == 0) + + handles[4].doAwait() + check(engine.current_time == 5) + + handles[7].doAwait() + check(engine.current_time == 8) + + + test "should not allow schedulables to be scheduled in the past": + let e1 = SchedulableEvent(time: 10) + let e2 = SchedulableEvent(time: 8) + + let engine = EventDrivenEngine() + engine.schedule(e1) + discard engine.nextStep() + + expect(Defect): + engine.schedule(e2) + + + + + diff --git a/tests/swarmsim/engine/network.nim b/tests/swarmsim/engine/network.nim new file mode 100644 index 0000000..e9814e0 --- /dev/null +++ b/tests/swarmsim/engine/network.nim @@ -0,0 +1,53 @@ +import unittest + +import pkg/swarmsim/engine/eventdrivenengine +import pkg/swarmsim/engine/network +import pkg/swarmsim/engine/peer +import pkg/swarmsim/engine/message +import pkg/swarmsim/engine/protocol + +type + FakeProtocol = ref object of Protocol + received: bool + +method uncheckedDeliver(self: FakeProtocol, message: Message) = + self.received = true + +proc getFakeProtocol(peer: Peer, protocolId: string): FakeProtocol = + let protocol = peer.getProtocol(protocolId) + check(protocol.isSome) + return FakeProtocol(protocol.get()) + +suite "network": + test "should dispatch message to the correct protocol within a peer": + let engine = EventDrivenEngine() + + var protocols: seq[Protocol] = newSeq[Protocol]() + + protocols.add(FakeProtocol(messageType: "protocol1", received: false)) + protocols.add(FakeProtocol(messageType: "protocol2", received: false)) + + let peer = Peer.new(protocols = protocols) + let network = Network.new(engine = engine, defaultLinkDelay = 20) + + network.add(peer) + + let m1 = Message.new(receiver = peer, messageType = "protocol1") + let m2 = Message.new(receiver = peer, messageType = "protocol2") + + let message2handle = network.send(m2, linkDelay = uint64(10).some) + let message1handle = network.send(m1, linkDelay = uint64(5).some) + + check(not peer.getFakeProtocol("protocol1").received) + check(not peer.getFakeProtocol("protocol2").received) + + message1Handle.doAwait() + + check(peer.getFakeProtocol("protocol1").received) + check(not peer.getFakeProtocol("protocol2").received) + + message2Handle.doAwait() + + check(peer.getFakeProtocol("protocol1").received) + check(peer.getFakeProtocol("protocol2").received) + diff --git a/tests/swarmsim/engine/peer.nim b/tests/swarmsim/engine/peer.nim new file mode 100644 index 0000000..73f76da --- /dev/null +++ b/tests/swarmsim/engine/peer.nim @@ -0,0 +1,20 @@ +import std/unittest +import std/sets + +import pkg/swarmsim/engine/peer + +suite "peer": + test "should allow inclusion and membership tests on a HashSet": + var peerSet = HashSet[Peer]() + + let p1 = Peer.new(protocols = @[], peerId = 1.some) + let p2 = Peer.new(protocols = @[], peerId = 2.some) + + peerSet.incl(p1) + + check(peerSet.contains(p1)) + check(not peerSet.contains(p2)) + + peerSet.excl(p1) + + check(not peerSet.contains(p1)) diff --git a/tests/swarmsim/engine/schedulableevent.nim b/tests/swarmsim/engine/schedulableevent.nim new file mode 100644 index 0000000..5bfd762 --- /dev/null +++ b/tests/swarmsim/engine/schedulableevent.nim @@ -0,0 +1,11 @@ +import unittest + +import pkg/swarmsim/engine/schedulableevent + +suite "schedulable event": + test "should be ordered by time": + let e1 = SchedulableEvent(time: 1) + let e2 = SchedulableEvent(time: 3) + + check(e1 < e2) + check(not (e2 < e1)) diff --git a/tests/swarmsim/eventdrivenengine.nim b/tests/swarmsim/eventdrivenengine.nim deleted file mode 100644 index 1f809a2..0000000 --- a/tests/swarmsim/eventdrivenengine.nim +++ /dev/null @@ -1,25 +0,0 @@ -import unittest -import sequtils -import sugar - -import std/algorithm - -import pkg/swarmsim/schedulableevent -import pkg/swarmsim/eventdrivenengine - -suite "event driven engine tests": - - test "should run schedulables at the right time": - - let times = @[1, 10, 5].map(time => uint64(time)) - let schedulables = times.map(time => SchedulableEvent(time: time)) - - let engine = EventDrivenEngine() - - engine.scheduleAll(schedulables) - - for time in times.sorted: - let result = engine.nextStep().get() - check(result.time == engine.current_time) - - check(engine.nextStep().isNone)