From 7548c545f87fda02de2243672d4ee017c6e92f99 Mon Sep 17 00:00:00 2001 From: gmega Date: Fri, 18 Aug 2023 19:09:02 -0300 Subject: [PATCH] add dht tracker node, multiple engine feature improvements --- .github/workflows/run-tests.yml | 4 +- .gitignore | 3 +- swarmsim/codex/dhttracker.nim | 109 ++++++++++++++++++++ swarmsim/codex/types.nim | 0 swarmsim/distributions.nim | 5 +- swarmsim/engine/eventdrivenengine.nim | 28 ++++- swarmsim/engine/message.nim | 11 -- swarmsim/engine/network.nim | 4 +- swarmsim/engine/peer.nim | 6 +- swarmsim/engine/protocol.nim | 8 +- swarmsim/engine/schedulableevent.nim | 9 +- swarmsim/engine/types.nim | 23 +++-- swarmsim/timeutils.nim | 9 ++ tests/all_tests.nim | 1 + tests/swarmsim/codex/dhttracker.nim | 93 +++++++++++++++++ tests/swarmsim/engine/eventdrivenengine.nim | 44 ++++++-- tests/swarmsim/engine/network.nim | 5 +- 17 files changed, 307 insertions(+), 55 deletions(-) create mode 100644 swarmsim/codex/dhttracker.nim create mode 100644 swarmsim/codex/types.nim delete mode 100644 swarmsim/engine/message.nim create mode 100644 swarmsim/timeutils.nim create mode 100644 tests/swarmsim/codex/dhttracker.nim diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 1d01845..4edb4d1 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -5,12 +5,12 @@ on: [push, pull_request] jobs: run-tests: runs-on: ubuntu-latest - name: Ubuntu, Nim 1.6.12 + name: Ubuntu, Nim 2.0.0 steps: - uses: actions/checkout@v2 - uses: iffy/install-nim@v3 with: - version: 1.6.12 + version: 2.0.0 - name: Install Deps run: nimble install -y - name: Run tests diff --git a/.gitignore b/.gitignore index c74c5e0..acd019b 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ !*.* nimbledeps nimble.develop -nimble.paths \ No newline at end of file +nimble.paths +.vscode diff --git a/swarmsim/codex/dhttracker.nim b/swarmsim/codex/dhttracker.nim new file mode 100644 index 0000000..3a23ddc --- /dev/null +++ b/swarmsim/codex/dhttracker.nim @@ -0,0 +1,109 @@ +import ../engine/protocol +import ../engine/network +import ../engine/schedulableevent +import ../timeutils + +import std/times +import std/options +import std/algorithm +import std/tables +import sequtils + +export protocol +export options + +type + PeerDescriptor* = ref object of RootObj + peerId*: int + lastSeen*: uint64 + expiry: AwaitableHandle + +type ArrayShuffler = proc (arr: var seq[PeerDescriptor]): void + +type + DHTTracker* = ref object of Protocol + expiry*: Duration + maxPeers*: uint + peers: OrderedTable[int, PeerDescriptor] + shuffler: ArrayShuffler + + PeerAnnouncement* = ref object of Message + peerId*: int + + SampleSwarm* = ref object of Message + numPeers: uint + + PeerExpiry* = ref object of SchedulableEvent + peerId*: int + tracker: DHTTracker + +let RandomShuffler = proc (arr: var seq[PeerDescriptor]) = + discard arr.nextPermutation() + +proc messageType*(T: type DHTTracker): string = "DHTTracker" + +proc defaultExpiry*(T: type DHTTracker): Duration = 15.dminutes + +proc new*( + T: type DHTTracker, + maxPeers: uint, + shuffler: ArrayShuffler = RandomShuffler, + expiry: Duration = DHTTracker.defaultExpiry, +): DHTTracker = + DHTTracker( + # This should in general be safe as those are always positive. + expiry: expiry, + maxPeers: maxPeers, + shuffler: shuffler, + peers: initOrderedTable[int, PeerDescriptor](), + messageType: DHTTracker.messageType + ) + +proc peers*(self: DHTTracker): seq[PeerDescriptor] = self.peers.values.toSeq() + +proc oldestInsertion(self: DHTTracker): int = + # We maintain the invariant that the first element to have been inserted + # must be the oldest. We can therefore return the first element in the + # ordered table. + for peerId in self.peers.keys: + return peerId + +proc addPeer(self: DHTTracker, message: PeerAnnouncement, + engine: EventDrivenEngine) = + + let peerId = message.peerId + + if peerId in self.peers: + self.peers[peerId].expiry.schedulable.cancel() + + elif uint(len(self.peers)) == self.maxPeers: + let oldest = self.oldestInsertion() + self.peers[oldest].expiry.schedulable.cancel() + self.peers.del(oldest) + + let expiry = PeerExpiry( + peerId: message.peerId, + tracker: self, + time: engine.currentTime + uint64(self.expiry.inSeconds()) + ) + + let descriptor = PeerDescriptor( + peerId: message.peerId, + lastSeen: engine.currentTime, + expiry: engine.awaitableSchedule(expiry)) + + self.peers[peerId] = descriptor + +method atScheduledTime*(self: PeerExpiry, engine: EventDrivenEngine): void = + self.tracker.peers.del(self.peerId) + +proc sampleSwarm(self: DHTTracker, message: SampleSwarm, network: Network) = + discard + +method uncheckedDeliver*(self: DHTTracker, message: Message, + engine: EventDrivenEngine, network: Network) = + + if message of PeerAnnouncement: + self.addPeer(PeerAnnouncement(message), engine) + elif message of SampleSwarm: + self.sampleSwarm(SampleSwarm(message), network) diff --git a/swarmsim/codex/types.nim b/swarmsim/codex/types.nim new file mode 100644 index 0000000..e69de29 diff --git a/swarmsim/distributions.nim b/swarmsim/distributions.nim index bf39343..2aa7b9f 100644 --- a/swarmsim/distributions.nim +++ b/swarmsim/distributions.nim @@ -4,8 +4,9 @@ import std/math type Distribution = proc(): float proc unitUniform(): float = - ## Uniform distribution on [0, 1]. Used as a building block for inverse transform - ## samplers (https://en.wikipedia.org/wiki/Inverse_transform_sampling), as well as + ## Uniform distribution on [0, 1]. Used as a building block for inverse + ## transform samplers + ## (https://en.wikipedia.org/wiki/Inverse_transform_sampling), as well as ## for the scaled uniform distribution. rand(1.float) diff --git a/swarmsim/engine/eventdrivenengine.nim b/swarmsim/engine/eventdrivenengine.nim index 4797f33..ee444d1 100644 --- a/swarmsim/engine/eventdrivenengine.nim +++ b/swarmsim/engine/eventdrivenengine.nim @@ -1,10 +1,12 @@ import std/options import std/strformat +import std/times import ./types import ./schedulableevent export options +export times export EventDrivenEngine type @@ -17,33 +19,49 @@ proc currentTime*(self: EventDrivenEngine): uint64 {.inline.} = self.currentTime proc schedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): void = if schedulable.time < self.currentTime: raise (ref Defect)( - msg: fmt"Cannot schedule an event in the past ({schedulable.time}) < ({self.currentTime})") + msg: "Cannot schedule an event in the past " & + fmt"({schedulable.time}) < ({self.currentTime})") self.queue.push(schedulable) -proc awaitableSchedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): AwaitableHandle = +proc awaitableSchedule*(self: EventDrivenEngine, + schedulable: SchedulableEvent): AwaitableHandle = self.schedule(schedulable) AwaitableHandle(schedulable: schedulable, engine: self) -proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine, schedulables: seq[T]): void = +proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine, + schedulables: seq[T]): void = for schedulable in schedulables: self.schedule(schedulable) -proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] = +proc stepUntil(self: EventDrivenEngine, + until: Option[uint64] = none(uint64)): Option[SchedulableEvent] = while len(self.queue) > 0: + if until.isSome and self.queue[0].time > until.get: + self.currentTime = until.get + return none(SchedulableEvent) + let schedulable = self.queue.pop() self.currentTime = schedulable.time if not schedulable.cancelled: schedulable.atScheduledTime(engine = self) + schedulable.completed = true + return some(schedulable) return none(SchedulableEvent) +proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] = + self.stepUntil() + proc runUntil*(self: EventDrivenEngine, until: uint64): void = - while self.nextStep().isSome and self.currentTime < until: + while self.stepUntil(until.some).isSome and self.currentTime <= until: discard +proc runUntil*(self: EventDrivenEngine, until: Duration): void = + self.runUntil(uint64(until.inSeconds())) + proc run*(self: EventDrivenEngine): void = self.runUntil(high(uint64)) diff --git a/swarmsim/engine/message.nim b/swarmsim/engine/message.nim deleted file mode 100644 index ff67167..0000000 --- a/swarmsim/engine/message.nim +++ /dev/null @@ -1,11 +0,0 @@ -import std/options - -import ./types - -export options -export Message - -proc new*(T: type Message, sender: Option[Peer] = none(Peer), receiver: Peer, messageType: string): Message = - Message(sender: sender, receiver: receiver, messageType: messageType) - - diff --git a/swarmsim/engine/network.nim b/swarmsim/engine/network.nim index 949f438..aec3ac8 100644 --- a/swarmsim/engine/network.nim +++ b/swarmsim/engine/network.nim @@ -20,8 +20,8 @@ proc new*( T: type Network, engine: EventDrivenEngine, defaultLinkDelay: uint64 = 0 -): T = - return Network( +): Network = + Network( engine: engine, defaultLinkDelay: defaultLinkDelay, peers: HashSet[Peer]() diff --git a/swarmsim/engine/peer.nim b/swarmsim/engine/peer.nim index 4de5fdd..647c15a 100644 --- a/swarmsim/engine/peer.nim +++ b/swarmsim/engine/peer.nim @@ -1,6 +1,7 @@ import std/tables import std/options import std/random +import std/hashes import ./types import ./protocol @@ -18,7 +19,8 @@ proc getProtocol*(self: Peer, protocolId: string): Option[Protocol] = none(Protocol) -proc deliver*(self: Peer, message: Message, engine: EventDrivenEngine, network: Network): void = +proc deliver*(self: Peer, message: Message, engine: EventDrivenEngine, + network: Network): void = self.getProtocol(message.messageType).map( proc (p: Protocol): void = p.deliver(message, engine, network)) @@ -29,7 +31,7 @@ proc initPeer(self: Peer, protocols: seq[Protocol]): Peer = self -proc hash*(self: Peer): int = self.peerId +proc hash*(self: Peer): Hash = self.peerId.hash proc new*( T: type Peer, diff --git a/swarmsim/engine/protocol.nim b/swarmsim/engine/protocol.nim index 91bcecb..066b516 100644 --- a/swarmsim/engine/protocol.nim +++ b/swarmsim/engine/protocol.nim @@ -1,12 +1,9 @@ import ./types -import ./message -import ./network import ./eventdrivenengine -export message -export network export eventdrivenengine export Protocol +export Message method uncheckedDeliver( self: Protocol, @@ -16,6 +13,7 @@ method uncheckedDeliver( ): void {.base.} = raise newException(CatchableError, "Method without implementation override") -proc deliver*(self: Protocol, message: Message, engine: EventDrivenEngine, network: Network): void = +proc deliver*(self: Protocol, message: Message, engine: EventDrivenEngine, + network: Network): void = assert(self.messageType == message.messageType) self.uncheckedDeliver(message, engine, network) diff --git a/swarmsim/engine/schedulableevent.nim b/swarmsim/engine/schedulableevent.nim index 4026f5f..a6bff37 100644 --- a/swarmsim/engine/schedulableevent.nim +++ b/swarmsim/engine/schedulableevent.nim @@ -10,8 +10,9 @@ proc cancel*(self: SchedulableEvent) = ## self.cancelled = true -method atScheduledTime*(self: SchedulableEvent, engine: EventDrivenEngine): void {.base.} = - ## Callback invoked by the event engine indicating that this event is due for execution. By - ## default, it does nothing. +method atScheduledTime*(self: SchedulableEvent, + engine: EventDrivenEngine): void {.base.} = + ## Callback invoked by the event engine indicating that this event is due + ## for execution. ## - discard + raise newException(CatchableError, "Method without implementation override") diff --git a/swarmsim/engine/types.nim b/swarmsim/engine/types.nim index e0131e7..73547ce 100644 --- a/swarmsim/engine/types.nim +++ b/swarmsim/engine/types.nim @@ -10,24 +10,26 @@ export random type SchedulableEvent* = ref object of RootObj - ## A `SchedulableEvent` is an event that can be scheduled for execution in an `EventDrivenEngine` - ## at a well-defined point in simuliation time. + ## A `SchedulableEvent` is an event that can be scheduled for execution + ## in an `EventDrivenEngine` at a well-defined point in simuliation time. ## time*: uint64 cancelled*: bool + completed*: bool type EventDrivenEngine* = ref object of RootObj - ## An `EventDrivenEngine` is a simple simulation engine that executes events in the order of their - ## scheduled time. + ## An `EventDrivenEngine` is a simple simulation engine that executes + ## events in the order of their scheduled time. ## currentTime*: uint64 queue*: HeapQueue[SchedulableEvent] type Protocol* = ref object of RootObj - ## A `Protocol` defines a P2P protocol. It handles messages meant for it, keeps internal state, - ## and may expose services to other `Protocol`s within the same `Peer`. + ## A `Protocol` defines a P2P protocol. It handles messages meant for it, + ## keeps internal state, and may expose services to other `Protocol`s within + ## the same `Peer`. messageType*: string ## "Type" of the message accepted by this protocol. type @@ -37,15 +39,16 @@ type type Message* = ref object of RootObj - ## A `Message` is a piece of data that is sent over the network. Its meaning is typically - ## protocol-specific. + ## A `Message` is a piece of data that is sent over the network. Its meaning + ## is typically protocol-specific. messageType*: string - sender*: Option[Peer] + sender*: Option[Peer] = none(Peer) receiver*: Peer type Network* = ref object of RootObj - ## A `Network` is a collection of `Peer`s that can communicate with each other. + ## A `Network` is a collection of `Peer`s that can communicate with each + ## other. ## engine*: EventDrivenEngine defaultLinkDelay*: uint64 diff --git a/swarmsim/timeutils.nim b/swarmsim/timeutils.nim new file mode 100644 index 0000000..d20ff90 --- /dev/null +++ b/swarmsim/timeutils.nim @@ -0,0 +1,9 @@ +# Syntax sugar for durations + +import std/times + +proc dseconds*(n: int): Duration = initDuration(seconds = n) +proc dminutes*(n: int): Duration = initDuration(minutes = n) +proc dhours*(n: int): Duration = initDuration(hours = n) +proc ddays*(n: int): Duration = initDuration(days = n) +proc dweeks*(n: int): Duration = initDuration(weeks = n) diff --git a/tests/all_tests.nim b/tests/all_tests.nim index 4e79ee0..8fb10fb 100644 --- a/tests/all_tests.nim +++ b/tests/all_tests.nim @@ -2,6 +2,7 @@ import ./swarmsim/engine/eventdrivenengine import ./swarmsim/engine/schedulableevent import ./swarmsim/engine/network import ./swarmsim/engine/peer +import ./swarmsim/codex/dhttracker {.warning[UnusedImport]: off.} diff --git a/tests/swarmsim/codex/dhttracker.nim b/tests/swarmsim/codex/dhttracker.nim new file mode 100644 index 0000000..3c1ea51 --- /dev/null +++ b/tests/swarmsim/codex/dhttracker.nim @@ -0,0 +1,93 @@ +import unittest + +import std/times +import std/algorithm +import sequtils +import sugar + +import pkg/swarmsim/engine/eventdrivenengine +import pkg/swarmsim/engine/peer +import pkg/swarmsim/engine/network +import pkg/swarmsim/codex/dhttracker +import pkg/swarmsim/timeutils + +proc getPeerArray(tracker: Peer): seq[PeerDescriptor] = + DHTTracker( + tracker.getProtocol(DHTTracker.messageType).get()).peers + +proc getPeerIdArray(tracker: Peer): seq[int] = + getPeerArray(tracker).map(p => p.peerId) + +proc announcePeer(network: Network, tracker: Peer, peerId: int, + delay: uint64 = 0) = + network.send( + PeerAnnouncement(receiver: tracker, + messageType: DHTTracker.messageType, peerId: peerId), + delay.some).doAwait() + +suite "tracker node": + + setup: + let engine = EventDrivenEngine() + + let trackerPeer = Peer.new( + protocols = @[ + Protocol DHTTracker.new(maxPeers = 5) + ] + ) + + let network = Network.new(engine = engine) + network.add(trackerPeer) + + test "should retain published descriptors": + announcePeer(network, trackerPeer, 25) + + let peers = getPeerArray(trackerPeer) + + check(len(peers) == 1) + check(peers[0].peerId == 25) + + test "should not include the same peer more than once": + announcePeer(network, trackerPeer, 25) + announcePeer(network, trackerPeer, 25) + + let peers = getPeerArray(trackerPeer) + + check(len(peers) == 1) + check(peers[0].peerId == 25) + + + test "should drop descriptors after expiry time": + announcePeer(network, trackerPeer, 25) + + check(len(getPeerArray(trackerPeer)) == 1) + engine.runUntil(DHTTracker.defaultExpiry + 1.dseconds) + + check(len(getPeerArray(trackerPeer)) == 0) + + test "should renew expiry time if peer republishes its record": + announcePeer(network, trackerPeer, 25) + + check(len(getPeerArray(trackerPeer)) == 1) + engine.runUntil(DHTTracker.defaultExpiry - 1.dseconds) + + announcePeer(network, trackerPeer, 25) + + engine.runUntil(DHTTracker.defaultExpiry + 15.dseconds) + check(len(getPeerArray(trackerPeer)) == 1) + + engine.runUntil(2*DHTTracker.defaultExpiry + 1.dseconds) + check(len(getPeerArray(trackerPeer)) == 0) + + test "should drop oldest peers when table is full": + announcePeer(network, trackerPeer, 25, delay = 0) + announcePeer(network, trackerPeer, 35, delay = 1) + announcePeer(network, trackerPeer, 45, delay = 2) + announcePeer(network, trackerPeer, 55, delay = 3) + announcePeer(network, trackerPeer, 65, delay = 4) + + check(getPeerIdArray(trackerPeer).sorted == @[25, 35, 45, 55, 65]) + + announcePeer(network, trackerPeer, 75, delay = 1) + + check(getPeerIdArray(trackerPeer).sorted == @[35, 45, 55, 65, 75]) diff --git a/tests/swarmsim/engine/eventdrivenengine.nim b/tests/swarmsim/engine/eventdrivenengine.nim index bfbd8da..cf980a5 100644 --- a/tests/swarmsim/engine/eventdrivenengine.nim +++ b/tests/swarmsim/engine/eventdrivenengine.nim @@ -3,17 +3,21 @@ import sequtils import sugar import std/algorithm -import std/tables import pkg/swarmsim/engine/schedulableevent import pkg/swarmsim/engine/eventdrivenengine +type TestSchedulable = ref object of SchedulableEvent + +method atScheduledTime(self: TestSchedulable, engine: EventDrivenEngine): void = + discard + suite "event driven engine tests": test "should run schedulables at the right time": let times = @[1, 10, 5].map(time => uint64(time)) - let schedulables = times.map(time => SchedulableEvent(time: time)) + let schedulables = times.map(time => TestSchedulable(time: time)) let engine = EventDrivenEngine() @@ -27,10 +31,11 @@ suite "event driven engine tests": test "should allow clients to wait until a scheduled event happens": let times = @[1, 2, 3, 4, 5, 6, 7, 8] - let schedulables = times.map(time => SchedulableEvent(time: uint64(time))) + let schedulables = times.map(time => TestSchedulable(time: uint64(time))) let engine = EventDrivenEngine() - let handles = schedulables.map(schedulable => engine.awaitableSchedule(schedulable)) + let handles = schedulables.map(schedulable => + engine.awaitableSchedule(schedulable)) check(engine.currentTime == 0) @@ -40,9 +45,32 @@ suite "event driven engine tests": handles[7].doAwait() check(engine.currentTime == 8) + test "should allow clients run until the desired simulation time": + let times = @[50, 100, 150] + let schedulables = times.map(time => TestSchedulable(time: uint64(time))) + + let engine = EventDrivenEngine() + engine.scheduleAll(schedulables) + + engine.runUntil(80) + + check(engine.currentTime == 80) + check(schedulables[0].completed) + check(not schedulables[1].completed) + + engine.runUntil(110) + + check(engine.currentTime == 110) + check(schedulables[1].completed) + check(not schedulables[2].completed) + + engine.runUntil(200) + check(engine.currentTime == 150) + check(schedulables[2].completed) + test "should not allow schedulables to be scheduled in the past": - let e1 = SchedulableEvent(time: 10) - let e2 = SchedulableEvent(time: 8) + let e1 = TestSchedulable(time: 10) + let e2 = TestSchedulable(time: 8) let engine = EventDrivenEngine() engine.schedule(e1) @@ -52,8 +80,8 @@ suite "event driven engine tests": engine.schedule(e2) test "should allow clients to cancel scheduled events": - let e1 = SchedulableEvent(time: 8) - let e2 = SchedulableEvent(time: 10) + let e1 = TestSchedulable(time: 8) + let e2 = TestSchedulable(time: 10) let engine = EventDrivenEngine() diff --git a/tests/swarmsim/engine/network.nim b/tests/swarmsim/engine/network.nim index 8e65ba0..f1558fa 100644 --- a/tests/swarmsim/engine/network.nim +++ b/tests/swarmsim/engine/network.nim @@ -3,7 +3,6 @@ import unittest import pkg/swarmsim/engine/eventdrivenengine import pkg/swarmsim/engine/network import pkg/swarmsim/engine/peer -import pkg/swarmsim/engine/message import pkg/swarmsim/engine/protocol type @@ -33,8 +32,8 @@ suite "network": network.add(peer) - let m1 = Message.new(receiver = peer, messageType = "protocol1") - let m2 = Message.new(receiver = peer, messageType = "protocol2") + let m1 = Message(receiver: peer, messageType: "protocol1") + let m2 = Message(receiver: peer, messageType: "protocol2") let message2handle = network.send(m2, linkDelay = uint64(10).some) let message1handle = network.send(m1, linkDelay = uint64(5).some)