add dht tracker node, multiple engine feature improvements

This commit is contained in:
gmega 2023-08-18 19:09:02 -03:00
parent 1673dd806c
commit 7548c545f8
17 changed files with 307 additions and 55 deletions

View File

@ -5,12 +5,12 @@ on: [push, pull_request]
jobs:
run-tests:
runs-on: ubuntu-latest
name: Ubuntu, Nim 1.6.12
name: Ubuntu, Nim 2.0.0
steps:
- uses: actions/checkout@v2
- uses: iffy/install-nim@v3
with:
version: 1.6.12
version: 2.0.0
- name: Install Deps
run: nimble install -y
- name: Run tests

3
.gitignore vendored
View File

@ -3,4 +3,5 @@
!*.*
nimbledeps
nimble.develop
nimble.paths
nimble.paths
.vscode

View File

@ -0,0 +1,109 @@
import ../engine/protocol
import ../engine/network
import ../engine/schedulableevent
import ../timeutils
import std/times
import std/options
import std/algorithm
import std/tables
import sequtils
export protocol
export options
type
PeerDescriptor* = ref object of RootObj
peerId*: int
lastSeen*: uint64
expiry: AwaitableHandle
type ArrayShuffler = proc (arr: var seq[PeerDescriptor]): void
type
DHTTracker* = ref object of Protocol
expiry*: Duration
maxPeers*: uint
peers: OrderedTable[int, PeerDescriptor]
shuffler: ArrayShuffler
PeerAnnouncement* = ref object of Message
peerId*: int
SampleSwarm* = ref object of Message
numPeers: uint
PeerExpiry* = ref object of SchedulableEvent
peerId*: int
tracker: DHTTracker
let RandomShuffler = proc (arr: var seq[PeerDescriptor]) =
discard arr.nextPermutation()
proc messageType*(T: type DHTTracker): string = "DHTTracker"
proc defaultExpiry*(T: type DHTTracker): Duration = 15.dminutes
proc new*(
T: type DHTTracker,
maxPeers: uint,
shuffler: ArrayShuffler = RandomShuffler,
expiry: Duration = DHTTracker.defaultExpiry,
): DHTTracker =
DHTTracker(
# This should in general be safe as those are always positive.
expiry: expiry,
maxPeers: maxPeers,
shuffler: shuffler,
peers: initOrderedTable[int, PeerDescriptor](),
messageType: DHTTracker.messageType
)
proc peers*(self: DHTTracker): seq[PeerDescriptor] = self.peers.values.toSeq()
proc oldestInsertion(self: DHTTracker): int =
# We maintain the invariant that the first element to have been inserted
# must be the oldest. We can therefore return the first element in the
# ordered table.
for peerId in self.peers.keys:
return peerId
proc addPeer(self: DHTTracker, message: PeerAnnouncement,
engine: EventDrivenEngine) =
let peerId = message.peerId
if peerId in self.peers:
self.peers[peerId].expiry.schedulable.cancel()
elif uint(len(self.peers)) == self.maxPeers:
let oldest = self.oldestInsertion()
self.peers[oldest].expiry.schedulable.cancel()
self.peers.del(oldest)
let expiry = PeerExpiry(
peerId: message.peerId,
tracker: self,
time: engine.currentTime + uint64(self.expiry.inSeconds())
)
let descriptor = PeerDescriptor(
peerId: message.peerId,
lastSeen: engine.currentTime,
expiry: engine.awaitableSchedule(expiry))
self.peers[peerId] = descriptor
method atScheduledTime*(self: PeerExpiry, engine: EventDrivenEngine): void =
self.tracker.peers.del(self.peerId)
proc sampleSwarm(self: DHTTracker, message: SampleSwarm, network: Network) =
discard
method uncheckedDeliver*(self: DHTTracker, message: Message,
engine: EventDrivenEngine, network: Network) =
if message of PeerAnnouncement:
self.addPeer(PeerAnnouncement(message), engine)
elif message of SampleSwarm:
self.sampleSwarm(SampleSwarm(message), network)

0
swarmsim/codex/types.nim Normal file
View File

View File

@ -4,8 +4,9 @@ import std/math
type Distribution = proc(): float
proc unitUniform(): float =
## Uniform distribution on [0, 1]. Used as a building block for inverse transform
## samplers (https://en.wikipedia.org/wiki/Inverse_transform_sampling), as well as
## Uniform distribution on [0, 1]. Used as a building block for inverse
## transform samplers
## (https://en.wikipedia.org/wiki/Inverse_transform_sampling), as well as
## for the scaled uniform distribution.
rand(1.float)

View File

