PeerPool fixes. (#1654)

* Refactor peer_pool.
Fix eth2_network peer counters.
Fix PeerPool do not allow to add more peers when empty space available.

* Remove unused imports.

* Add test for a bug.

* Fix eth2_network disconnect should deletePeer not release.
More PeerPool refactoring.
This commit is contained in:
Eugene Kabanov 2020-09-16 13:00:11 +03:00 committed by Mamy Ratsimbazafy
parent 67f3bd203e
commit 8b492ed1ce
3 changed files with 168 additions and 136 deletions

View File

@ -378,7 +378,7 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
peer.connectionState = Disconnecting peer.connectionState = Disconnecting
await peer.network.switch.disconnect(peer.info.peerId) await peer.network.switch.disconnect(peer.info.peerId)
peer.connectionState = Disconnected peer.connectionState = Disconnected
peer.network.peerPool.release(peer) discard peer.network.peerPool.deletePeer(peer)
let seenTime = case reason let seenTime = case reason
of ClientShutDown: of ClientShutDown:
SeenTableTimeClientShutDown SeenTableTimeClientShutDown
@ -681,7 +681,6 @@ proc handleOutgoingPeer(peer: Peer): Future[bool] {.async.} =
proc onPeerClosed(udata: pointer) = proc onPeerClosed(udata: pointer) =
debug "Peer (outgoing) lost", peer debug "Peer (outgoing) lost", peer
nbc_peers.set int64(len(network.peerPool))
let res = await network.peerPool.addOutgoingPeer(peer) let res = await network.peerPool.addOutgoingPeer(peer)
if res: if res:
@ -690,14 +689,11 @@ proc handleOutgoingPeer(peer: Peer): Future[bool] {.async.} =
peer.getFuture().addCallback(onPeerClosed) peer.getFuture().addCallback(onPeerClosed)
result = true result = true
nbc_peers.set int64(len(network.peerPool))
proc handleIncomingPeer(peer: Peer): Future[bool] {.async.} = proc handleIncomingPeer(peer: Peer): Future[bool] {.async.} =
let network = peer.network let network = peer.network
proc onPeerClosed(udata: pointer) = proc onPeerClosed(udata: pointer) =
debug "Peer (incoming) lost", peer debug "Peer (incoming) lost", peer
nbc_peers.set int64(len(network.peerPool))
let res = await network.peerPool.addIncomingPeer(peer) let res = await network.peerPool.addIncomingPeer(peer)
if res: if res:
@ -706,8 +702,6 @@ proc handleIncomingPeer(peer: Peer): Future[bool] {.async.} =
peer.getFuture().addCallback(onPeerClosed) peer.getFuture().addCallback(onPeerClosed)
result = true result = true
nbc_peers.set int64(len(network.peerPool))
proc toPeerAddr*(r: enr.TypedRecord): proc toPeerAddr*(r: enr.TypedRecord):
Result[PeerAddr, cstring] {.raises: [Defect].} = Result[PeerAddr, cstring] {.raises: [Defect].} =
if not r.secp256k1.isSome: if not r.secp256k1.isSome:
@ -935,6 +929,12 @@ proc startListening*(node: Eth2Node) {.async.} =
await node.pubsub.start() await node.pubsub.start()
proc start*(node: Eth2Node) {.async.} = proc start*(node: Eth2Node) {.async.} =
proc onPeerCountChanged() =
nbc_peers.set int64(len(node.peerPool))
node.peerPool.setPeerCounter(onPeerCountChanged)
for i in 0 ..< ConcurrentConnections: for i in 0 ..< ConcurrentConnections:
node.connWorkers.add connectWorker(node) node.connWorkers.add connectWorker(node)

View File

@ -1,4 +1,4 @@
import tables, heapqueue import std/[tables, heapqueue]
import chronos import chronos
type type
@ -23,6 +23,8 @@ type
PeerScoreCheckCallback*[T] = proc(peer: T): bool {.gcsafe, raises: [Defect].} PeerScoreCheckCallback*[T] = proc(peer: T): bool {.gcsafe, raises: [Defect].}
PeerCounterCallback* = proc() {.gcsafe, raises: [Defect].}
PeerPool*[A, B] = ref object PeerPool*[A, B] = ref object
incNotEmptyEvent: AsyncEvent incNotEmptyEvent: AsyncEvent
outNotEmptyEvent: AsyncEvent outNotEmptyEvent: AsyncEvent
@ -34,6 +36,7 @@ type
storage: seq[PeerItem[A]] storage: seq[PeerItem[A]]
cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.} cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.}
scoreCheck: PeerScoreCheckCallback[A] scoreCheck: PeerScoreCheckCallback[A]
peerCounter: PeerCounterCallback
maxPeersCount: int maxPeersCount: int
maxIncPeersCount: int maxIncPeersCount: int
maxOutPeersCount: int maxOutPeersCount: int
@ -45,7 +48,7 @@ type
PeerPoolError* = object of CatchableError PeerPoolError* = object of CatchableError
proc `<`*(a, b: PeerIndex): bool = proc `<`*(a, b: PeerIndex): bool =
result = a.cmp(b, a) a.cmp(b, a)
proc fireNotEmptyEvent[A, B](pool: PeerPool[A, B], proc fireNotEmptyEvent[A, B](pool: PeerPool[A, B],
item: PeerItem[A]) {.inline.} = item: PeerItem[A]) {.inline.} =
@ -115,38 +118,10 @@ proc waitNotFullEvent[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): Future[void] = filter: set[PeerType]): Future[void] =
pool.waitForEvent(EventType.NotFullEvent, filter) pool.waitForEvent(EventType.NotFullEvent, filter)
template getItem[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): ptr PeerItem[A] =
doAssert((len(pool.outQueue) > 0) or (len(pool.incQueue) > 0))
var pindex: int
if filter == {PeerType.Incoming, PeerType.Outgoing}:
if len(pool.outQueue) > 0 and len(pool.incQueue) > 0:
# Don't think `<` is actually `<` here.
if pool.incQueue[0] < pool.outQueue[0]:
inc(pool.acqIncPeersCount)
pindex = pool.incQueue.pop().data
else:
inc(pool.acqOutPeersCount)
pindex = pool.outQueue.pop().data
else:
if len(pool.outQueue) > 0:
inc(pool.acqOutPeersCount)
pindex = pool.outQueue.pop().data
else:
inc(pool.acqIncPeersCount)
pindex = pool.incQueue.pop().data
else:
if PeerType.Outgoing in filter:
inc(pool.acqOutPeersCount)
pindex = pool.outQueue.pop().data
elif PeerType.Incoming in filter:
inc(pool.acqIncPeersCount)
pindex = pool.incQueue.pop().data
addr(pool.storage[pindex])
proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1, proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
maxOutgoingPeers = -1, maxOutgoingPeers = -1,
scoreCheckCb: PeerScoreCheckCallback[A] = nil): PeerPool[A, B] = scoreCheckCb: PeerScoreCheckCallback[A] = nil,
peerCounterCb: PeerCounterCallback = nil): PeerPool[A, B] =
## Create new PeerPool. ## Create new PeerPool.
## ##
## ``maxPeers`` - maximum number of peers allowed. All the peers which ## ``maxPeers`` - maximum number of peers allowed. All the peers which
@ -165,6 +140,9 @@ proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
## If callback procedure returns ``false`` peer will be removed from ## If callback procedure returns ``false`` peer will be removed from
## PeerPool. ## PeerPool.
## ##
## ``peerCountCb`` - callback to be called when number of peers in PeerPool
## has been changed.
##
## Please note, that if ``maxPeers`` is positive non-zero value, then equation ## Please note, that if ``maxPeers`` is positive non-zero value, then equation
## ``maxPeers >= maxIncomingPeers + maxOutgoingPeers`` must be ``true``. ## ``maxPeers >= maxIncomingPeers + maxOutgoingPeers`` must be ``true``.
var res = PeerPool[A, B]() var res = PeerPool[A, B]()
@ -185,50 +163,55 @@ proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
res.outQueue = initHeapQueue[PeerIndex]() res.outQueue = initHeapQueue[PeerIndex]()
res.registry = initTable[B, PeerIndex]() res.registry = initTable[B, PeerIndex]()
res.scoreCheck = scoreCheckCb res.scoreCheck = scoreCheckCb
res.peerCounter = peerCounterCb
res.storage = newSeq[PeerItem[A]]() res.storage = newSeq[PeerItem[A]]()
proc peerCmp(a, b: PeerIndex): bool {.closure, gcsafe.} = proc peerCmp(a, b: PeerIndex): bool {.closure, gcsafe.} =
let p1 = res.storage[a.data].data let p1 = res.storage[a.data].data
let p2 = res.storage[b.data].data let p2 = res.storage[b.data].data
result = p1 < p2 p1 < p2
res.cmp = peerCmp res.cmp = peerCmp
result = res res
proc len*[A, B](pool: PeerPool[A, B]): int = proc len*[A, B](pool: PeerPool[A, B]): int =
## Returns number of registered peers in PeerPool ``pool``. This number ## Returns number of registered peers in PeerPool ``pool``. This number
## includes all the peers (acquired and available). ## includes all the peers (acquired and available).
result = len(pool.registry) len(pool.registry)
proc lenAvailable*[A, B](pool: PeerPool[A, B], proc lenAvailable*[A, B](pool: PeerPool[A, B],
filter = {PeerType.Incoming, filter = {PeerType.Incoming,
PeerType.Outgoing}): int {.inline.} = PeerType.Outgoing}): int {.inline.} =
## Returns number of available peers in PeerPool ``pool`` which satisfies ## Returns number of available peers in PeerPool ``pool`` which satisfies
## filter ``filter``. ## filter ``filter``.
if PeerType.Incoming in filter: (if PeerType.Incoming in filter: len(pool.incQueue) else: 0) +
result = result + len(pool.incQueue) (if PeerType.Outgoing in filter: len(pool.outQueue) else: 0)
if PeerType.Outgoing in filter:
result = result + len(pool.outQueue)
proc lenAcquired*[A, B](pool: PeerPool[A, B], proc lenAcquired*[A, B](pool: PeerPool[A, B],
filter = {PeerType.Incoming, filter = {PeerType.Incoming,
PeerType.Outgoing}): int {.inline.} = PeerType.Outgoing}): int {.inline.} =
## Returns number of acquired peers in PeerPool ``pool`` which satisifies ## Returns number of acquired peers in PeerPool ``pool`` which satisifies
## filter ``filter``. ## filter ``filter``.
if PeerType.Incoming in filter: (if PeerType.Incoming in filter: pool.acqIncPeersCount else: 0) +
result = result + pool.acqIncPeersCount (if PeerType.Outgoing in filter: pool.acqOutPeersCount else: 0)
if PeerType.Outgoing in filter:
result = result + pool.acqOutPeersCount
proc checkPeerScore*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} = proc shortLogAvailable*[A, B](pool: PeerPool[A, B]): string =
$len(pool.incQueue) & "/" & $len(pool.outQueue)
proc shortLogAcquired*[A, B](pool: PeerPool[A, B]): string =
$pool.acqIncPeersCount & "/" & $pool.acqOutPeersCount
proc checkPeerScore[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
## Returns ``true`` if peer passing score check. ## Returns ``true`` if peer passing score check.
if not(isNil(pool.scoreCheck)): if not(isNil(pool.scoreCheck)):
if pool.scoreCheck(peer): pool.scoreCheck(peer)
result = true
else:
result = false
else: else:
result = true true
proc peerCountChanged[A, B](pool: PeerPool[A, B]) {.inline.} =
## Call callback when number of peers changed.
if not(isNil(pool.peerCounter)):
pool.peerCounter()
proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
## Remove ``peer`` from PeerPool ``pool``. ## Remove ``peer`` from PeerPool ``pool``.
@ -237,7 +220,7 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
## be deleted only when peer will be released. You can change this behavior ## be deleted only when peer will be released. You can change this behavior
## with ``force`` option. ## with ``force`` option.
mixin getKey mixin getKey
var key = getKey(peer) let key = getKey(peer)
if pool.registry.hasKey(key): if pool.registry.hasKey(key):
let pindex = pool.registry[key].data let pindex = pool.registry[key].data
var item = addr(pool.storage[pindex]) var item = addr(pool.storage[pindex])
@ -252,10 +235,12 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
dec(pool.curOutPeersCount) dec(pool.curOutPeersCount)
dec(pool.acqOutPeersCount) dec(pool.acqOutPeersCount)
# Indicate that we have an empty space
pool.fireNotFullEvent(item[]) pool.fireNotFullEvent(item[])
# Cleanup storage with default item, and removing key from hashtable. # Cleanup storage with default item, and removing key from hashtable.
pool.storage[pindex] = PeerItem[A]() pool.storage[pindex] = PeerItem[A]()
pool.registry.del(key) pool.registry.del(key)
pool.peerCountChanged()
else: else:
if item[].peerType == PeerType.Incoming: if item[].peerType == PeerType.Incoming:
# If peer is available, then its copy present in heapqueue, so we need # If peer is available, then its copy present in heapqueue, so we need
@ -274,12 +259,15 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
break break
dec(pool.curOutPeersCount) dec(pool.curOutPeersCount)
# Indicate that we have an empty space
pool.fireNotFullEvent(item[]) pool.fireNotFullEvent(item[])
# Cleanup storage with default item, and removing key from hashtable. # Cleanup storage with default item, and removing key from hashtable.
pool.storage[pindex] = PeerItem[A]() pool.storage[pindex] = PeerItem[A]()
pool.registry.del(key) pool.registry.del(key)
pool.peerCountChanged()
result = true true
else:
false
proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B, proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B,
peerType: PeerType): PeerIndex = peerType: PeerType): PeerIndex =
@ -293,7 +281,7 @@ proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B,
let pindex = PeerIndex(data: item.index, cmp: pool.cmp) let pindex = PeerIndex(data: item.index, cmp: pool.cmp)
pool.registry[peerKey] = pindex pool.registry[peerKey] = pindex
pitem[].data.getFuture().addCallback(onPeerClosed) pitem[].data.getFuture().addCallback(onPeerClosed)
result = pindex pindex
proc addPeerNoWait*[A, B](pool: PeerPool[A, B], proc addPeerNoWait*[A, B](pool: PeerPool[A, B],
peer: A, peerType: PeerType): bool = peer: A, peerType: PeerType): bool =
@ -311,7 +299,6 @@ proc addPeerNoWait*[A, B](pool: PeerPool[A, B],
if not(pool.checkPeerScore(peer)): if not(pool.checkPeerScore(peer)):
return false return false
result = false
let peerKey = getKey(peer) let peerKey = getKey(peer)
if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished): if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished):
@ -322,14 +309,17 @@ proc addPeerNoWait*[A, B](pool: PeerPool[A, B],
inc(pool.curIncPeersCount) inc(pool.curIncPeersCount)
pool.incQueue.push(pindex) pool.incQueue.push(pindex)
pool.incNotEmptyEvent.fire() pool.incNotEmptyEvent.fire()
result = true pool.peerCountChanged()
return true
elif peerType == PeerType.Outgoing: elif peerType == PeerType.Outgoing:
if pool.curOutPeersCount < pool.maxOutPeersCount: if pool.curOutPeersCount < pool.maxOutPeersCount:
let pindex = pool.addPeerImpl(peer, peerKey, peerType) let pindex = pool.addPeerImpl(peer, peerKey, peerType)
inc(pool.curOutPeersCount) inc(pool.curOutPeersCount)
pool.outQueue.push(pindex) pool.outQueue.push(pindex)
pool.outNotEmptyEvent.fire() pool.outNotEmptyEvent.fire()
result = true pool.peerCountChanged()
return true
return false
proc addPeer*[A, B](pool: PeerPool[A, B], proc addPeer*[A, B](pool: PeerPool[A, B],
peer: A, peerType: PeerType): Future[bool] {.async.} = peer: A, peerType: PeerType): Future[bool] {.async.} =
@ -348,7 +338,6 @@ proc addPeer*[A, B](pool: PeerPool[A, B],
if not(pool.checkPeerScore(peer)): if not(pool.checkPeerScore(peer)):
return false return false
var res = false
let peerKey = getKey(peer) let peerKey = getKey(peer)
if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished): if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished):
@ -361,7 +350,8 @@ proc addPeer*[A, B](pool: PeerPool[A, B],
inc(pool.curIncPeersCount) inc(pool.curIncPeersCount)
pool.incQueue.push(pindex) pool.incQueue.push(pindex)
pool.incNotEmptyEvent.fire() pool.incNotEmptyEvent.fire()
res = true pool.peerCountChanged()
return true
elif peerType == PeerType.Outgoing: elif peerType == PeerType.Outgoing:
while pool.curOutPeersCount >= pool.maxOutPeersCount: while pool.curOutPeersCount >= pool.maxOutPeersCount:
await pool.waitNotFullEvent({peerType}) await pool.waitNotFullEvent({peerType})
@ -369,78 +359,102 @@ proc addPeer*[A, B](pool: PeerPool[A, B],
inc(pool.curOutPeersCount) inc(pool.curOutPeersCount)
pool.outQueue.push(pindex) pool.outQueue.push(pindex)
pool.outNotEmptyEvent.fire() pool.outNotEmptyEvent.fire()
res = true pool.peerCountChanged()
return true
result = res else:
return false
proc addIncomingPeerNoWait*[A, B](pool: PeerPool[A, B], proc addIncomingPeerNoWait*[A, B](pool: PeerPool[A, B],
peer: A): bool {.inline.} = peer: A): bool {.inline.} =
## Add incoming peer ``peer`` to PeerPool ``pool``. ## Add incoming peer ``peer`` to PeerPool ``pool``.
## ##
## Returns ``true`` on success. ## Returns ``true`` on success.
result = pool.addPeerNoWait(peer, PeerType.Incoming) pool.addPeerNoWait(peer, PeerType.Incoming)
proc addOutgoingPeerNoWait*[A, B](pool: PeerPool[A, B], proc addOutgoingPeerNoWait*[A, B](pool: PeerPool[A, B],
peer: A): bool {.inline.} = peer: A): bool {.inline.} =
## Add outgoing peer ``peer`` to PeerPool ``pool``. ## Add outgoing peer ``peer`` to PeerPool ``pool``.
## ##
## Returns ``true`` on success. ## Returns ``true`` on success.
result = pool.addPeerNoWait(peer, PeerType.Outgoing) pool.addPeerNoWait(peer, PeerType.Outgoing)
proc addIncomingPeer*[A, B](pool: PeerPool[A, B], proc addIncomingPeer*[A, B](pool: PeerPool[A, B],
peer: A): Future[bool] {.inline.} = peer: A): Future[bool] {.inline.} =
## Add incoming peer ``peer`` to PeerPool ``pool``. ## Add incoming peer ``peer`` to PeerPool ``pool``.
## ##
## Returns ``true`` on success. ## Returns ``true`` on success.
result = pool.addPeer(peer, PeerType.Incoming) pool.addPeer(peer, PeerType.Incoming)
proc addOutgoingPeer*[A, B](pool: PeerPool[A, B], proc addOutgoingPeer*[A, B](pool: PeerPool[A, B],
peer: A): Future[bool] {.inline.} = peer: A): Future[bool] {.inline.} =
## Add outgoing peer ``peer`` to PeerPool ``pool``. ## Add outgoing peer ``peer`` to PeerPool ``pool``.
## ##
## Returns ``true`` on success. ## Returns ``true`` on success.
result = pool.addPeer(peer, PeerType.Outgoing) pool.addPeer(peer, PeerType.Outgoing)
proc acquireItemImpl[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): A {.inline.} =
doAssert(filter != {})
doAssert((len(pool.outQueue) > 0) or (len(pool.incQueue) > 0))
let pindex =
if filter == {PeerType.Incoming, PeerType.Outgoing}:
if len(pool.outQueue) > 0 and len(pool.incQueue) > 0:
# Don't think `<` is actually `<` here.
if pool.incQueue[0] < pool.outQueue[0]:
inc(pool.acqIncPeersCount)
let item = pool.incQueue.pop()
item.data
else:
inc(pool.acqOutPeersCount)
let item = pool.outQueue.pop()
item.data
else:
if len(pool.outQueue) > 0:
inc(pool.acqOutPeersCount)
let item = pool.outQueue.pop()
item.data
else:
inc(pool.acqIncPeersCount)
let item = pool.incQueue.pop()
item.data
else:
if PeerType.Outgoing in filter:
inc(pool.acqOutPeersCount)
let item = pool.outQueue.pop()
item.data
else:
inc(pool.acqIncPeersCount)
let item = pool.incQueue.pop()
item.data
var pitem = addr(pool.storage[pindex])
doAssert(PeerFlags.Acquired notin pitem[].flags)
pitem[].flags.incl(PeerFlags.Acquired)
pitem[].data
proc acquire*[A, B](pool: PeerPool[A, B], proc acquire*[A, B](pool: PeerPool[A, B],
filter = {PeerType.Incoming, filter = {PeerType.Incoming,
PeerType.Outgoing}): Future[A] {.async.} = PeerType.Outgoing}): Future[A] {.async.} =
## Acquire peer from PeerPool ``pool``, which match the filter ``filter``. ## Acquire peer from PeerPool ``pool``, which match the filter ``filter``.
mixin getKey
doAssert(filter != {}, "Filter must not be empty") doAssert(filter != {}, "Filter must not be empty")
while true: while true:
var count = 0 if pool.lenAvailable(filter) == 0:
if PeerType.Incoming in filter:
count = count + len(pool.incQueue)
if PeerType.Outgoing in filter:
count = count + len(pool.outQueue)
if count == 0:
await pool.waitNotEmptyEvent(filter) await pool.waitNotEmptyEvent(filter)
else: else:
var item = pool.getItem(filter) return pool.acquireItemImpl(filter)
doAssert(PeerFlags.Acquired notin item[].flags)
item[].flags.incl(PeerFlags.Acquired)
result = item[].data
break
proc acquireNoWait*[A, B](pool: PeerPool[A, B], proc acquireNoWait*[A, B](pool: PeerPool[A, B],
filter = {PeerType.Incoming, filter = {PeerType.Incoming,
PeerType.Outgoing}): A = PeerType.Outgoing}): A =
doAssert(filter != {}, "Filter must not be empty") doAssert(filter != {}, "Filter must not be empty")
var count = 0 if pool.lenAvailable(filter) < 1:
if PeerType.Incoming in filter:
count = count + len(pool.incQueue)
if PeerType.Outgoing in filter:
count = count + len(pool.outQueue)
if count < 1:
raise newException(PeerPoolError, "Not enough peers in pool") raise newException(PeerPoolError, "Not enough peers in pool")
var item = pool.getItem(filter) pool.acquireItemImpl(filter)
doAssert(PeerFlags.Acquired notin item[].flags)
item[].flags.incl(PeerFlags.Acquired)
result = item[].data
proc release*[A, B](pool: PeerPool[A, B], peer: A) = proc release*[A, B](pool: PeerPool[A, B], peer: A) =
## Release peer ``peer`` back to PeerPool ``pool`` ## Release peer ``peer`` back to PeerPool ``pool``
mixin getKey mixin getKey
var key = getKey(peer) let key = getKey(peer)
var titem = pool.registry.getOrDefault(key, PeerIndex(data: -1)) var titem = pool.registry.getOrDefault(key, PeerIndex(data: -1))
if titem.data >= 0: if titem.data >= 0:
let pindex = titem.data let pindex = titem.data
@ -448,17 +462,12 @@ proc release*[A, B](pool: PeerPool[A, B], peer: A) =
if PeerFlags.Acquired in item[].flags: if PeerFlags.Acquired in item[].flags:
if not(pool.checkPeerScore(peer)): if not(pool.checkPeerScore(peer)):
item[].flags.incl(DeleteOnRelease) item[].flags.incl(DeleteOnRelease)
item[].flags.excl(PeerFlags.Acquired)
if PeerFlags.DeleteOnRelease in item[].flags: if PeerFlags.DeleteOnRelease in item[].flags:
if item[].peerType == PeerType.Incoming: # We do not care about result here because peer is present in registry
dec(pool.curIncPeersCount) # and has all proper flags set.
dec(pool.acqIncPeersCount) discard pool.deletePeer(peer, force = true)
elif item[].peerType == PeerType.Outgoing:
dec(pool.curOutPeersCount)
dec(pool.acqOutPeersCount)
pool.storage[pindex] = PeerItem[A]()
pool.registry.del(key)
else: else:
item[].flags.excl(PeerFlags.Acquired)
if item[].peerType == PeerType.Incoming: if item[].peerType == PeerType.Incoming:
pool.incQueue.push(titem) pool.incQueue.push(titem)
dec(pool.acqIncPeersCount) dec(pool.acqIncPeersCount)
@ -485,18 +494,10 @@ proc acquire*[A, B](pool: PeerPool[A, B],
while true: while true:
if len(peers) >= number: if len(peers) >= number:
break break
var count = 0 if pool.lenAvailable(filter) == 0:
if PeerType.Incoming in filter:
count = count + len(pool.incQueue)
if PeerType.Outgoing in filter:
count = count + len(pool.outQueue)
if count == 0:
await pool.waitNotEmptyEvent(filter) await pool.waitNotEmptyEvent(filter)
else: else:
var item = pool.getItem(filter) peers.add(pool.acquireItemImpl(filter))
doAssert(PeerFlags.Acquired notin item[].flags)
item[].flags.incl(PeerFlags.Acquired)
peers.add(item[].data)
except CancelledError as exc: except CancelledError as exc:
# If we got cancelled, we need to return all the acquired peers back to # If we got cancelled, we need to return all the acquired peers back to
# pool. # pool.
@ -504,7 +505,7 @@ proc acquire*[A, B](pool: PeerPool[A, B],
pool.release(item) pool.release(item)
peers.setLen(0) peers.setLen(0)
raise exc raise exc
result = peers return peers
proc acquireNoWait*[A, B](pool: PeerPool[A, B], proc acquireNoWait*[A, B](pool: PeerPool[A, B],
number: int, number: int,
@ -514,19 +515,11 @@ proc acquireNoWait*[A, B](pool: PeerPool[A, B],
## filter ``filter``. ## filter ``filter``.
doAssert(filter != {}, "Filter must not be empty") doAssert(filter != {}, "Filter must not be empty")
var peers = newSeq[A]() var peers = newSeq[A]()
var count = 0 if pool.lenAvailable(filter) < number:
if PeerType.Incoming in filter:
count = count + len(pool.incQueue)
if PeerType.Outgoing in filter:
count = count + len(pool.outQueue)
if count < number:
raise newException(PeerPoolError, "Not enough peers in pool") raise newException(PeerPoolError, "Not enough peers in pool")
for i in 0 ..< number: for i in 0 ..< number:
var item = pool.getItem(filter) peers.add(pool.acquireItemImpl(filter))
doAssert(PeerFlags.Acquired notin item[].flags) return peers
item[].flags.incl(PeerFlags.Acquired)
peers.add(item[].data)
result = peers
proc acquireIncomingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} = proc acquireIncomingPeer*[A, B](pool: PeerPool[A, B]): Future[A] {.inline.} =
## Acquire single incoming peer from PeerPool ``pool``. ## Acquire single incoming peer from PeerPool ``pool``.
@ -596,16 +589,16 @@ iterator acquiredPeers*[A, B](pool: PeerPool[A, B],
proc `[]`*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} = proc `[]`*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} =
## Retrieve peer with key ``key`` from PeerPool ``pool``. ## Retrieve peer with key ``key`` from PeerPool ``pool``.
let pindex = pool.registry[key] let pindex = pool.registry[key]
result = pool.storage[pindex.data] pool.storage[pindex.data]
proc `[]`*[A, B](pool: var PeerPool[A, B], key: B): var A {.inline.} = proc `[]`*[A, B](pool: var PeerPool[A, B], key: B): var A {.inline.} =
## Retrieve peer with key ``key`` from PeerPool ``pool``. ## Retrieve peer with key ``key`` from PeerPool ``pool``.
let pindex = pool.registry[key] let pindex = pool.registry[key]
result = pool.storage[pindex.data].data pool.storage[pindex.data].data
proc hasPeer*[A, B](pool: PeerPool[A, B], key: B): bool {.inline.} = proc hasPeer*[A, B](pool: PeerPool[A, B], key: B): bool {.inline.} =
## Returns ``true`` if peer with ``key`` present in PeerPool ``pool``. ## Returns ``true`` if peer with ``key`` present in PeerPool ``pool``.
result = pool.registry.hasKey(key) pool.registry.hasKey(key)
proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} = proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} =
## Retrieves the peer from PeerPool ``pool`` using key ``key``. If peer is ## Retrieves the peer from PeerPool ``pool`` using key ``key``. If peer is
@ -613,7 +606,9 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B): A {.inline.} =
## (e.g. 0 for any integer type). ## (e.g. 0 for any integer type).
let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1)) let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1))
if pindex.data >= 0: if pindex.data >= 0:
result = pool.storage[pindex.data].data pool.storage[pindex.data].data
else:
A()
proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B, proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B,
default: A): A {.inline.} = default: A): A {.inline.} =
@ -621,9 +616,9 @@ proc getOrDefault*[A, B](pool: PeerPool[A, B], key: B,
## not present, default value ``default`` is returned. ## not present, default value ``default`` is returned.
let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1)) let pindex = pool.registry.getOrDefault(key, PeerIndex(data: -1))
if pindex.data >= 0: if pindex.data >= 0:
result = pool.storage[pindex.data].data pool.storage[pindex.data].data
else: else:
result = default default
proc clear*[A, B](pool: PeerPool[A, B]) = proc clear*[A, B](pool: PeerPool[A, B]) =
## Performs PeerPool's ``pool`` storage and counters reset. ## Performs PeerPool's ``pool`` storage and counters reset.
@ -652,3 +647,7 @@ proc setScoreCheck*[A, B](pool: PeerPool[A, B],
scoreCheckCb: PeerScoreCheckCallback[A]) = scoreCheckCb: PeerScoreCheckCallback[A]) =
## Add ScoreCheck callback. ## Add ScoreCheck callback.
pool.scoreCheck = scoreCheckCb pool.scoreCheck = scoreCheckCb
proc setPeerCounter*[A, B](pool: PeerPool[A, B],
peerCounterCb: PeerCounterCallback) =
pool.peerCounter = peerCounterCb

