Merge branch 'main' into tweaks-suggestions

This commit is contained in:
gmega 2023-08-22 08:35:46 -03:00
commit 996bee5d65
4 changed files with 28 additions and 24 deletions

View File

@ -13,13 +13,13 @@ type
PeerDescriptor* = ref object of RootObj PeerDescriptor* = ref object of RootObj
peerId*: int peerId*: int
lastSeen*: uint64 lastSeen*: uint64
expirationTimer: AwaitableHandle peerExpiration: ScheduledEvent
type ArrayShuffler = proc (arr: var seq[PeerDescriptor]): void type ArrayShuffler = proc (arr: var seq[PeerDescriptor]): void
type type
DHTTracker* = ref object of Protocol DHTTracker* = ref object of Protocol
expirationTimer*: Duration peerExpiration*: Duration
maxPeers*: uint maxPeers*: uint
peers: OrderedTable[int, PeerDescriptor] peers: OrderedTable[int, PeerDescriptor]
shuffler: ArrayShuffler shuffler: ArrayShuffler
@ -45,11 +45,11 @@ proc new*(
T: type DHTTracker, T: type DHTTracker,
maxPeers: uint, maxPeers: uint,
shuffler: ArrayShuffler = RandomShuffler, shuffler: ArrayShuffler = RandomShuffler,
expirationTimer: Duration = DHTTracker.defaultExpiry, peerExpiration: Duration = DHTTracker.defaultExpiry,
): DHTTracker = ): DHTTracker =
DHTTracker( DHTTracker(
# This should in general be safe as those are always positive. # This should in general be safe as those are always positive.
expirationTimer: expirationTimer, peerExpiration: peerExpiration,
maxPeers: maxPeers, maxPeers: maxPeers,
shuffler: shuffler, shuffler: shuffler,
peers: initOrderedTable[int, PeerDescriptor](), peers: initOrderedTable[int, PeerDescriptor](),
@ -59,17 +59,17 @@ proc new*(
proc peers*(self: DHTTracker): seq[PeerDescriptor] = self.peers.values.toSeq() proc peers*(self: DHTTracker): seq[PeerDescriptor] = self.peers.values.toSeq()
proc cancelExpiryTimer(self: DHTTracker, peerId: int) = proc cancelExpiryTimer(self: DHTTracker, peerId: int) =
self.peers[peerId].expirationTimer.schedulable.cancel() self.peers[peerId].peerExpiration.schedulable.cancel()
proc createExpiryTimer(self: DHTTracker, peerId: int, proc createExpiryTimer(self: DHTTracker, peerId: int,
engine: EventDrivenEngine): AwaitableHandle = engine: EventDrivenEngine): ScheduledEvent =
let expirationTimer = ExpirationTimer( let peerExpiration = ExpirationTimer(
peerId: peerId, peerId: peerId,
tracker: self, tracker: self,
time: engine.currentTime + uint64(self.expirationTimer.inSeconds()) time: engine.currentTime + uint64(self.peerExpiration.inSeconds())
) )
engine.awaitableSchedule(expirationTimer) engine.awaitableSchedule(peerExpiration)
proc oldestInsertion(self: DHTTracker): int = proc oldestInsertion(self: DHTTracker): int =
# We maintain the invariant that the first element to have been inserted # We maintain the invariant that the first element to have been inserted
@ -78,23 +78,26 @@ proc oldestInsertion(self: DHTTracker): int =
for peerId in self.peers.keys: for peerId in self.peers.keys:
return peerId return peerId
proc removePeer(self: DHTTracker, peerId: int) =
self.cancelExpiryTimer(peerId)
self.peers.del(peerId)
proc addPeer(self: DHTTracker, message: PeerAnnouncement, proc addPeer(self: DHTTracker, message: PeerAnnouncement,
engine: EventDrivenEngine) = engine: EventDrivenEngine) =
let peerId = message.peerId let peerId = message.peerId
if peerId in self.peers: if peerId in self.peers:
self.cancelExpiryTimer(peerId) # Makes sure that the most recently seen peer is always inserted last.
self.removePeer(peerId)
elif uint(len(self.peers)) == self.maxPeers: elif uint(len(self.peers)) == self.maxPeers:
let oldest = self.oldestInsertion() self.removePeer(self.oldestInsertion())
self.cancelExpiryTimer(oldest)
self.peers.del(oldest)
self.peers[peerId] = PeerDescriptor( self.peers[peerId] = PeerDescriptor(
peerId: message.peerId, peerId: message.peerId,
lastSeen: engine.currentTime, lastSeen: engine.currentTime,
expirationTimer: self.createExpiryTimer(peerId, engine) peerExpiration: self.createExpiryTimer(peerId, engine)
) )
method atScheduledTime*(self: ExpirationTimer, engine: EventDrivenEngine): void = method atScheduledTime*(self: ExpirationTimer, engine: EventDrivenEngine): void =

View File

@ -10,7 +10,7 @@ export times
export EventDrivenEngine export EventDrivenEngine
type type
AwaitableHandle* = object of RootObj ScheduledEvent* = object of RootObj
schedulable*: SchedulableEvent schedulable*: SchedulableEvent
engine: EventDrivenEngine engine: EventDrivenEngine
@ -24,9 +24,9 @@ proc schedule*(self: EventDrivenEngine, schedulable: SchedulableEvent): void =
self.queue.push(schedulable) self.queue.push(schedulable)
proc awaitableSchedule*(self: EventDrivenEngine, proc awaitableSchedule*(self: EventDrivenEngine,
schedulable: SchedulableEvent): AwaitableHandle = schedulable: SchedulableEvent): ScheduledEvent =
self.schedule(schedulable) self.schedule(schedulable)
AwaitableHandle(schedulable: schedulable, engine: self) ScheduledEvent(schedulable: schedulable, engine: self)
proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine, proc scheduleAll*[T: SchedulableEvent](self: EventDrivenEngine,
schedulables: seq[T]): void = schedulables: seq[T]): void =
@ -65,5 +65,5 @@ proc runUntil*(self: EventDrivenEngine, until: Duration): void =
proc run*(self: EventDrivenEngine): void = proc run*(self: EventDrivenEngine): void =
self.runUntil(high(uint64)) self.runUntil(high(uint64))
proc doAwait*(self: AwaitableHandle): void = proc doAwait*(self: ScheduledEvent): void =
self.engine.runUntil(self.schedulable.time) self.engine.runUntil(self.schedulable.time)

View File

@ -12,7 +12,7 @@ export eventdrivenengine
export types export types
type type
ScheduledMessage = ref object of SchedulableEvent MessageSend = ref object of SchedulableEvent
network: Network network: Network
message: Message message: Message
@ -36,17 +36,17 @@ proc remove*(self: Network, peer: Peer) =
self.peers.excl(peer) self.peers.excl(peer)
proc send*(self: Network, message: Message, proc send*(self: Network, message: Message,
linkDelay: Option[uint64] = none(uint64)): AwaitableHandle = linkDelay: Option[uint64] = none(uint64)): ScheduledEvent =
let delay = linkDelay.get(self.defaultLinkDelay) let delay = linkDelay.get(self.defaultLinkDelay)
self.engine.awaitableSchedule( self.engine.awaitableSchedule(
ScheduledMessage( MessageSend(
time: self.engine.currentTime + delay, time: self.engine.currentTime + delay,
message: message, message: message,
network: self network: self
) )
) )
method atScheduledTime*(self: ScheduledMessage, engine: EventDrivenEngine) = method atScheduledTime*(self: MessageSend, engine: EventDrivenEngine) =
self.message.receiver.deliver(self.message, engine, self.network) self.message.receiver.deliver(self.message, engine, self.network)

View File

@ -78,7 +78,7 @@ suite "tracker node":
engine.runUntil(2*DHTTracker.defaultExpiry + 1.dseconds) engine.runUntil(2*DHTTracker.defaultExpiry + 1.dseconds)
check(len(getPeerArray(trackerPeer)) == 0) check(len(getPeerArray(trackerPeer)) == 0)
test "should drop oldest peers when table is full": test "should drop least recently seen peer when table is full":
announcePeer(network, trackerPeer, 25, delay = 0) announcePeer(network, trackerPeer, 25, delay = 0)
announcePeer(network, trackerPeer, 35, delay = 1) announcePeer(network, trackerPeer, 35, delay = 1)
announcePeer(network, trackerPeer, 45, delay = 2) announcePeer(network, trackerPeer, 45, delay = 2)
@ -87,6 +87,7 @@ suite "tracker node":
check(getPeerIdArray(trackerPeer).sorted == @[25, 35, 45, 55, 65]) check(getPeerIdArray(trackerPeer).sorted == @[25, 35, 45, 55, 65])
announcePeer(network, trackerPeer, 25, delay = 1)
announcePeer(network, trackerPeer, 75, delay = 1) announcePeer(network, trackerPeer, 75, delay = 1)
check(getPeerIdArray(trackerPeer).sorted == @[35, 45, 55, 65, 75]) check(getPeerIdArray(trackerPeer).sorted == @[25, 45, 55, 65, 75])