@ -1,10 +1,12 @@
import std/options
import std/strformat
import std/times
import ./types
import ./schedulableevent
export options
export times
export EventDrivenEngine
type
@ -17,33 +19,49 @@ proc currentTime*(self: EventDrivenEngine): uint64 {.inline.} = self.currentTime
proc schedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): void =
if schedulable.time < self.currentTime:
raise (ref Defect)(
msg: fmt"Cannot schedule an event in the past ({schedulable.time}) < ({self.currentTime})")
msg: "Cannot schedule an event in the past " &
fmt"({schedulable.time}) < ({self.currentTime})")
self.queue.push(schedulable)
proc awaitableSchedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): AwaitableHandle =
proc awaitableSchedule*(self: EventDrivenEngine,
schedulable: SchedulableEvent): AwaitableHandle =
self.schedule(schedulable)
AwaitableHandle(schedulable: schedulable, engine: self)
proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine, schedulables: seq[T]): void =
proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine,
schedulables: seq[T]): void =
for schedulable in schedulables:
self.schedule(schedulable)
proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] =
proc stepUntil(self: EventDrivenEngine,
until: Option[uint64] = none(uint64)): Option[SchedulableEvent] =
while len(self.queue) > 0:
if until.isSome and self.queue[0].time > until.get:
self.currentTime = until.get
return none(SchedulableEvent)
let schedulable = self.queue.pop()
self.currentTime = schedulable.time
if not schedulable.cancelled:
schedulable.atScheduledTime(engine = self)
schedulable.completed = true
return some(schedulable)
return none(SchedulableEvent)
proc nextStep*(self: EventDrivenEngine): Option[SchedulableEvent] =
self.stepUntil()
proc runUntil*(self: EventDrivenEngine, until: uint64): void =
while self.nextStep().isSome and self.currentTime < until:
while self.stepUntil(until.some).isSome and self.currentTime <= until:
discard
proc runUntil*(self: EventDrivenEngine, until: Duration): void =
self.runUntil(uint64(until.inSeconds()))
proc run*(self: EventDrivenEngine): void =
self.runUntil(high(uint64))

View File

@ -1,11 +0,0 @@
import std/options
import ./types
export options
export Message
proc new*(T: type Message, sender: Option[Peer] = none(Peer), receiver: Peer, messageType: string): Message =
Message(sender: sender, receiver: receiver, messageType: messageType)

View File

