mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-14 00:29:04 +00:00
PeerPool fixes. (#1654)
* Refactor peer_pool. Fix eth2_network peer counters. Fix PeerPool do not allow to add more peers when empty space available. * Remove unused imports. * Add test for a bug. * Fix eth2_network disconnect should deletePeer not release. More PeerPool refactoring.
This commit is contained in:
parent
6d8130dc49
commit
6e463257f4
@ -378,7 +378,7 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
|
||||
peer.connectionState = Disconnecting
|
||||
await peer.network.switch.disconnect(peer.info.peerId)
|
||||
peer.connectionState = Disconnected
|
||||
peer.network.peerPool.release(peer)
|
||||
discard peer.network.peerPool.deletePeer(peer)
|
||||
let seenTime = case reason
|
||||
of ClientShutDown:
|
||||
SeenTableTimeClientShutDown
|
||||
@ -681,7 +681,6 @@ proc handleOutgoingPeer(peer: Peer): Future[bool] {.async.} =
|
||||
|
||||
proc onPeerClosed(udata: pointer) =
|
||||
debug "Peer (outgoing) lost", peer
|
||||
nbc_peers.set int64(len(network.peerPool))
|
||||
|
||||
let res = await network.peerPool.addOutgoingPeer(peer)
|
||||
if res:
|
||||
@ -690,14 +689,11 @@ proc handleOutgoingPeer(peer: Peer): Future[bool] {.async.} =
|
||||
peer.getFuture().addCallback(onPeerClosed)
|
||||
result = true
|
||||
|
||||
nbc_peers.set int64(len(network.peerPool))
|
||||
|
||||
proc handleIncomingPeer(peer: Peer): Future[bool] {.async.} =
|
||||
let network = peer.network
|
||||
|
||||
proc onPeerClosed(udata: pointer) =
|
||||
debug "Peer (incoming) lost", peer
|
||||
nbc_peers.set int64(len(network.peerPool))
|
||||
|
||||
let res = await network.peerPool.addIncomingPeer(peer)
|
||||
if res:
|
||||
@ -706,8 +702,6 @@ proc handleIncomingPeer(peer: Peer): Future[bool] {.async.} =
|
||||
peer.getFuture().addCallback(onPeerClosed)
|
||||
result = true
|
||||
|
||||
nbc_peers.set int64(len(network.peerPool))
|
||||
|
||||
proc toPeerAddr*(r: enr.TypedRecord):
|
||||
Result[PeerAddr, cstring] {.raises: [Defect].} =
|
||||
if not r.secp256k1.isSome:
|
||||
@ -935,6 +929,12 @@ proc startListening*(node: Eth2Node) {.async.} =
|
||||
await node.pubsub.start()
|
||||
|
||||
proc start*(node: Eth2Node) {.async.} =
|
||||
|
||||
proc onPeerCountChanged() =
|
||||
nbc_peers.set int64(len(node.peerPool))
|
||||
|
||||
node.peerPool.setPeerCounter(onPeerCountChanged)
|
||||
|
||||
for i in 0 ..< ConcurrentConnections:
|
||||
node.connWorkers.add connectWorker(node)
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
import tables, heapqueue
|
||||
import std/[tables, heapqueue]
|
||||
import chronos
|
||||
|
||||
type
|
||||
@ -23,6 +23,8 @@ type
|
||||
|
||||
PeerScoreCheckCallback*[T] = proc(peer: T): bool {.gcsafe, raises: [Defect].}
|
||||
|
||||
PeerCounterCallback* = proc() {.gcsafe, raises: [Defect].}
|
||||
|
||||
PeerPool*[A, B] = ref object
|
||||
incNotEmptyEvent: AsyncEvent
|
||||
outNotEmptyEvent: AsyncEvent
|
||||
@ -34,6 +36,7 @@ type
|
||||
storage: seq[PeerItem[A]]
|
||||
cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.}
|
||||
scoreCheck: PeerScoreCheckCallback[A]
|
||||
peerCounter: PeerCounterCallback
|
||||
maxPeersCount: int
|
||||
maxIncPeersCount: int
|
||||
maxOutPeersCount: int
|
||||
@ -45,7 +48,7 @@ type
|
||||
PeerPoolError* = object of CatchableError
|
||||
|
||||
proc `<`*(a, b: PeerIndex): bool =
|
||||
result = a.cmp(b, a)
|
||||
a.cmp(b, a)
|
||||
|
||||
proc fireNotEmptyEvent[A, B](pool: PeerPool[A, B],
|
||||
item: PeerItem[A]) {.inline.} =
|
||||
@ -115,38 +118,10 @@ proc waitNotFullEvent[A, B](pool: PeerPool[A, B],
|
||||
filter: set[PeerType]): Future[void] =
|
||||
pool.waitForEvent(EventType.NotFullEvent, filter)
|
||||
|
||||
template getItem[A, B](pool: PeerPool[A, B],
|
||||
filter: set[PeerType]): ptr PeerItem[A] =
|
||||
doAssert((len(pool.outQueue) > 0) or (len(pool.incQueue) > 0))
|
||||
var pindex: int
|
||||
if filter == {PeerType.Incoming, PeerType.Outgoing}:
|
||||
if len(pool.outQueue) > 0 and len(pool.incQueue) > 0:
|
||||
# Don't think `<` is actually `<` here.
|
||||
if pool.incQueue[0] < pool.outQueue[0]:
|
||||
inc(pool.acqIncPeersCount)
|
||||
pindex = pool.incQueue.pop().data
|
||||
else:
|
||||
inc(pool.acqOutPeersCount)
|
||||
pindex = pool.outQueue.pop().data
|
||||
else:
|
||||
if len(pool.outQueue) > 0:
|
||||
inc(pool.acqOutPeersCount)
|
||||
pindex = pool.outQueue.pop().data
|
||||
else:
|
||||
inc(pool.acqIncPeersCount)
|
||||
pindex = pool.incQueue.pop().data
|
||||
else:
|
||||
if PeerType.Outgoing in filter:
|
||||
inc(pool.acqOutPeersCount)
|
||||
pindex = pool.outQueue.pop().data
|
||||
elif PeerType.Incoming in filter:
|
||||
inc(pool.acqIncPeersCount)
|
||||
pindex = pool.incQueue.pop().data
|
||||
addr(pool.storage[pindex])
|
||||
|
||||
proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
|
||||
maxOutgoingPeers = -1,
|
||||
scoreCheckCb: PeerScoreCheckCallback[A] = nil): PeerPool[A, B] =
|
||||
scoreCheckCb: PeerScoreCheckCallback[A] = nil,
|
||||
peerCounterCb: PeerCounterCallback = nil): PeerPool[A, B] =
|
||||
## Create new PeerPool.
|
||||
##
|
||||
## ``maxPeers`` - maximum number of peers allowed. All the peers which
|
||||
@ -165,6 +140,9 @@ proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
|
||||
## If callback procedure returns ``false`` peer will be removed from
|
||||
## PeerPool.
|
||||
##
|
||||
## ``peerCountCb`` - callback to be called when number of peers in PeerPool
|
||||
## has been changed.
|
||||
##
|
||||
## Please note, that if ``maxPeers`` is positive non-zero value, then equation
|
||||
## ``maxPeers >= maxIncomingPeers + maxOutgoingPeers`` must be ``true``.
|
||||
var res = PeerPool[A, B]()
|
||||
@ -185,50 +163,55 @@ proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
|
||||
res.outQueue = initHeapQueue[PeerIndex]()
|
||||
res.registry = initTable[B, PeerIndex]()
|
||||
res.scoreCheck = scoreCheckCb
|
||||
res.peerCounter = peerCounterCb
|
||||
res.storage = newSeq[PeerItem[A]]()
|
||||
|
||||
proc peerCmp(a, b: PeerIndex): bool {.closure, gcsafe.} =
|
||||
let p1 = res.storage[a.data].data
|
||||
let p2 = res.storage[b.data].data
|
||||
result = p1 < p2
|
||||
p1 < p2
|
||||
|
||||
res.cmp = peerCmp
|
||||
result = res
|
||||
res
|
||||
|
||||
proc len*[A, B](pool: PeerPool[A, B]): int =
|
||||
## Returns number of registered peers in PeerPool ``pool``. This number
|
||||
## includes all the peers (acquired and available).
|
||||
result = len(pool.registry)
|
||||
len(pool.registry)
|
||||
|
||||
proc lenAvailable*[A, B](pool: PeerPool[A, B],
|
||||
filter = {PeerType.Incoming,
|
||||
PeerType.Outgoing}): int {.inline.} =
|
||||
## Returns number of available peers in PeerPool ``pool`` which satisfies
|
||||
## filter ``filter``.
|
||||
if PeerType.Incoming in filter:
|
||||
result = result + len(pool.incQueue)
|
||||
if PeerType.Outgoing in filter:
|
||||
result = result + len(pool.outQueue)
|
||||
(if PeerType.Incoming in filter: len(pool.incQueue) else: 0) +
|
||||
(if PeerType.Outgoing in filter: len(pool.outQueue) else: 0)
|
||||
|
||||
proc lenAcquired*[A, B](pool: PeerPool[A, B],
|
||||
filter = {PeerType.Incoming,
|
||||
PeerType.Outgoing}): int {.inline.} =
|
||||
## Returns number of acquired peers in PeerPool ``pool`` which satisifies
|
||||
## filter ``filter``.
|
||||
if PeerType.Incoming in filter:
|
||||
result = result + pool.acqIncPeersCount
|
||||
if PeerType.Outgoing in filter:
|
||||
result = result + pool.acqOutPeersCount
|
||||
(if PeerType.Incoming in filter: pool.acqIncPeersCount else: 0) +
|
||||
(if PeerType.Outgoing in filter: pool.acqOutPeersCount else: 0)
|
||||
|
||||
proc checkPeerScore*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
|
||||
proc shortLogAvailable*[A, B](pool: PeerPool[A, B]): string =
|
||||
$len(pool.incQueue) & "/" & $len(pool.outQueue)
|
||||
|
||||
proc shortLogAcquired*[A, B](pool: PeerPool[A, B]): string =
|
||||
$pool.acqIncPeersCount & "/" & $pool.acqOutPeersCount
|
||||
|
||||
proc checkPeerScore[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
|
||||
## Returns ``true`` if peer passing score check.
|
||||
if not(isNil(pool.scoreCheck)):
|
||||
if pool.scoreCheck(peer):
|
||||
result = true
|
||||
else:
|
||||
result = false
|
||||
pool.scoreCheck(peer)
|
||||
else:
|
||||
result = true
|
||||
true
|
||||
|
||||
proc peerCountChanged[A, B](pool: PeerPool[A, B]) {.inline.} =
|
||||
## Call callback when number of peers changed.
|
||||
if not(isNil(pool.peerCounter)):
|
||||
pool.peerCounter()
|
||||
|
||||
proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
|
||||
## Remove ``peer`` from PeerPool ``pool``.
|
||||
@ -237,7 +220,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
|
||||
## be deleted only when peer will be released. You can change this behavior
|
||||
## with ``force`` option.
|
||||
mixin getKey
|
||||
var key = getKey(peer)
|
||||
let key = getKey(peer)
|
||||
if pool.registry.hasKey(key):
|
||||
let pindex = pool.registry[key].data
|
||||
var item = addr(pool.storage[pindex])
|
||||
@ -252,10 +235,12 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
|
||||
dec(pool.curOutPeersCount)
|
||||
dec(pool.acqOutPeersCount)
|
||||
|
||||
# Indicate that we have an empty space
|
||||
pool.fireNotFullEvent(item[])
|
||||
# Cleanup storage with default item, and removing key from hashtable.
|
||||
pool.storage[pindex] = PeerItem[A]()
|
||||
pool.registry.del(key)
|
||||
pool.peerCountChanged()
|
||||
else:
|
||||
if item[].peerType == PeerType.Incoming:
|
||||
# If peer is available, then its copy present in heapqueue, so we need
|
||||
@ -274,12 +259,15 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
|
||||
break
|
||||
dec(pool.curOutPeersCount)
|
||||
|
||||
# Indicate that we have an empty space
|
||||
pool.fireNotFullEvent(item[])
|
||||
# Cleanup storage with default item, and removing key from hashtable.
|
||||
pool.storage[pindex] = PeerItem[A]()
|
||||
pool.registry.del(key)
|
||||
|
||||
result = true
|
||||
pool.peerCountChanged()
|
||||
true
|
||||
else:
|
||||
false
|
||||
|
||||
proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B,
|
||||
peerType: PeerType): PeerIndex =
|
||||
@ -293,7 +281,7 @@ proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B,
|
||||
let pindex = PeerIndex(data: item.index, cmp: pool.cmp)
|
||||
pool.registry[peerKey] = pindex
|
||||
pitem[].data.getFuture().addCallback(onPeerClosed)
|
||||
result = pindex
|
||||
pindex
|
||||
|
||||
proc addPeerNoWait*[A, B](pool: PeerPool[A, B],
|
||||
peer: A, peerType: PeerType): bool =
|
||||
@ -311,7 +299,6 @@ proc addPeerNoWait*[A, B](pool: PeerPool[A, B],
|
||||
if not(pool.checkPeerScore(peer)):
|
||||
return false
|
||||
|
||||
result = false
|
||||
let peerKey = getKey(peer)
|
||||
|
||||
if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished):
|
||||
@ -322,14 +309,17 @@ proc addPeerNoWait*[A, B](pool: PeerPool[A, B],
|
||||
inc(pool.curIncPeersCount)
|
||||
pool.incQueue.push(pindex)
|
||||
pool.incNotEmptyEvent.fire()
|
||||
result = true
|
||||
pool.peerCountChanged()
|
||||
return true
|
||||
elif peerType == PeerType.Outgoing:
|
||||
if pool.curOutPeersCount < pool.maxOutPeersCount:
|
||||
let pindex = pool.addPeerImpl(peer, peerKey, peerType)
|
||||
inc(pool.curOutPeersCount)
|
||||
pool.outQueue.push(pindex)
|
||||
pool.outNotEmptyEvent.fire()
|
||||
result = true
|
||||
pool.peerCountChanged()
|
||||
return true
|
||||
return false
|
||||
|
||||
proc addPeer*[A, B](pool: PeerPool[A, B],
|
||||
peer: A, peerType: PeerType): Future[bool] {.async.} =
|
||||
@ -348,7 +338,6 @@ proc addPeer*[A, B](pool: PeerPool[A, B],
|
||||
if not(pool.checkPeerScore(peer)):
|
||||
return false
|
||||
|
||||
var res = false
|
||||
let peerKey = getKey(peer)
|
||||
|
||||
if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished):
|
||||
@ -361,7 +350,8 @@ proc addPeer*[A, B](pool: PeerPool[A, B],
|
||||
inc(pool.curIncPeersCount)
|
||||
pool.incQueue.push(pindex)
|
||||
pool.incNotEmptyEvent.fire()
|
||||
res = true
|
||||
pool.peerCountChanged()
|
||||
return true
|
||||
elif peerType == PeerType.Outgoing:
|
||||
while pool.curOutPeersCount >= pool.maxOutPeersCount:
|
||||
await pool.waitNotFullEvent({peerType})
|
||||
@ -369,78 +359,102 @@ proc addPeer*[A, B](pool: PeerPool[A, B],
|
||||
inc(pool.curOutPeersCount)
|
||||
pool.outQueue.push(pindex)
|
||||
pool.outNotEmptyEvent.fire()
|
||||
res = true
|
||||
|
||||
result = res
|
||||
pool.peerCountChanged()
|
||||
return true
|
||||
else:
|
||||
return false
|
||||
|
||||
proc addIncomingPeerNoWait*[A, B](pool: PeerPool[A, B],
|
||||
peer: A): bool {.inline.} =
|
||||
## Add incoming peer ``peer`` to PeerPool ``pool``.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
result = pool.addPeerNoWait(peer, PeerType.Incoming)
|
||||
pool.addPeerNoWait(peer, PeerType.Incoming)
|
||||
|
||||
proc addOutgoingPeerNoWait*[A, B](pool: PeerPool[A, B],
|
||||
peer: A): bool {.inline.} =
|
||||
## Add outgoing peer ``peer`` to PeerPool ``pool``.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
result = pool.addPeerNoWait(peer, PeerType.Outgoing)
|
||||
pool.addPeerNoWait(peer, PeerType.Outgoing)
|
||||
|
||||
proc addIncomingPeer*[A, B](pool: PeerPool[A, B],
|
||||
peer: A): Future[bool] {.inline.} =
|
||||
## Add incoming peer ``peer`` to PeerPool ``pool``.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
result = pool.addPeer(peer, PeerType.Incoming)
|
||||
pool.addPeer(peer, PeerType.Incoming)
|
||||
|
||||
proc addOutgoingPeer*[A, B](pool: PeerPool[A, B],
|
||||
peer: A): Future[bool] {.inline.} =
|
||||
## Add outgoing peer ``peer`` to PeerPool ``pool``.
|
||||
##
|
||||
## Returns ``true`` on success.
|
||||
result = pool.addPeer(peer, PeerType.Outgoing)
|
||||
pool.addPeer(peer, PeerType.Outgoing)
|
||||
|
||||
proc acquireItemImpl[A, B](pool: PeerPool[A, B],
|
||||
filter: set[PeerType]): A {.inline.} =
|
||||
doAssert(filter != {})
|
||||
doAssert((len(pool.outQueue) > 0) or (len(pool.incQueue) > 0))
|
||||
let pindex =
|
||||
if filter == {PeerType.Incoming, PeerType.Outgoing}:
|
||||
if len(pool.outQueue) > 0 and len(pool.incQueue) > 0:
|
||||
# Don't think `<` is actually `<` here.
|
||||
if pool.incQueue[0] < pool.outQueue[0]:
|
||||
inc(pool.acqIncPeersCount)
|
||||
let item = pool.incQueue.pop()
|
||||
item.data
|
||||
else:
|
||||
inc(pool.acqOutPeersCount)
|
||||
let item = pool.outQueue.pop()
|
||||
item.data
|
||||
else:
|
||||
if len(pool.outQueue) > 0:
|
||||
inc(pool.acqOutPeersCount)
|
||||
let item = pool.outQueue.pop()
|
||||
item.data
|
||||
else:
|
||||
inc(pool.acqIncPeersCount)
|
||||
let item = pool.incQueue.pop()
|
||||
item.data
|
||||
else:
|
||||
if PeerType.Outgoing in filter:
|
||||
inc(pool.acqOutPeersCount)
|
||||
let item = pool.outQueue.pop()
|
||||
item.data
|
||||
else:
|
||||
inc(pool.acqIncPeersCount)
|
||||
let item = pool.incQueue.pop()
|
||||
item.data
|
||||
var pitem = addr(pool.storage[pindex])
|
||||
doAssert(PeerFlags.Acquired notin pitem[].flags)
|
||||
pitem[].flags.incl(PeerFlags.Acquired)
|
||||
pitem[].data
|
||||
|
||||
proc acquire*[A, B](pool: PeerPool[A, B],
|
||||
filter = {PeerType.Incoming,
|
||||
PeerType.Outgoing}): Future[A] {.async.} =
|
||||
## Acquire peer from PeerPool ``pool``, which match the filter ``filter``.
|
||||
mixin getKey
|
||||
doAssert(filter != {}, "Filter must not be empty")
|
||||
while true:
|
||||
var count = 0
|
||||
if PeerType.Incoming in filter:
|
||||
count = count + len(pool.incQueue)
|
||||
if PeerType.Outgoing in filter:
|
||||
count = count + len(pool.outQueue)
|
||||
if count == 0:
|
||||
if pool.lenAvailable(filter) == 0:
|
||||
await pool.waitNotEmptyEvent(filter)
|
||||
else:
|
||||
var item = pool.getItem(filter)
|
||||
doAssert(PeerFlags.Acquired notin item[].flags)
|
||||
item[].flags.incl(PeerFlags.Acquired)
|
||||
result = item[].data
|
||||
break
|
||||
return pool.acquireItemImpl(filter)
|
||||
|
||||
proc acquireNoWait*[A, B](pool: PeerPool[A, B],
|
||||
filter = {PeerType.Incoming,
|
||||
PeerType.Outgoing}): A =
|
||||
doAssert(filter != {}, "Filter must not be empty")
|
||||
var count = 0
|
||||
if PeerType.Incoming in filter:
|
||||
count = count + len(pool.incQueue)
|
||||
if PeerType.Outgoing in filter:
|
||||
count = count + len(pool.outQueue)
|
||||
if count < 1:
|
||||
if pool.lenAvailable(filter) < 1:
|
||||
raise newException(PeerPoolError, "Not enough peers in pool")
|
||||
var item = pool.getItem(filter)
|
||||
doAssert(PeerFlags.Acquired notin item[].flags)
|
||||
item[].flags.incl(PeerFlags.Acquired)
|
||||
result = item[].data
|
||||
pool.acquireItemImpl(filter)
|
||||
|
||||
proc release*[A, B](pool: PeerPool[A, B], peer: A) =
|
||||
## Release peer ``peer`` back to PeerPool ``pool``
|
||||
mixin getKey
|
||||
var key = getKey(peer)
|
||||
let key = getKey(peer)
|
||||
var titem = pool.registry.getOrDefault(key, PeerIndex(data: -1))
|
||||
if titem.data >= 0:
|
||||
let pindex = titem.data
|
||||
@ -448,17 +462,12 @@ proc release*[A, B](pool: PeerPool[A, B], peer: A) =
|
||||
if PeerFlags.Acquired in item[].flags:
|
||||
if not(pool.checkPeerScore(peer)):
|
||||
item[].flags.incl(DeleteOnRelease)
|
||||
item[].flags.excl(PeerFlags.Acquired)
|
||||
if PeerFlags.DeleteOnRelease in item[].flags:
|
||||
if item[].peerType == PeerType.Incoming:
|
||||
dec(pool.curIncPeersCount)
|
||||
dec(pool.acqIncPeersCount)
|
||||
elif item[].peerType == PeerType.Outgoing:
|
||||
dec(pool.curOutPeersCount)
|
||||
dec(pool.acqOutPeersCount)
|
||||
pool.storage[pindex] = PeerItem[A]()
|
||||
pool.registry.del(key)
|
||||
# We do not care about result here because peer is present in registry
|
||||
# and has all proper flags set.
|
||||
discard pool.deletePeer(peer, force = true)
|
||||
else:
|
||||
item[].flags.excl(PeerFlags.Acquired)
|
||||
if item[].peerType == PeerType.Incoming:
|
||||
pool.incQueue.push(titem)
|
||||
dec(pool.acqIncPeersCount)
|
||||
@ -485,18 +494,10 @@ proc acquire*[A, B](pool: PeerPool[A, B],
|
||||
while true:
|
||||
if len(peers) >= number:
|
||||
break
|
||||
var count = 0
|
||||
if PeerType.Incoming in filter:
|
||||
count = count + len(pool.incQueue)
|
||||
if PeerType.Outgoing in filter:
|
||||
count = count + len(pool.outQueue)
|
||||
if count == 0:
|
||||
if pool.lenAvailable(filter) == 0:
|
||||
await pool.waitNotEmptyEvent(filter)
|
||||
else:
|
||||
var item = pool.getItem(filter)
|
||||
doAssert(PeerFlags.Acquired notin item[].flags)
|
||||
item[].flags.incl(PeerFlags.Acquired)
|
||||
peers.add(item[].data)
|
||||
peers.add(pool.acquireItemImpl(filter))
|
||||
except CancelledError as exc:
|
||||
# If we got cancelled, we need to return all the acquired peers back to
|
||||
# pool.
|
||||
@ -504,7 +505,7 @@ proc acquire*[A, B](pool: PeerPool[A, B],
|
||||
pool.release(item)
|
||||
peers.setLen(0)
|
||||
raise exc
|
||||
result = peers
|
||||
return peers
|
||||
|
||||
proc acquireNoWait*[A, B](pool: PeerPool[A, B],
|
||||
number: int,
|
||||
@ -514,19 +515,11 @@ proc acquireNoWait*[A, B](pool: PeerPool[A, B],
|
||||
## filter ``filter``.
|
||||
doAssert(filter != {}, "Filter must not be empty")
|
||||
var peers = newSeq[A]()
|
||||
var count = 0
|
||||
if PeerType.Incoming in filter:
|
||||
count = count + len(pool.incQueue)
|
||||
if PeerType.Outgoing in filter:
|
||||
count = count + len(pool.outQueue)
|
||||
if count < number:
|
||||
if pool.lenAvailable(filter) < number:
|
||||
raise newException(PeerPoolError, "Not enough peers in pool")
|
||||
for i in 0 ..< number:
|
||||
var item = pool.getItem(filter)
|
||||
doAssert(PeerFlags.Acquired notin item[].flags)
|
||||
item[].flags.incl(PeerFlags.Acquired)
|
||||
peers.add(item[].data)
|
||||
result = peers
|
||||
peers.add(pool.acquireItemImpl(filter))
|
||||
return peers
|
||||
|
||||
proc acquireIncomingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} =
|
||||
## Acquire single incoming peer from PeerPool ``pool``.
|
||||
@ -596,16 +589,16 @@ iterator acquiredPeers*[A, B](pool: PeerPool[A, B],
|
||||
proc `[]`*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} =
|
||||
## Retrieve peer with key ``key`` from PeerPool ``pool``.
|
||||
let pindex = pool.registry[key]
|
||||
result = pool.storage[pindex.data]
|
||||
pool.storage[pindex.data]
|
||||
|
||||
proc `[]`*[A, B](pool: var PeerPool[A, B], key: B): var A {.inline.} =
|
||||
## Retrieve peer with key ``key`` from PeerPool ``pool``.
|
||||
let pindex = pool.registry[key]
|
||||
result = pool.storage[pindex.data].data
|
||||
pool.storage[pindex.data].data
|
||||
|
||||
proc hasPeer*[A, B](pool: PeerPool[A, B], key: B): bool {.inline.} =
|
||||
## Returns ``true`` if peer with ``key`` present in PeerPool ``pool``.
|
||||
result = pool.registry.hasKey(key)
|
||||
pool.registry.hasKey(key)
|
||||
|
||||
proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} =
|
||||
## Retrieves the peer from PeerPool ``pool`` using key ``key``. If peer is
|
||||
@ -613,7 +606,9 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} =
|
||||
## (e.g. 0 for any integer type).
|
||||
let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1))
|
||||
if pindex.data >= 0:
|
||||
result = pool.storage[pindex.data].data
|
||||
pool.storage[pindex.data].data
|
||||
else:
|
||||
A()
|
||||
|
||||
proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B,
|
||||
default: A): A {.inline.} =
|
||||
@ -621,9 +616,9 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B,
|
||||
## not present, default value ``default`` is returned.
|
||||
let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1))
|
||||
if pindex.data >= 0:
|
||||
result = pool.storage[pindex.data].data
|
||||
pool.storage[pindex.data].data
|
||||
else:
|
||||
result = default
|
||||
default
|
||||
|
||||
proc clear*[A, B](pool: PeerPool[A, B]) =
|
||||
## Performs PeerPool's ``pool`` storage and counters reset.
|
||||
@ -652,3 +647,7 @@ proc setScoreCheck*[A, B](pool: PeerPool[A, B],
|
||||
scoreCheckCb: PeerScoreCheckCallback[A]) =
|
||||
## Add ScoreCheck callback.
|
||||
pool.scoreCheck = scoreCheckCb
|
||||
|
||||
proc setPeerCounter*[A, B](pool: PeerPool[A, B],
|
||||
peerCounterCb: PeerCounterCallback) =
|
||||
pool.peerCounter = peerCounterCb
|
||||
|
@ -7,11 +7,10 @@
|
||||
|
||||
{.used.}
|
||||
|
||||
import
|
||||
unittest, random, heapqueue, tables, strutils,
|
||||
./testutil,
|
||||
chronos,
|
||||
../beacon_chain/peer_pool
|
||||
import std/[unittest, random, heapqueue, tables]
|
||||
import chronos
|
||||
import ../beacon_chain/peer_pool
|
||||
import testutil
|
||||
|
||||
type
|
||||
PeerTestID* = string
|
||||
@ -640,3 +639,37 @@ suiteReport "PeerPool testing suite":
|
||||
lenAvailable(pool) == 0
|
||||
lenAcquired(pool) == 0
|
||||
len(pool) == 0
|
||||
|
||||
timedTest "Delete peer on release text":
|
||||
proc testDeleteOnRelease(): Future[bool] {.async.} =
|
||||
proc scoreCheck(peer: PeerTest): bool =
|
||||
if peer.weight >= 0:
|
||||
result = true
|
||||
else:
|
||||
result = false
|
||||
|
||||
var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 1,
|
||||
maxIncomingPeers = 1,
|
||||
maxOutgoingPeers = 0)
|
||||
pool.setScoreCheck(scoreCheck)
|
||||
|
||||
var peer0 = PeerTest.init("idInc0", 100)
|
||||
var peer1 = PeerTest.init("idOut0", 100)
|
||||
var peer2 = PeerTest.init("idInc1", 100)
|
||||
var fut0 = pool.addIncomingPeer(peer0)
|
||||
var fut1 = pool.addOutgoingPeer(peer1)
|
||||
var fut2 = pool.addIncomingPeer(peer2)
|
||||
doAssert(fut0.finished == true and fut0.failed == false)
|
||||
doAssert(fut1.finished == false)
|
||||
doAssert(fut2.finished == false)
|
||||
var p = await pool.acquire()
|
||||
doAssert(p.id == "idInc0")
|
||||
p.weight = -200
|
||||
pool.release(p)
|
||||
await sleepAsync(100.milliseconds)
|
||||
doAssert(fut1.finished == false)
|
||||
doAssert(fut2.finished == true and fut2.failed == false)
|
||||
doAssert(len(pool) == 1)
|
||||
result = true
|
||||
|
||||
check waitFor(testDeleteOnRelease()) == true
|
||||
|
Loading…
x
Reference in New Issue
Block a user