refactor engine loop

This commit is contained in:
gmega 2023-08-18 19:38:32 -03:00
parent 7548c545f8
commit 614d372e52
2 changed files with 24 additions and 18 deletions

View File

@ -16,13 +16,13 @@ type
PeerDescriptor* = ref object of RootObj
peerId*: int
lastSeen*: uint64
expiry: AwaitableHandle
expiryTimer: AwaitableHandle
type ArrayShuffler = proc (arr: var seq[PeerDescriptor]): void
type
DHTTracker* = ref object of Protocol
expiry*: Duration
expiryTimer*: Duration
maxPeers*: uint
peers: OrderedTable[int, PeerDescriptor]
shuffler: ArrayShuffler
@ -33,7 +33,7 @@ type
SampleSwarm* = ref object of Message
numPeers: uint
PeerExpiry* = ref object of SchedulableEvent
ExpiryTimer* = ref object of SchedulableEvent
peerId*: int
tracker: DHTTracker
@ -48,11 +48,11 @@ proc new*(
T: type DHTTracker,
maxPeers: uint,
shuffler: ArrayShuffler = RandomShuffler,
expiry: Duration = DHTTracker.defaultExpiry,
expiryTimer: Duration = DHTTracker.defaultExpiry,
): DHTTracker =
DHTTracker(
# This should in general be safe as those are always positive.
expiry: expiry,
expiryTimer: expiryTimer,
maxPeers: maxPeers,
shuffler: shuffler,
peers: initOrderedTable[int, PeerDescriptor](),
@ -61,6 +61,19 @@ proc new*(
proc peers*(self: DHTTracker): seq[PeerDescriptor] = self.peers.values.toSeq()
proc cancelExpiryTimer(self: DHTTracker, peerId: int) =
self.peers[peerId].expiryTimer.schedulable.cancel()
proc createExpiryTimer(self: DHTTracker, peerId: int,
engine: EventDrivenEngine): AwaitableHandle =
let expiryTimer = ExpiryTimer(
peerId: peerId,
tracker: self,
time: engine.currentTime + uint64(self.expiryTimer.inSeconds())
)
engine.awaitableSchedule(expiryTimer)
proc oldestInsertion(self: DHTTracker): int =
# We maintain the invariant that the first element to have been inserted
# must be the oldest. We can therefore return the first element in the
@ -74,27 +87,20 @@ proc addPeer(self: DHTTracker, message: PeerAnnouncement,
let peerId = message.peerId
if peerId in self.peers:
self.peers[peerId].expiry.schedulable.cancel()
self.cancelExpiryTimer(peerId)
elif uint(len(self.peers)) == self.maxPeers:
let oldest = self.oldestInsertion()
self.peers[oldest].expiry.schedulable.cancel()
self.cancelExpiryTimer(oldest)
self.peers.del(oldest)
let expiry = PeerExpiry(
peerId: message.peerId,
tracker: self,
time: engine.currentTime + uint64(self.expiry.inSeconds())
)
let descriptor = PeerDescriptor(
self.peers[peerId] = PeerDescriptor(
peerId: message.peerId,
lastSeen: engine.currentTime,
expiry: engine.awaitableSchedule(expiry))
expiryTimer: self.createExpiryTimer(peerId, engine)
)
self.peers[peerId] = descriptor
method atScheduledTime*(self: PeerExpiry, engine: EventDrivenEngine): void =
method atScheduledTime*(self: ExpiryTimer, engine: EventDrivenEngine): void =
self.tracker.peers.del(self.peerId)
proc sampleSwarm(self: DHTTracker, message: SampleSwarm, network: Network) =