@ -20,8 +20,8 @@ proc new*(
T: type Network,
engine: EventDrivenEngine,
defaultLinkDelay: uint64 = 0
): T =
return Network(
): Network =
Network(
engine: engine,
defaultLinkDelay: defaultLinkDelay,
peers: HashSet[Peer]()

View File

@ -1,6 +1,7 @@
import std/tables
import std/options
import std/random
import std/hashes
import ./types
import ./protocol
@ -18,7 +19,8 @@ proc getProtocol*(self: Peer, protocolId: string): Option[Protocol] =
none(Protocol)
proc deliver*(self: Peer, message: Message, engine: EventDrivenEngine, network: Network): void =
proc deliver*(self: Peer, message: Message, engine: EventDrivenEngine,
network: Network): void =
self.getProtocol(message.messageType).map(
proc (p: Protocol): void = p.deliver(message, engine, network))
@ -29,7 +31,7 @@ proc initPeer(self: Peer, protocols: seq[Protocol]): Peer =
self
proc hash*(self: Peer): int = self.peerId
proc hash*(self: Peer): Hash = self.peerId.hash
proc new*(
T: type Peer,

View File

@ -1,12 +1,9 @@
import ./types
import ./message
import ./network
import ./eventdrivenengine
export message
export network
export eventdrivenengine
export Protocol
export Message
method uncheckedDeliver(
self: Protocol,
@ -16,6 +13,7 @@ method uncheckedDeliver(
): void {.base.} =
raise newException(CatchableError, "Method without implementation override")
proc deliver*(self: Protocol, message: Message, engine: EventDrivenEngine, network: Network): void =
proc deliver*(self: Protocol, message: Message, engine: EventDrivenEngine,
network: Network): void =
assert(self.messageType == message.messageType)
self.uncheckedDeliver(message, engine, network)

View File

@ -10,8 +10,9 @@ proc cancel*(self: SchedulableEvent) =
##
self.cancelled = true
method atScheduledTime*(self: SchedulableEvent, engine: EventDrivenEngine): void {.base.} =
## Callback invoked by the event engine indicating that this event is due for execution. By
## default, it does nothing.
method atScheduledTime*(self: SchedulableEvent,
engine: EventDrivenEngine): void {.base.} =
## Callback invoked by the event engine indicating that this event is due
## for execution.
##
discard
raise newException(CatchableError, "Method without implementation override")

View File

@ -10,24 +10,26 @@ export random
type
SchedulableEvent* = ref object of RootObj
## A `SchedulableEvent` is an event that can be scheduled for execution in an `EventDrivenEngine`
## at a well-defined point in simuliation time.
## A `SchedulableEvent` is an event that can be scheduled for execution
## in an `EventDrivenEngine` at a well-defined point in simuliation time.
##
time*: uint64
cancelled*: bool
completed*: bool
type
EventDrivenEngine* = ref object of RootObj
## An `EventDrivenEngine` is a simple simulation engine that executes events in the order of their
## scheduled time.
## An `EventDrivenEngine` is a simple simulation engine that executes
## events in the order of their scheduled time.
##
currentTime*: uint64
queue*: HeapQueue[SchedulableEvent]
type
Protocol* = ref object of RootObj
## A `Protocol` defines a P2P protocol. It handles messages meant for it, keeps internal state,
## and may expose services to other `Protocol`s within the same `Peer`.
## A `Protocol` defines a P2P protocol. It handles messages meant for it,
## keeps internal state, and may expose services to other `Protocol`s within
## the same `Peer`.
messageType*: string ## "Type" of the message accepted by this protocol.
type
@ -37,15 +39,16 @@ type
type
Message* = ref object of RootObj
## A `Message` is a piece of data that is sent over the network. Its meaning is typically
## protocol-specific.
## A `Message` is a piece of data that is sent over the network. Its meaning
## is typically protocol-specific.
messageType*: string
sender*: Option[Peer]
sender*: Option[Peer] = none(Peer)
receiver*: Peer
type
Network* = ref object of RootObj
## A `Network` is a collection of `Peer`s that can communicate with each other.
## A `Network` is a collection of `Peer`s that can communicate with each
## other.
##
engine*: EventDrivenEngine
defaultLinkDelay*: uint64

9
swarmsim/timeutils.nim Normal file
View File

@ -0,0 +1,9 @@
# Syntax sugar for durations
import std/times
proc dseconds*(n: int): Duration = initDuration(seconds = n)
proc dminutes*(n: int): Duration = initDuration(minutes = n)
proc dhours*(n: int): Duration = initDuration(hours = n)
proc ddays*(n: int): Duration = initDuration(days = n)
proc dweeks*(n: int): Duration = initDuration(weeks = n)

View File

@ -2,6 +2,7 @@ import ./swarmsim/engine/eventdrivenengine
import ./swarmsim/engine/schedulableevent
import ./swarmsim/engine/network
import ./swarmsim/engine/peer
import ./swarmsim/codex/dhttracker
{.warning[UnusedImport]: off.}

View File

@ -0,0 +1,93 @@
import unittest
import std/times
import std/algorithm
import sequtils
import sugar
import pkg/swarmsim/engine/eventdrivenengine
import pkg/swarmsim/engine/peer
import pkg/swarmsim/engine/network
import pkg/swarmsim/codex/dhttracker
import pkg/swarmsim/timeutils
proc getPeerArray(tracker: Peer): seq[PeerDescriptor] =
DHTTracker(
tracker.getProtocol(DHTTracker.messageType).get()).peers
proc getPeerIdArray(tracker: Peer): seq[int] =
getPeerArray(tracker).map(p => p.peerId)
proc announcePeer(network: Network, tracker: Peer, peerId: int,
delay: uint64 = 0) =
network.send(
PeerAnnouncement(receiver: tracker,
messageType: DHTTracker.messageType, peerId: peerId),
delay.some).doAwait()
suite "tracker node":
setup:
let engine = EventDrivenEngine()
let trackerPeer = Peer.new(
protocols = @[
Protocol DHTTracker.new(maxPeers = 5)
]
)
let network = Network.new(engine = engine)
network.add(trackerPeer)
test "should retain published descriptors":
announcePeer(network, trackerPeer, 25)
let peers = getPeerArray(trackerPeer)
check(len(peers) == 1)
check(peers[0].peerId == 25)
test "should not include the same peer more than once":
announcePeer(network, trackerPeer, 25)
announcePeer(network, trackerPeer, 25)
let peers = getPeerArray(trackerPeer)
check(len(peers) == 1)
check(peers[0].peerId == 25)
test "should drop descriptors after expiry time":
announcePeer(network, trackerPeer, 25)
check(len(getPeerArray(trackerPeer)) == 1)
engine.runUntil(DHTTracker.defaultExpiry + 1.dseconds)
check(len(getPeerArray(trackerPeer)) == 0)
test "should renew expiry time if peer republishes its record":
announcePeer(network, trackerPeer, 25)
check(len(getPeerArray(trackerPeer)) == 1)
engine.runUntil(DHTTracker.defaultExpiry - 1.dseconds)
announcePeer(network, trackerPeer, 25)
engine.runUntil(DHTTracker.defaultExpiry + 15.dseconds)
check(len(getPeerArray(trackerPeer)) == 1)
engine.runUntil(2*DHTTracker.defaultExpiry + 1.dseconds)
check(len(getPeerArray(trackerPeer)) == 0)
test "should drop oldest peers when table is full":
announcePeer(network, trackerPeer, 25, delay = 0)
announcePeer(network, trackerPeer, 35, delay = 1)
announcePeer(network, trackerPeer, 45, delay = 2)
announcePeer(network, trackerPeer, 55, delay = 3)
announcePeer(network, trackerPeer, 65, delay = 4)
check(getPeerIdArray(trackerPeer).sorted == @[25, 35, 45, 55, 65])
announcePeer(network, trackerPeer, 75, delay = 1)
check(getPeerIdArray(trackerPeer).sorted == @[35, 45, 55, 65, 75])

View File

@ -3,17 +3,21 @@ import sequtils
import sugar
import std/algorithm
import std/tables
import pkg/swarmsim/engine/schedulableevent
import pkg/swarmsim/engine/eventdrivenengine
type TestSchedulable = ref object of SchedulableEvent
method atScheduledTime(self: TestSchedulable, engine: EventDrivenEngine): void =
discard
suite "event driven engine tests":
test "should run schedulables at the right time":
let times = @[1, 10, 5].map(time => uint64(time))
let schedulables = times.map(time => SchedulableEvent(time: time))
let schedulables = times.map(time => TestSchedulable(time: time))
let engine = EventDrivenEngine()
@ -27,10 +31,11 @@ suite "event driven engine tests":
test "should allow clients to wait until a scheduled event happens":
let times = @[1, 2, 3, 4, 5, 6, 7, 8]
let schedulables = times.map(time => SchedulableEvent(time: uint64(time)))
let schedulables = times.map(time => TestSchedulable(time: uint64(time)))
let engine = EventDrivenEngine()
let handles = schedulables.map(schedulable => engine.awaitableSchedule(schedulable))
let handles = schedulables.map(schedulable =>
engine.awaitableSchedule(schedulable))
check(engine.currentTime == 0)
@ -40,9 +45,32 @@ suite "event driven engine tests":
handles[7].doAwait()
check(engine.currentTime == 8)
test "should allow clients run until the desired simulation time":
let times = @[50, 100, 150]
let schedulables = times.map(time => TestSchedulable(time: uint64(time)))
let engine = EventDrivenEngine()
engine.scheduleAll(schedulables)
engine.runUntil(80)
check(engine.currentTime == 80)
check(schedulables[0].completed)
check(not schedulables[1].completed)
engine.runUntil(110)
check(engine.currentTime == 110)
check(schedulables[1].completed)
check(not schedulables[2].completed)
engine.runUntil(200)
check(engine.currentTime == 150)
check(schedulables[2].completed)
test "should not allow schedulables to be scheduled in the past":
let e1 = SchedulableEvent(time: 10)
let e2 = SchedulableEvent(time: 8)
let e1 = TestSchedulable(time: 10)
let e2 = TestSchedulable(time: 8)
let engine = EventDrivenEngine()
engine.schedule(e1)
@ -52,8 +80,8 @@ suite "event driven engine tests":
engine.schedule(e2)
test "should allow clients to cancel scheduled events":
let e1 = SchedulableEvent(time: 8)
let e2 = SchedulableEvent(time: 10)
let e1 = TestSchedulable(time: 8)
let e2 = TestSchedulable(time: 10)
let engine = EventDrivenEngine()

View File

@ -3,7 +3,6 @@ import unittest
import pkg/swarmsim/engine/eventdrivenengine
import pkg/swarmsim/engine/network
import pkg/swarmsim/engine/peer
import pkg/swarmsim/engine/message
import pkg/swarmsim/engine/protocol
type
@ -33,8 +32,8 @@ suite "network":
network.add(peer)
let m1 = Message.new(receiver = peer, messageType = "protocol1")
let m2 = Message.new(receiver = peer, messageType = "protocol2")
let m1 = Message(receiver: peer, messageType: "protocol1")
let m2 = Message(receiver: peer, messageType: "protocol2")
let message2handle = network.send(m2, linkDelay = uint64(10).some)
let message1handle = network.send(m1, linkDelay = uint64(5).some)