View File

@ -7,11 +7,10 @@
{.used.} {.used.}
import import std/[unittest, random, heapqueue, tables]
unittest, random, heapqueue, tables, strutils, import chronos
./testutil, import ../beacon_chain/peer_pool
chronos, import testutil
../beacon_chain/peer_pool
type type
PeerTestID* = string PeerTestID* = string
@ -640,3 +639,37 @@ suiteReport "PeerPool testing suite":
lenAvailable(pool) == 0 lenAvailable(pool) == 0
lenAcquired(pool) == 0 lenAcquired(pool) == 0
len(pool) == 0 len(pool) == 0
timedTest "Delete peer on release text":
proc testDeleteOnRelease(): Future[bool] {.async.} =
proc scoreCheck(peer: PeerTest): bool =
if peer.weight >= 0:
result = true
else:
result = false
var pool = newPeerPool[PeerTest, PeerTestID](maxPeers = 1,
maxIncomingPeers = 1,
maxOutgoingPeers = 0)
pool.setScoreCheck(scoreCheck)
var peer0 = PeerTest.init("idInc0", 100)
var peer1 = PeerTest.init("idOut0", 100)
var peer2 = PeerTest.init("idInc1", 100)
var fut0 = pool.addIncomingPeer(peer0)
var fut1 = pool.addOutgoingPeer(peer1)
var fut2 = pool.addIncomingPeer(peer2)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == false)
doAssert(fut2.finished == false)
var p = await pool.acquire()
doAssert(p.id == "idInc0")
p.weight = -200
pool.release(p)
await sleepAsync(100.milliseconds)
doAssert(fut1.finished == false)
doAssert(fut2.finished == true and fut2.failed == false)
doAssert(len(pool) == 1)
result = true
check waitFor(testDeleteOnRelease()) == true