Peer management (#1707)

* addPeer() and addPeerNoWait() now returns PeerStatus, not bool.
Minor refactoring of PeerPool.
Fix tests.

* Refactor PeerPool.
Add lenSpace.
Add tests for lenSpace.
PeerPool.add procedures now return different error codes.
Fix SyncManager break/continue problem.
Fix connectWorker break/continue problem.
Refactor connectWorker and discoveryLoop.
Fix incoming/outgoing blocking problem.

* Refactor discovery loop.
Add checkPeer.

* Fix logic and compilation bugs.

* Adjust position of debugging log.

* Fix issue with maximum peers in PeerPool.
Optimize node record decoding.

* fix discoveryLoop.

* Remove aliases and fix tests using aliases.
This commit is contained in:
Eugene Kabanov 2020-09-21 19:02:27 +03:00 committed by GitHub
parent 3190c695b0
commit 654b8d66bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 795 additions and 460 deletions

View File

@ -459,7 +459,7 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
lastSlot = shortLog(lastSlot),
scheduledSlot = shortLog(scheduledSlot),
beaconTime = shortLog(beaconTime),
peers = node.network.peersCount,
peers = len(node.network.peerPool),
head = shortLog(node.chainDag.head),
headEpoch = shortLog(node.chainDag.head.slot.compute_epoch_at_slot()),
finalized = shortLog(node.chainDag.finalizedHead.blck),
@ -903,7 +903,7 @@ proc initializeNetworking(node: BeaconNode) {.async.} =
let addressFile = node.config.dataDir / "beacon_node.enr"
writeFile(addressFile, node.network.announcedENR.toURI)
await node.network.startLookingForPeers()
await node.network.start()
info "Networking initialized",
enr = node.network.announcedENR.toURI,

View File

@ -223,11 +223,9 @@ const
## Period of time for `IrrelevantNetwork` error reason.
SeenTableTimeClientShutDown* = 10.minutes
## Period of time for `ClientShutDown` error reason.
SeemTableTimeFaultOrError* = 10.minutes
SeenTableTimeFaultOrError* = 10.minutes
## Period of time for `FaultOnError` error reason.
var successfullyDialledAPeer = false # used to show a warning
template neterr(kindParam: Eth2NetworkingErrorKind): auto =
err(type(result), Eth2NetworkingError(kind: kindParam))
@ -356,6 +354,8 @@ proc `<`*(a, b: Peer): bool =
false
proc isSeen*(network: ETh2Node, peerId: PeerID): bool =
## Returns ``true`` if ``peerId`` present in SeenTable and time period is not
## yet expired.
let currentTime = now(chronos.Moment)
if peerId notin network.seenTable:
return false
@ -368,6 +368,7 @@ proc isSeen*(network: ETh2Node, peerId: PeerID): bool =
proc addSeen*(network: ETh2Node, peerId: PeerID,
period: chronos.Duration) =
## Adds peer with PeerID ``peerId`` to SeenTable and timeout ``period``.
let item = SeenItem(peerId: peerId, stamp: now(chronos.Moment) + period)
network.seenTable[peerId] = item
@ -385,7 +386,7 @@ proc disconnect*(peer: Peer, reason: DisconnectionReason,
of IrrelevantNetwork:
SeenTableTimeIrrelevantNetwork
of FaultOrError:
SeemTableTimeFaultOrError
SeenTableTimeFaultOrError
peer.network.addSeen(peer.info.peerId, seenTime)
include eth/p2p/p2p_backends_helpers
@ -682,32 +683,6 @@ proc handleIncomingStream(network: Eth2Node,
finally:
await conn.close()
proc handleOutgoingPeer(peer: Peer): Future[bool] {.async.} =
let network = peer.network
proc onPeerClosed(udata: pointer) =
debug "Peer (outgoing) lost", peer
let res = await network.peerPool.addOutgoingPeer(peer)
if res:
peer.updateScore(NewPeerScore)
debug "Peer (outgoing) has been added to PeerPool", peer
peer.getFuture().addCallback(onPeerClosed)
result = true
proc handleIncomingPeer(peer: Peer): Future[bool] {.async.} =
let network = peer.network
proc onPeerClosed(udata: pointer) =
debug "Peer (incoming) lost", peer
let res = await network.peerPool.addIncomingPeer(peer)
if res:
peer.updateScore(NewPeerScore)
debug "Peer (incoming) has been added to PeerPool", peer
peer.getFuture().addCallback(onPeerClosed)
result = true
proc toPeerAddr*(r: enr.TypedRecord):
Result[PeerAddr, cstring] {.raises: [Defect].} =
if not r.secp256k1.isSome:
@ -738,92 +713,117 @@ proc toPeerAddr*(r: enr.TypedRecord):
ok(PeerAddr(peerId: peerId, addrs: addrs))
proc dialPeer*(node: Eth2Node, peerAddr: PeerAddr) {.async.} =
proc checkPeer(node: Eth2Node, peerAddr: PeerAddr): bool =
logScope: peer = peerAddr.peerId
let peerId = peerAddr.peerId
if node.peerPool.hasPeer(peerId):
trace "Already connected"
false
else:
if node.isSeen(peerId):
trace "Recently connected"
false
else:
true
proc dialPeer*(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} =
## Establish connection with remote peer identified by address ``peerAddr``.
logScope:
peer = peerAddr.peerId
index = index
if not(node.checkPeer(peerAddr)):
return
debug "Connecting to discovered peer"
var deadline = sleepAsync(node.connectTimeout)
var workfut = node.switch.connect(peerAddr.peerId, peerAddr.addrs)
# TODO connect is called here, but there's no guarantee that the connection
# we get when using dialPeer later on is the one we just connected
await node.switch.connect(peerAddr.peerId, peerAddr.addrs)
#let msDial = newMultistream()
#let conn = node.switch.connections.getOrDefault(peerInfo.id)
#let ls = await msDial.list(conn)
#debug "Supported protocols", ls
inc nbc_successful_dials
successfullyDialledAPeer = true
debug "Network handshakes completed"
proc connectWorker(network: Eth2Node) {.async.} =
debug "Connection worker started"
while true:
let
remotePeerAddr = await network.connQueue.popFirst()
peerPoolHasRemotePeer = network.peerPool.hasPeer(remotePeerAddr.peerId)
seenTableHasRemotePeer = network.isSeen(remotePeerAddr.peerId)
remotePeerAlreadyConnected = remotePeerAddr.peerId in network.connTable
if not(peerPoolHasRemotePeer) and not(seenTableHasRemotePeer) and not(remotePeerAlreadyConnected):
network.connTable.incl(remotePeerAddr.peerId)
try:
# We trying to connect to peers which are not in PeerPool, SeenTable and
# ConnTable.
var fut = network.dialPeer(remotePeerAddr)
# We discarding here just because we going to check future state, to avoid
# condition where connection happens and timeout reached.
discard await withTimeout(fut, network.connectTimeout)
# We handling only timeout and errors, because successfull connections
# will be stored in PeerPool.
if fut.finished():
if fut.failed() and not(fut.cancelled()):
debug "Unable to establish connection with peer", peer = remotePeerAddr.peerId,
errMsg = fut.readError().msg
inc nbc_failed_dials
network.addSeen(remotePeerAddr.peerId, SeenTableTimeDeadPeer)
continue
debug "Connection to remote peer timed out", peer = remotePeerAddr.peerId
inc nbc_timeout_dials
network.addSeen(remotePeerAddr.peerId, SeenTableTimeTimeout)
finally:
network.connTable.excl(remotePeerAddr.peerId)
try:
# `or` operation will only raise exception of `workfut`, because `deadline`
# could not raise exception.
await workfut or deadline
if workfut.finished():
if not deadline.finished():
deadline.cancel()
inc nbc_successful_dials
else:
trace "Peer is already connected, connecting or already seen",
peer = remotePeerAddr.peerId, peer_pool_has_peer = $peerPoolHasRemotePeer, seen_table_has_peer = $seenTableHasRemotePeer,
connecting_peer = $remotePeerAlreadyConnected, seen_table_size = len(network.seenTable)
# TODO: As soon as `nim-libp2p` will be able to handle cancellation
# properly and will have cancellation tests, we need add here cancellation
# of `workfut`.
# workfut.cancel()
debug "Connection to remote peer timed out"
inc nbc_timeout_dials
node.addSeen(peerAddr.peerId, SeenTableTimeTimeout)
except CatchableError as exc:
debug "Connection to remote peer failed", msg = exc.msg
inc nbc_failed_dials
node.addSeen(peerAddr.peerId, SeenTableTimeDeadPeer)
# Prevent (a purely theoretical) high CPU usage when losing connectivity.
await sleepAsync(1.seconds)
proc connectWorker(node: Eth2Node, index: int) {.async.} =
debug "Connection worker started", index = index
while true:
# This loop will never produce HIGH CPU usage because it will wait
# and block until it not obtains new peer from the queue ``connQueue``.
let remotePeerAddr = await node.connQueue.popFirst()
await node.dialPeer(remotePeerAddr, index)
# Peer was added to `connTable` before adding it to `connQueue`, so we
# excluding peer here after processing.
node.connTable.excl(remotePeerAddr.peerId)
proc toPeerAddr(node: Node): Result[PeerAddr, cstring] {.raises: [Defect].} =
let nodeRecord = ? node.record.toTypedRecord()
let peerAddr = ? nodeRecord.toPeerAddr()
ok(peerAddr)
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
debug "Starting discovery loop"
let enrField = ("eth2", SSZ.encode(node.forkId))
while true:
let currentPeerCount = node.peerPool.len
if currentPeerCount < node.wantedPeers:
try:
let discoveredPeers =
node.discovery.randomNodes(node.wantedPeers - currentPeerCount,
enrField)
for peer in discoveredPeers:
try:
let peerRecord = peer.record.toTypedRecord
if peerRecord.isOk:
let peerAddr = peerRecord.value.toPeerAddr
if peerAddr.isOk:
if not node.switch.isConnected(peerAddr.get().peerId):
await node.connQueue.addLast(peerAddr.get())
else:
discard # peerInfo.close()
except CatchableError as err:
debug "Failed to connect to peer", peer = $peer, err = err.msg
except CatchableError as err:
debug "Failure in discovery", err = err.msg
await sleepAsync seconds(1)
while true:
# We always request constant number of peers to avoid problem with
# low amount of returned peers.
let discoveredNodes = node.discovery.randomNodes(node.wantedPeers, enrField)
var newPeers = 0
for discNode in discoveredNodes:
let res = discNode.toPeerAddr()
if res.isOk():
let peerAddr = res.get()
# Waiting for an empty space in PeerPool.
while true:
if node.peerPool.lenSpace({PeerType.Outgoing}) == 0:
await node.peerPool.waitForEmptySpace(PeerType.Outgoing)
else:
break
# Check if peer present in SeenTable or PeerPool.
if node.checkPeer(peerAddr):
if peerAddr.peerId notin node.connTable:
# We adding to pending connections table here, but going
# to remove it only in `connectWorker`.
node.connTable.incl(peerAddr.peerId)
await node.connQueue.addLast(peerAddr)
inc(newPeers)
else:
debug "Failed to decode discovery's node address",
node = $discnode, errMsg = res.error
debug "Discovery tick", wanted_peers = node.wantedPeers,
space = node.peerPool.shortLogSpace(),
acquired = node.peerPool.shortLogAcquired(),
available = node.peerPool.shortLogAvailable(),
current = node.peerPool.shortLogCurrent(),
length = len(node.peerPool),
discovered_nodes = len(discoveredNodes),
new_peers = newPeers
if newPeers == 0:
warn "Could not discover any new nodes in network, waiting",
discovered = len(discoveredNodes), new_peers = newPeers,
wanted_peers = node.wantedPeers
await sleepAsync(5.seconds)
else:
await sleepAsync(1.seconds)
proc getPersistentNetMetadata*(conf: BeaconNodeConf): Eth2Metadata =
let metadataPath = conf.dataDir / nodeMetadataFilename
@ -860,17 +860,29 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
# there's still a slim chance of a race condition here if a reconnect
# happens quickly
if peer.connections == 1:
let res =
if event.incoming:
node.peerPool.addPeerNoWait(peer, PeerType.Incoming)
else:
node.peerPool.addPeerNoWait(peer, PeerType.Outgoing)
# TODO when the pool is full, adding it will block - this means peers
# will be left in limbo until some other peer makes room for it
let added = if event.incoming:
await handleIncomingPeer(peer)
else:
await handleOutgoingPeer(peer)
if not added:
# We must have hit a limit!
case res:
of PeerStatus.LowScoreError, PeerStatus.NoSpaceError:
# Peer has low score or we do not have enough space in PeerPool,
# we are going to disconnect it gracefully.
await peer.disconnect(FaultOrError)
of PeerStatus.DeadPeerError:
# Peer's lifetime future is finished, so its already dead,
# we do not need to perform gracefull disconect.
discard
of PeerStatus.DuplicateError:
# Peer is already present in PeerPool, we can't perform disconnect,
# because in such case we could kill both connections (connection
# which is present in PeerPool and new one).
discard
of PeerStatus.Success:
# Peer was added to PeerPool.
discard
of ConnEventKind.Disconnected:
dec peer.connections
@ -898,6 +910,8 @@ proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
result.seenThreshold = 10.seconds
result.seenTable = initTable[PeerID, SeenItem]()
result.connTable = initHashSet[PeerID]()
# Its important here to create AsyncQueue with limited size, otherwise
# it could produce HIGH cpu usage.
result.connQueue = newAsyncQueue[PeerAddr](ConcurrentConnections)
result.metadata = getPersistentNetMetadata(conf)
result.forkId = enrForkId
@ -935,12 +949,18 @@ proc startListening*(node: Eth2Node) {.async.} =
proc start*(node: Eth2Node) {.async.} =
proc onPeerCountChanged() =
trace "Number of peers has been changed",
space = node.peerPool.shortLogSpace(),
acquired = node.peerPool.shortLogAcquired(),
available = node.peerPool.shortLogAvailable(),
current = node.peerPool.shortLogCurrent(),
length = len(node.peerPool)
nbc_peers.set int64(len(node.peerPool))
node.peerPool.setPeerCounter(onPeerCountChanged)
for i in 0 ..< ConcurrentConnections:
node.connWorkers.add connectWorker(node)
node.connWorkers.add connectWorker(node, i)
if node.discoveryEnabled:
node.discovery.start()
@ -1239,23 +1259,6 @@ proc announcedENR*(node: Eth2Node): enr.Record =
proc shortForm*(id: KeyPair): string =
$PeerID.init(id.pubkey)
let BOOTSTRAP_NODE_CHECK_INTERVAL = 30.seconds
proc checkIfConnectedToBootstrapNode(p: pointer) {.gcsafe.} =
# Keep showing warnings until we connect to at least one bootstrap node
# successfully, in order to allow detection of an invalid configuration.
let node = cast[Eth2Node](p)
if node.discovery.bootstrapRecords.len > 0 and not successfullyDialledAPeer:
warn "Failed to connect to any bootstrap node",
bootstrapEnrs = node.discovery.bootstrapRecords
addTimer(BOOTSTRAP_NODE_CHECK_INTERVAL, checkIfConnectedToBootstrapNode, p)
proc startLookingForPeers*(node: Eth2Node) {.async.} =
await node.start()
addTimer(BOOTSTRAP_NODE_CHECK_INTERVAL, checkIfConnectedToBootstrapNode, node[].addr)
func peersCount*(node: Eth2Node): int =
len(node.peerPool)
proc subscribe*[MsgType](node: Eth2Node,
topic: string,
msgHandler: proc(msg: MsgType) {.gcsafe.} ) {.async.} =

View File

@ -3,7 +3,7 @@ import chronos
type
PeerType* = enum
None, Incoming, Outgoing
Incoming, Outgoing
PeerFlags = enum
Acquired, DeleteOnRelease
@ -11,6 +11,13 @@ type
EventType = enum
NotEmptyEvent, NotFullEvent
PeerStatus* = enum
Success, ## Peer was successfully added to PeerPool.
DuplicateError, ## Peer is already present in PeerPool.
NoSpaceError, ## There no space for the peer in PeerPool.
LowScoreError, ## Peer has too low score.
DeadPeerError ## Peer is already dead.
PeerItem[T] = object
data: T
peerType: PeerType
@ -26,10 +33,10 @@ type
PeerCounterCallback* = proc() {.gcsafe, raises: [Defect].}
PeerPool*[A, B] = ref object
incNotEmptyEvent: AsyncEvent
outNotEmptyEvent: AsyncEvent
incNotFullEvent: AsyncEvent
outNotFullEvent: AsyncEvent
incNotEmptyEvent*: AsyncEvent
outNotEmptyEvent*: AsyncEvent
incNotFullEvent*: AsyncEvent
outNotFullEvent*: AsyncEvent
incQueue: HeapQueue[PeerIndex]
outQueue: HeapQueue[PeerIndex]
registry: Table[B, PeerIndex]
@ -48,20 +55,24 @@ type
PeerPoolError* = object of CatchableError
proc `<`*(a, b: PeerIndex): bool =
## PeerIndex ``a`` holds reference to ``cmp()`` procedure which has captured
## PeerPool instance.
a.cmp(b, a)
proc fireNotEmptyEvent[A, B](pool: PeerPool[A, B],
item: PeerItem[A]) {.inline.} =
if item.peerType == PeerType.Incoming:
case item.peerType:
of PeerType.Incoming:
pool.incNotEmptyEvent.fire()
elif item.peerType == PeerType.Outgoing:
of PeerType.Outgoing:
pool.outNotEmptyEvent.fire()
proc fireNotFullEvent[A, B](pool: PeerPool[A, B],
item: PeerItem[A]) {.inline.} =
if item.peerType == PeerType.Incoming:
case item.peerType:
of PeerType.Incoming:
pool.incNotFullEvent.fire()
elif item.peerType == PeerType.Outgoing:
of PeerType.Outgoing:
pool.outNotFullEvent.fire()
iterator pairs*[A, B](pool: PeerPool[A, B]): (B, A) =
@ -149,12 +160,18 @@ proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
if maxPeers != -1:
doAssert(maxPeers >= maxIncomingPeers + maxOutgoingPeers)
res.maxPeersCount = if maxPeers < 0: high(int)
else: maxPeers
res.maxIncPeersCount = if maxIncomingPeers < 0: high(int)
else: maxIncomingPeers
res.maxOutPeersCount = if maxOutgoingPeers < 0: high(int)
else: maxOutgoingPeers
res.maxPeersCount = if maxPeers < 0: high(int) else: maxPeers
res.maxIncPeersCount =
if maxIncomingPeers < 0:
high(int)
else:
maxIncomingPeers
res.maxOutPeersCount =
if maxOutgoingPeers < 0:
high(int)
else:
maxOutgoingPeers
res.incNotEmptyEvent = newAsyncEvent()
res.outNotEmptyEvent = newAsyncEvent()
res.incNotFullEvent = newAsyncEvent()
@ -179,6 +196,14 @@ proc len*[A, B](pool: PeerPool[A, B]): int =
## includes all the peers (acquired and available).
len(pool.registry)
proc lenCurrent*[A, B](pool: PeerPool[A, B],
filter = {PeerType.Incoming,
PeerType.Outgoing}): int {.inline.} =
## Returns number of registered peers in PeerPool ``pool`` which satisfies
## filter ``filter``.
(if PeerType.Incoming in filter: pool.curIncPeersCount else: 0) +
(if PeerType.Outgoing in filter: pool.curOutPeersCount else: 0)
proc lenAvailable*[A, B](pool: PeerPool[A, B],
filter = {PeerType.Incoming,
PeerType.Outgoing}): int {.inline.} =
@ -195,12 +220,39 @@ proc lenAcquired*[A, B](pool: PeerPool[A, B],
(if PeerType.Incoming in filter: pool.acqIncPeersCount else: 0) +
(if PeerType.Outgoing in filter: pool.acqOutPeersCount else: 0)
proc lenSpace*[A, B](pool: PeerPool[A, B],
filter = {PeerType.Incoming,
PeerType.Outgoing}): int {.inline.} =
## Returns number of available space for peers in PeerPool ``pool`` which
## satisfies filter ``filter``.
let curPeersCount = pool.curIncPeersCount + pool.curOutPeersCount
let totalSpace = pool.maxPeersCount - curPeersCount
let incoming = min(totalSpace, pool.maxIncPeersCount - pool.curIncPeersCount)
let outgoing = min(totalSpace, pool.maxOutPeersCount - pool.curOutPeersCount)
if filter == {PeerType.Incoming, PeerType.Outgoing}:
# To avoid overflow check we need to check by ourself.
if uint64(incoming) + uint64(outgoing) > uint64(high(int)):
min(totalSpace, high(int))
else:
min(totalSpace, incoming + outgoing)
elif PeerType.Incoming in filter:
incoming
else:
outgoing
proc shortLogAvailable*[A, B](pool: PeerPool[A, B]): string =
$len(pool.incQueue) & "/" & $len(pool.outQueue)
proc shortLogAcquired*[A, B](pool: PeerPool[A, B]): string =
$pool.acqIncPeersCount & "/" & $pool.acqOutPeersCount
proc shortLogSpace*[A, B](pool: PeerPool[A, B]): string =
$pool.lenSpace({PeerType.Incoming}) & "/" &
$pool.lenSpace({PeerType.Outgoing})
proc shortLogCurrent*[A, B](pool: PeerPool[A, B]): string =
$pool.curIncPeersCount & "/" & $pool.curOutPeersCount
proc checkPeerScore[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
## Returns ``true`` if peer passing score check.
if not(isNil(pool.scoreCheck)):
@ -270,131 +322,151 @@ proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
false
proc addPeerImpl[A, B](pool: PeerPool[A, B], peer: A, peerKey: B,
peerType: PeerType): PeerIndex =
peerType: PeerType) =
proc onPeerClosed(udata: pointer) {.gcsafe.} =
discard pool.deletePeer(peer)
var item = PeerItem[A](data: peer, peerType: peerType,
let item = PeerItem[A](data: peer, peerType: peerType,
index: len(pool.storage))
pool.storage.add(item)
var pitem = addr(pool.storage[^1])
let pindex = PeerIndex(data: item.index, cmp: pool.cmp)
pool.registry[peerKey] = pindex
pitem[].data.getFuture().addCallback(onPeerClosed)
pindex
if peerType == PeerType.Incoming:
inc(pool.curIncPeersCount)
pool.incQueue.push(pindex)
pool.incNotEmptyEvent.fire()
elif peerType == PeerType.Outgoing:
inc(pool.curOutPeersCount)
pool.outQueue.push(pindex)
pool.outNotEmptyEvent.fire()
pool.peerCountChanged()
proc checkPeer*[A, B](pool: PeerPool[A, B], peer: A): PeerStatus {.inline.} =
## Checks if peer could be added to PeerPool, e.g. it has:
##
## * Positive value of peer's score - (PeerStatus.LowScoreError)
## * Peer's key is not present in PeerPool - (PeerStatus.DuplicateError)
## * Peer's lifetime future is not finished yet - (PeerStatus.DeadPeerError)
##
## If peer could be added to PeerPool procedure returns (PeerStatus.Success)
mixin getKey, getFuture
if not(pool.checkPeerScore(peer)):
PeerStatus.LowScoreError
else:
let peerKey = getKey(peer)
if not(pool.registry.hasKey(peerKey)):
if not(peer.getFuture().finished):
PeerStatus.Success
else:
PeerStatus.DeadPeerError
else:
PeerStatus.DuplicateError
proc addPeerNoWait*[A, B](pool: PeerPool[A, B],
peer: A, peerType: PeerType): bool =
peer: A, peerType: PeerType): PeerStatus =
## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``.
##
## Procedure returns ``false`` in case
## * if ``peer`` is already closed.
## * if ``pool`` already has peer ``peer`` inside.
## Procedure returns ``PeerStatus``
## * if ``peer`` is already closed - (PeerStatus.DeadPeerError)
## * if ``pool`` already has peer ``peer`` - (PeerStatus.DuplicateError)
## * if ``pool`` currently has a maximum of peers.
## (PeerStatus.NoSpaceError)
## * if ``pool`` currently has a maximum of `Incoming` or `Outgoing` peers.
## (PeerStatus.NoSpaceError)
##
## Procedure returns ``true`` on success.
## Procedure returns (PeerStatus.Success) on success.
mixin getKey, getFuture
let res = pool.checkPeer(peer)
if res != PeerStatus.Success:
res
else:
let peerKey = peer.getKey()
case peerType:
of PeerType.Incoming:
if pool.lenSpace({PeerType.Incoming}) > 0:
pool.addPeerImpl(peer, peerKey, peerType)
PeerStatus.Success
else:
PeerStatus.NoSpaceError
of PeerType.Outgoing:
if pool.lenSpace({PeerType.Outgoing}) > 0:
pool.addPeerImpl(peer, peerKey, peerType)
PeerStatus.Success
else:
PeerStatus.NoSpaceError
if not(pool.checkPeerScore(peer)):
return false
proc getPeerSpaceMask[A, B](pool: PeerPool[A, B],
peerType: PeerType): set[PeerType] {.inline.} =
## This procedure returns set of events which you need to wait to get empty
## space for peer type ``peerType``. This set can be used for call to
## ``waitNotFullEvent()``.
case peerType:
of PeerType.Incoming:
if pool.maxIncPeersCount >= pool.maxPeersCount:
# If maximum number of `incoming` peers is only limited by
# maximum number of peers, then we could wait for both events.
# It means that we do not care about what peer will left pool.
{PeerType.Incoming, PeerType.Outgoing}
else:
# Otherwise we could wait only for `incoming` event
{PeerType.Incoming}
of PeerType.Outgoing:
if pool.maxOutPeersCount >= pool.maxPeersCount:
# If maximum number of `outgoing` peers is only limited by
# maximum number of peers, then we could wait for both events.
# It means that we do not care about what peer will left pool.
{PeerType.Incoming, PeerType.Outgoing}
else:
# Otherwise we could wait only for `outgoing` event
{PeerType.Outgoing}
let peerKey = getKey(peer)
if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished):
if len(pool.registry) < pool.maxPeersCount:
if peerType == PeerType.Incoming:
if pool.curIncPeersCount < pool.maxIncPeersCount:
let pindex = pool.addPeerImpl(peer, peerKey, peerType)
inc(pool.curIncPeersCount)
pool.incQueue.push(pindex)
pool.incNotEmptyEvent.fire()
pool.peerCountChanged()
return true
elif peerType == PeerType.Outgoing:
if pool.curOutPeersCount < pool.maxOutPeersCount:
let pindex = pool.addPeerImpl(peer, peerKey, peerType)
inc(pool.curOutPeersCount)
pool.outQueue.push(pindex)
pool.outNotEmptyEvent.fire()
pool.peerCountChanged()
return true
return false
proc waitForEmptySpace*[A, B](pool: PeerPool[A, B],
peerType: PeerType) {.async.} =
## This procedure will block until ``pool`` will have an empty space for peer
## of type ``peerType``.
let mask = pool.getPeerSpaceMask(peerType)
while pool.lenSpace({peerType}) == 0:
await pool.waitNotFullEvent(mask)
proc addPeer*[A, B](pool: PeerPool[A, B],
peer: A, peerType: PeerType): Future[bool] {.async.} =
peer: A, peerType: PeerType): Future[PeerStatus] {.async.} =
## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``.
##
## This procedure will wait for an empty space in PeerPool ``pool``, if
## PeerPool ``pool`` is full.
##
## Procedure returns ``false`` in case:
## * if ``peer`` is already closed.
## * if ``pool`` already has peer ``peer`` inside.
## Procedure returns ``PeerStatus``
## * if ``peer`` is already closed - (PeerStatus.DeadPeerError)
## * if ``pool`` already has peer ``peer`` - (PeerStatus.DuplicateError)
##
## Procedure returns ``true`` on success.
mixin getKey, getFuture
if not(pool.checkPeerScore(peer)):
return false
let peerKey = getKey(peer)
if not(pool.registry.hasKey(peerKey)) and not(peer.getFuture().finished):
while len(pool.registry) >= pool.maxPeersCount:
await pool.waitNotFullEvent({PeerType.Incoming, PeerType.Outgoing})
if peerType == PeerType.Incoming:
while pool.curIncPeersCount >= pool.maxIncPeersCount:
await pool.waitNotFullEvent({peerType})
let pindex = pool.addPeerImpl(peer, peerKey, peerType)
inc(pool.curIncPeersCount)
pool.incQueue.push(pindex)
pool.incNotEmptyEvent.fire()
pool.peerCountChanged()
return true
elif peerType == PeerType.Outgoing:
while pool.curOutPeersCount >= pool.maxOutPeersCount:
await pool.waitNotFullEvent({peerType})
let pindex = pool.addPeerImpl(peer, peerKey, peerType)
inc(pool.curOutPeersCount)
pool.outQueue.push(pindex)
pool.outNotEmptyEvent.fire()
pool.peerCountChanged()
return true
else:
return false
proc addIncomingPeerNoWait*[A, B](pool: PeerPool[A, B],
peer: A): bool {.inline.} =
## Add incoming peer ``peer`` to PeerPool ``pool``.
##
## Returns ``true`` on success.
pool.addPeerNoWait(peer, PeerType.Incoming)
proc addOutgoingPeerNoWait*[A, B](pool: PeerPool[A, B],
peer: A): bool {.inline.} =
## Add outgoing peer ``peer`` to PeerPool ``pool``.
##
## Returns ``true`` on success.
pool.addPeerNoWait(peer, PeerType.Outgoing)
proc addIncomingPeer*[A, B](pool: PeerPool[A, B],
peer: A): Future[bool] {.inline.} =
## Add incoming peer ``peer`` to PeerPool ``pool``.
##
## Returns ``true`` on success.
pool.addPeer(peer, PeerType.Incoming)
proc addOutgoingPeer*[A, B](pool: PeerPool[A, B],
peer: A): Future[bool] {.inline.} =
## Add outgoing peer ``peer`` to PeerPool ``pool``.
##
## Returns ``true`` on success.
pool.addPeer(peer, PeerType.Outgoing)
## Procedure returns (PeerStatus.Success) on success.
mixin getKey
let res =
block:
let res1 = pool.checkPeer(peer)
if res1 != PeerStatus.Success:
res1
else:
let mask = pool.getPeerSpaceMask(peerType)
# We going to block here until ``pool`` will not have free space,
# for our type of peer.
while pool.lenSpace({peerType}) == 0:
await pool.waitNotFullEvent(mask)
# Because we could wait for a long time we need to check peer one more
# time to avoid race condition.
let res2 = pool.checkPeer(peer)
if res2 == PeerStatus.Success:
let peerKey = peer.getKey()
pool.addPeerImpl(peer, peerKey, peerType)
PeerStatus.Success
else:
res2
return res
proc acquireItemImpl[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): A {.inline.} =
doAssert(filter != {})
doAssert((len(pool.outQueue) > 0) or (len(pool.incQueue) > 0))
let pindex =
if filter == {PeerType.Incoming, PeerType.Outgoing}:
@ -468,10 +540,11 @@ proc release*[A, B](pool: PeerPool[A, B], peer: A) =
discard pool.deletePeer(peer, force = true)
else:
item[].flags.excl(PeerFlags.Acquired)
if item[].peerType == PeerType.Incoming:
case item[].peerType
of PeerType.Incoming:
pool.incQueue.push(titem)
dec(pool.acqIncPeersCount)
elif item[].peerType == PeerType.Outgoing:
of PeerType.Outgoing:
pool.outQueue.push(titem)
dec(pool.acqOutPeersCount)
pool.fireNotEmptyEvent(item[])
@ -650,4 +723,5 @@ proc setScoreCheck*[A, B](pool: PeerPool[A, B],
proc setPeerCounter*[A, B](pool: PeerPool[A, B],
peerCounterCb: PeerCounterCallback) =
## Add PeerCounter callback.
pool.peerCounter = peerCounterCb

View File

@ -665,20 +665,161 @@ template peerAge(): uint64 =
template queueAge(): uint64 =
wallSlot - man.queue.outSlot
template checkPeerScore(peer, body: untyped): untyped =
mixin getScore
let currentScore = peer.getScore()
body
let newScore = peer.getScore()
if currentScore > newScore:
debug "Overdue penalty for peer's score received, exiting", peer = peer,
penalty = newScore - currentScore, peer_score = newScore,
topics = "syncman"
break
func syncQueueLen*[A, B](man: SyncManager[A, B]): uint64 =
man.queue.len
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
let wallSlot = man.getLocalWallSlot()
let headSlot = man.getLocalHeadSlot()
var peerSlot = peer.getHeadSlot()
# We updating SyncQueue's last slot all the time
man.queue.updateLastSlot(wallSlot)
debug "Peer's syncing status", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer_score = peer.getScore(), peer = peer, index = index,
peer_speed = peer.netKbps(), topics = "syncman"
# Check if peer's head slot is bigger than our wall clock slot.
if peerSlot > wallSlot + man.toleranceValue:
warn "Local timer is broken or peer's status information is invalid",
wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
local_head_slot = headSlot, peer = peer, index = index,
tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(),
peer_score = peer.getScore(), topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.StatusInvalid, peer)
man.failures.add(failure)
return
# Check if we need to update peer's status information
if peerAge >= man.maxStatusAge:
# Peer's status information is very old, its time to update it
man.workers[index].status = SyncWorkerStatus.UpdatingStatus
debug "Updating peer's status information", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer = peer, peer_score = peer.getScore(), index = index,
peer_speed = peer.netKbps(), topics = "syncman"
try:
let res = await peer.updateStatus()
if not(res):
peer.updateScore(PeerScoreNoStatus)
debug "Failed to get remote peer's status, exiting", peer = peer,
peer_score = peer.getScore(), peer_head_slot = peerSlot,
peer_speed = peer.netKbps(), index = index, topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.StatusDownload, peer)
man.failures.add(failure)
return
except CatchableError as exc:
debug "Unexpected exception while updating peer's status",
peer = peer, peer_score = peer.getScore(),
peer_head_slot = peerSlot, peer_speed = peer.netKbps(),
index = index, errMsg = exc.msg, topics = "syncman"
return
let newPeerSlot = peer.getHeadSlot()
if peerSlot >= newPeerSlot:
peer.updateScore(PeerScoreStaleStatus)
debug "Peer's status information is stale",
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
peer = peer, peer_score = peer.getScore(), index = index,
peer_speed = peer.netKbps(), topics = "syncman"
else:
debug "Peer's status information updated", wall_clock_slot = wallSlot,
remote_old_head_slot = peerSlot, local_head_slot = headSlot,
remote_new_head_slot = newPeerSlot, peer = peer,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
index = index, topics = "syncman"
peer.updateScore(PeerScoreGoodStatus)
peerSlot = newPeerSlot
if (peerAge <= man.maxHeadAge) and (headAge <= man.maxHeadAge):
debug "We are in sync with peer, exiting", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer = peer, peer_score = peer.getScore(), index = index,
peer_speed = peer.netKbps(), topics = "syncman"
return
man.workers[index].status = SyncWorkerStatus.Requesting
let req = man.queue.pop(peerSlot, peer)
if req.isEmpty():
# SyncQueue could return empty request in 2 cases:
# 1. There no more slots in SyncQueue to download (we are synced, but
# our ``notInSyncEvent`` is not yet cleared).
# 2. Current peer's known head slot is too low to satisfy request.
#
# To avoid endless loop we going to wait for RESP_TIMEOUT time here.
# This time is enough for all pending requests to finish and it is also
# enough for main sync loop to clear ``notInSyncEvent``.
debug "Empty request received from queue, exiting", peer = peer,
local_head_slot = headSlot, remote_head_slot = peerSlot,
queue_input_slot = man.queue.inpSlot,
queue_output_slot = man.queue.outSlot,
queue_last_slot = man.queue.lastSlot,
peer_speed = peer.netKbps(), peer_score = peer.getScore(),
index = index, topics = "syncman"
await sleepAsync(RESP_TIMEOUT)
return
debug "Creating new request for peer", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer, peer_speed = peer.netKbps(),
peer_score = peer.getScore(), index = index, topics = "syncman"
man.workers[index].status = SyncWorkerStatus.Downloading
try:
let blocks = await man.getBlocks(peer, req)
if blocks.isOk:
let data = blocks.get()
let smap = getShortMap(req, data)
debug "Received blocks on request", blocks_count = len(data),
blocks_map = smap, request_slot = req.slot,
request_count = req.count, request_step = req.step,
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(), index = index, topics = "syncman"
if not(checkResponse(req, data)):
peer.updateScore(PeerScoreBadResponse)
warn "Received blocks sequence is not in requested range",
blocks_count = len(data), blocks_map = smap,
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
index = index, topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.BadResponse, peer)
man.failures.add(failure)
return
# Scoring will happen in `syncUpdate`.
man.workers[index].status = SyncWorkerStatus.Processing
await man.queue.push(req, data)
# Cleaning up failures.
man.failures.setLen(0)
else:
peer.updateScore(PeerScoreNoBlocks)
man.queue.push(req)
debug "Failed to receive blocks on request",
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer, index = index,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.BlockDownload, peer)
man.failures.add(failure)
return
except CatchableError as exc:
debug "Unexpected exception while receiving blocks",
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer, index = index,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
errMsg = exc.msg, topics = "syncman"
return
proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
mixin getKey, getScore, getHeadSlot
@ -690,150 +831,10 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
await man.notInSyncEvent.wait()
man.workers[index].status = SyncWorkerStatus.WaitingPeer
let peer = await man.pool.acquire()
try:
let wallSlot = man.getLocalWallSlot()
let headSlot = man.getLocalHeadSlot()
var peerSlot = peer.getHeadSlot()
# We updating SyncQueue's last slot all the time
man.queue.updateLastSlot(wallSlot)
debug "Peer's syncing status", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer_score = peer.getScore(), peer = peer,
peer_speed = peer.netKbps(), topics = "syncman"
# Check if peer's head slot is bigger than our wall clock slot.
if peerSlot > wallSlot + man.toleranceValue:
# Our wall timer is broken, or peer's status information is invalid.
warn "Local timer is broken or peer's status information is invalid",
wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
local_head_slot = headSlot, peer = peer,
tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(),
peer_score = peer.getScore(), topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.StatusInvalid, peer)
man.failures.add(failure)
continue
# Check if we need to update peer's status information
if peerAge >= man.maxStatusAge:
# Peer's status information is very old, its time to update it
man.workers[index].status = SyncWorkerStatus.UpdatingStatus
debug "Updating peer's status information", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(), topics = "syncman"
let res = await peer.updateStatus()
if not(res):
peer.updateScore(PeerScoreNoStatus)
debug "Failed to get remote peer's status, exiting", peer = peer,
peer_score = peer.getScore(), peer_head_slot = peerSlot,
peer_speed = peer.netKbps(), topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.StatusDownload, peer)
man.failures.add(failure)
continue
let newPeerSlot = peer.getHeadSlot()
if peerSlot >= newPeerSlot:
peer.updateScore(PeerScoreStaleStatus)
debug "Peer's status information is stale",
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(), topics = "syncman"
else:
debug "Peer's status information updated", wall_clock_slot = wallSlot,
remote_old_head_slot = peerSlot, local_head_slot = headSlot,
remote_new_head_slot = newPeerSlot, peer = peer,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
topics = "syncman"
peer.updateScore(PeerScoreGoodStatus)
peerSlot = newPeerSlot
if (peerAge <= man.maxHeadAge) and (headAge <= man.maxHeadAge):
debug "We are in sync with peer, exiting", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(), topics = "syncman"
continue
man.workers[index].status = SyncWorkerStatus.Requesting
let req = man.queue.pop(peerSlot, peer)
if req.isEmpty():
# SyncQueue could return empty request in 2 cases:
# 1. There no more slots in SyncQueue to download (we are synced, but
# our ``notInSyncEvent`` is not yet cleared).
# 2. Current peer's known head slot is too low to satisfy request.
#
# To avoid endless loop we going to wait for RESP_TIMEOUT time here.
# This time is enough for all pending requests to finish and it is also
# enough for main sync loop to clear ``notInSyncEvent``.
debug "Empty request received from queue, exiting", peer = peer,
local_head_slot = headSlot, remote_head_slot = peerSlot,
queue_input_slot = man.queue.inpSlot,
queue_output_slot = man.queue.outSlot,
queue_last_slot = man.queue.lastSlot,
peer_speed = peer.netKbps(), peer_score = peer.getScore(),
topics = "syncman"
await sleepAsync(RESP_TIMEOUT)
continue
man.workers[index].status = SyncWorkerStatus.Downloading
debug "Creating new request for peer", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer, peer_speed = peer.netKbps(),
peer_score = peer.getScore(), topics = "syncman"
let blocks = await man.getBlocks(peer, req)
if blocks.isOk:
let data = blocks.get()
let smap = getShortMap(req, data)
debug "Received blocks on request", blocks_count = len(data),
blocks_map = smap, request_slot = req.slot,
request_count = req.count, request_step = req.step,
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(), topics = "syncman"
if not(checkResponse(req, data)):
peer.updateScore(PeerScoreBadResponse)
warn "Received blocks sequence is not in requested range",
blocks_count = len(data), blocks_map = smap,
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.BadResponse, peer)
man.failures.add(failure)
continue
# Scoring will happen in `syncUpdate`.
man.workers[index].status = SyncWorkerStatus.Processing
await man.queue.push(req, data)
# Cleaning up failures.
man.failures.setLen(0)
else:
peer.updateScore(PeerScoreNoBlocks)
man.queue.push(req)
debug "Failed to receive blocks on request",
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
topics = "syncman"
let failure = SyncFailure.init(SyncFailureKind.BlockDownload, peer)
man.failures.add(failure)
except CatchableError as exc:
debug "Unexpected exception happened", topics = "syncman",
excName = $exc.name, excMsg = exc.msg
finally:
man.pool.release(peer)
await man.syncStep(index, peer)
man.pool.release(peer)
proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
sleeping: int,

View File

@ -46,19 +46,21 @@ suiteReport "PeerPool testing suite":
var pool = newPeerPool[PeerTest, PeerTestID](item[0], item[1], item[2])
for i in 0 ..< item[4]:
var peer = PeerTest.init("idInc" & $i)
check pool.addIncomingPeerNoWait(peer) == true
check pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success
for i in 0 ..< item[5]:
var peer = PeerTest.init("idOut" & $i)
check pool.addOutgoingPeerNoWait(peer) == true
check pool.addPeerNoWait(peer, PeerType.Outgoing) == PeerStatus.Success
var peer = PeerTest.init("idCheck")
if item[1] != -1:
for i in 0 ..< item[3]:
check pool.addIncomingPeerNoWait(peer) == false
check pool.addPeerNoWait(peer, PeerType.Incoming) ==
PeerStatus.NoSpaceError
if item[2] != -1:
for i in 0 ..< item[3]:
check pool.addOutgoingPeerNoWait(peer) == false
check pool.addPeerNoWait(peer, PeerType.Outgoing) ==
PeerStatus.NoSpaceError
check:
pool.lenAvailable == item[3]
pool.lenAvailable({PeerType.Incoming}) == item[4]
@ -72,9 +74,9 @@ suiteReport "PeerPool testing suite":
var peer0 = PeerTest.init("idInc0")
var peer1 = PeerTest.init("idOut0")
var peer2 = PeerTest.init("idInc1")
var fut0 = pool.addIncomingPeer(peer0)
var fut1 = pool.addOutgoingPeer(peer1)
var fut2 = pool.addIncomingPeer(peer2)
var fut0 = pool.addPeer(peer0, PeerType.Incoming)
var fut1 = pool.addPeer(peer1, PeerType.Outgoing)
var fut2 = pool.addPeer(peer2, PeerType.Incoming)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == false)
doAssert(fut2.finished == false)
@ -92,10 +94,10 @@ suiteReport "PeerPool testing suite":
var peer1 = PeerTest.init("idOut0")
var peer2 = PeerTest.init("idInc1")
var peer3 = PeerTest.init("idOut1")
var fut0 = pool.addIncomingPeer(peer0)
var fut1 = pool.addOutgoingPeer(peer1)
var fut2 = pool.addIncomingPeer(peer2)
var fut3 = pool.addOutgoingPeer(peer3)
var fut0 = pool.addPeer(peer0, PeerType.Incoming)
var fut1 = pool.addPeer(peer1, PeerType.Outgoing)
var fut2 = pool.addPeer(peer2, PeerType.Incoming)
var fut3 = pool.addPeer(peer3, PeerType.Outgoing)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == true and fut1.failed == false)
doAssert(fut2.finished == false)
@ -118,10 +120,10 @@ suiteReport "PeerPool testing suite":
var peer2 = PeerTest.init("idOut0")
var peer3 = PeerTest.init("idOut1")
var fut0 = pool.addIncomingPeer(peer0)
var fut1 = pool.addIncomingPeer(peer1)
var fut2 = pool.addOutgoingPeer(peer2)
var fut3 = pool.addOutgoingPeer(peer3)
var fut0 = pool.addPeer(peer0, PeerType.Incoming)
var fut1 = pool.addPeer(peer1, PeerType.Incoming)
var fut2 = pool.addPeer(peer2, PeerType.Outgoing)
var fut3 = pool.addPeer(peer3, PeerType.Outgoing)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == false)
doAssert(fut2.finished == true and fut2.failed == false)
@ -145,12 +147,12 @@ suiteReport "PeerPool testing suite":
var peer4 = PeerTest.init("idOut2")
var peer5 = PeerTest.init("idInc2")
var fut0 = pool.addIncomingPeer(peer0)
var fut1 = pool.addIncomingPeer(peer1)
var fut2 = pool.addOutgoingPeer(peer2)
var fut3 = pool.addOutgoingPeer(peer3)
var fut4 = pool.addOutgoingPeer(peer4)
var fut5 = pool.addIncomingPeer(peer5)
var fut0 = pool.addPeer(peer0, PeerType.Incoming)
var fut1 = pool.addPeer(peer1, PeerType.Incoming)
var fut2 = pool.addPeer(peer2, PeerType.Outgoing)
var fut3 = pool.addPeer(peer3, PeerType.Outgoing)
var fut4 = pool.addPeer(peer4, PeerType.Outgoing)
var fut5 = pool.addPeer(peer5, PeerType.Incoming)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == true and fut1.failed == false)
@ -211,10 +213,10 @@ suiteReport "PeerPool testing suite":
var peer21 = PeerTest.init("peer21")
var peer22 = PeerTest.init("peer22")
check:
pool1.addPeerNoWait(peer11, PeerType.Incoming) == true
pool1.addPeerNoWait(peer12, PeerType.Incoming) == true
pool2.addPeerNoWait(peer21, PeerType.Outgoing) == true
pool2.addPeerNoWait(peer22, PeerType.Outgoing) == true
pool1.addPeerNoWait(peer11, PeerType.Incoming) == PeerStatus.Success
pool1.addPeerNoWait(peer12, PeerType.Incoming) == PeerStatus.Success
pool2.addPeerNoWait(peer21, PeerType.Outgoing) == PeerStatus.Success
pool2.addPeerNoWait(peer22, PeerType.Outgoing) == PeerStatus.Success
var itemFut11 = pool1.acquire({PeerType.Outgoing})
var itemFut12 = pool1.acquire(10, {PeerType.Outgoing})
@ -298,9 +300,9 @@ suiteReport "PeerPool testing suite":
var peer = PeerTest.init("peer" & $i, rand(MaxNumber))
# echo repr peer
if rand(100) mod 2 == 0:
check pool.addPeerNoWait(peer, PeerType.Incoming) == true
check pool.addPeerNoWait(peer, PeerType.Incoming) == PeerStatus.Success
else:
check pool.addPeerNoWait(peer, PeerType.Outgoing) == true
check pool.addPeerNoWait(peer, PeerType.Outgoing) == PeerStatus.Success
check waitFor(testAcquireRelease()) == TestsCount
@ -310,7 +312,8 @@ suiteReport "PeerPool testing suite":
var peer = PeerTest.init("deletePeer")
## Delete available peer
doAssert(pool.addIncomingPeerNoWait(peer) == true)
doAssert(pool.addPeerNoWait(peer,
PeerType.Incoming) == PeerStatus.Success)
doAssert(pool.len == 1)
doAssert(pool.lenAvailable == 1)
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
@ -323,7 +326,8 @@ suiteReport "PeerPool testing suite":
## Delete acquired peer
peer = PeerTest.init("closingPeer")
doAssert(pool.addIncomingPeerNoWait(peer) == true)
doAssert(pool.addPeerNoWait(peer,
PeerType.Incoming) == PeerStatus.Success)
doAssert(pool.len == 1)
doAssert(pool.lenAvailable == 1)
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
@ -342,7 +346,8 @@ suiteReport "PeerPool testing suite":
## Force delete acquired peer
peer = PeerTest.init("closingPeer")
doAssert(pool.addIncomingPeerNoWait(peer) == true)
doAssert(pool.addPeerNoWait(peer,
PeerType.Incoming) == PeerStatus.Success)
doAssert(pool.len == 1)
doAssert(pool.lenAvailable == 1)
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
@ -363,7 +368,8 @@ suiteReport "PeerPool testing suite":
var peer = PeerTest.init("closingPeer")
## Close available peer
doAssert(pool.addIncomingPeerNoWait(peer) == true)
doAssert(pool.addPeerNoWait(peer,
PeerType.Incoming) == PeerStatus.Success)
doAssert(pool.len == 1)
doAssert(pool.lenAvailable == 1)
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
@ -378,7 +384,8 @@ suiteReport "PeerPool testing suite":
## Close acquired peer
peer = PeerTest.init("closingPeer")
doAssert(pool.addIncomingPeerNoWait(peer) == true)
doAssert(pool.addPeerNoWait(peer,
PeerType.Incoming) == PeerStatus.Success)
doAssert(pool.len == 1)
doAssert(pool.lenAvailable == 1)
doAssert(pool.lenAvailable({PeerType.Outgoing}) == 0)
@ -411,9 +418,9 @@ suiteReport "PeerPool testing suite":
var peer3 = PeerTest.init("peer3", 8)
check:
pool.addPeerNoWait(peer1, PeerType.Incoming) == true
pool.addPeerNoWait(peer2, PeerType.Incoming) == true
pool.addPeerNoWait(peer3, PeerType.Outgoing) == true
pool.addPeerNoWait(peer1, PeerType.Incoming) == PeerStatus.Success
pool.addPeerNoWait(peer2, PeerType.Incoming) == PeerStatus.Success
pool.addPeerNoWait(peer3, PeerType.Outgoing) == PeerStatus.Success
pool.lenAvailable == 3
pool.lenAvailable({PeerType.Outgoing}) == 1
pool.lenAvailable({PeerType.Incoming}) == 2
@ -430,9 +437,9 @@ suiteReport "PeerPool testing suite":
pool.len == 0
check:
pool.addPeerNoWait(peer1, PeerType.Incoming) == true
pool.addPeerNoWait(peer2, PeerType.Incoming) == true
pool.addPeerNoWait(peer3, PeerType.Outgoing) == true
pool.addPeerNoWait(peer1, PeerType.Incoming) == PeerStatus.Success
pool.addPeerNoWait(peer2, PeerType.Incoming) == PeerStatus.Success
pool.addPeerNoWait(peer3, PeerType.Outgoing) == PeerStatus.Success
pool.lenAvailable == 3
pool.lenAvailable({PeerType.Outgoing}) == 1
pool.lenAvailable({PeerType.Incoming}) == 2
@ -458,9 +465,9 @@ suiteReport "PeerPool testing suite":
var peer3 = PeerTest.init("peer3", 8)
check:
pool.addPeerNoWait(peer1, PeerType.Incoming) == true
pool.addPeerNoWait(peer2, PeerType.Incoming) == true
pool.addPeerNoWait(peer3, PeerType.Outgoing) == true
pool.addPeerNoWait(peer1, PeerType.Incoming) == PeerStatus.Success
pool.addPeerNoWait(peer2, PeerType.Incoming) == PeerStatus.Success
pool.addPeerNoWait(peer3, PeerType.Outgoing) == PeerStatus.Success
pool.hasPeer("peer4") == false
pool.hasPeer("peer1") == true
pool.hasPeer("peer2") == true
@ -493,16 +500,16 @@ suiteReport "PeerPool testing suite":
var peer9 = PeerTest.init("peer9", 2)
check:
pool.addPeerNoWait(peer2, PeerType.Incoming) == true
pool.addPeerNoWait(peer3, PeerType.Incoming) == true
pool.addPeerNoWait(peer1, PeerType.Incoming) == true
pool.addPeerNoWait(peer4, PeerType.Incoming) == true
pool.addPeerNoWait(peer2, PeerType.Incoming) == PeerStatus.Success
pool.addPeerNoWait(peer3, PeerType.Incoming) == PeerStatus.Success
pool.addPeerNoWait(peer1, PeerType.Incoming) == PeerStatus.Success
pool.addPeerNoWait(peer4, PeerType.Incoming) == PeerStatus.Success
pool.addPeerNoWait(peer5, PeerType.Outgoing) == true
pool.addPeerNoWait(peer8, PeerType.Outgoing) == true
pool.addPeerNoWait(peer7, PeerType.Outgoing) == true
pool.addPeerNoWait(peer6, PeerType.Outgoing) == true
pool.addPeerNoWait(peer9, PeerType.Outgoing) == true
pool.addPeerNoWait(peer5, PeerType.Outgoing) == PeerStatus.Success
pool.addPeerNoWait(peer8, PeerType.Outgoing) == PeerStatus.Success
pool.addPeerNoWait(peer7, PeerType.Outgoing) == PeerStatus.Success
pool.addPeerNoWait(peer6, PeerType.Outgoing) == PeerStatus.Success
pool.addPeerNoWait(peer9, PeerType.Outgoing) == PeerStatus.Success
var total1, total2, total3: seq[PeerTest]
var avail1, avail2, avail3: seq[PeerTest]
@ -596,17 +603,19 @@ suiteReport "PeerPool testing suite":
pool.setScoreCheck(scoreCheck)
check:
pool.addPeerNoWait(peer1, PeerType.Incoming) == true
pool.addPeerNoWait(peer2, PeerType.Incoming) == true
pool.addPeerNoWait(peer3, PeerType.Outgoing) == true
pool.addPeerNoWait(peer4, PeerType.Incoming) == false
pool.addPeerNoWait(peer5, PeerType.Outgoing) == false
pool.addPeerNoWait(peer1, PeerType.Incoming) == PeerStatus.Success
pool.addPeerNoWait(peer2, PeerType.Incoming) == PeerStatus.Success
pool.addPeerNoWait(peer3, PeerType.Outgoing) == PeerStatus.Success
pool.addPeerNoWait(peer4, PeerType.Incoming) == PeerStatus.LowScoreError
pool.addPeerNoWait(peer5, PeerType.Outgoing) == PeerStatus.LowScoreError
len(pool) == 3
lenAvailable(pool) == 3
check:
waitFor(pool.addPeer(peer4, PeerType.Incoming)) == false
waitFor(pool.addPeer(peer5, PeerType.Outgoing)) == false
waitFor(pool.addPeer(peer4, PeerType.Incoming)) ==
PeerStatus.LowScoreError
waitFor(pool.addPeer(peer5, PeerType.Outgoing)) ==
PeerStatus.LowScoreError
len(pool) == 3
lenAvailable(pool) == 3
@ -656,9 +665,9 @@ suiteReport "PeerPool testing suite":
var peer0 = PeerTest.init("idInc0", 100)
var peer1 = PeerTest.init("idOut0", 100)
var peer2 = PeerTest.init("idInc1", 100)
var fut0 = pool.addIncomingPeer(peer0)
var fut1 = pool.addOutgoingPeer(peer1)
var fut2 = pool.addIncomingPeer(peer2)
var fut0 = pool.addPeer(peer0, PeerType.Incoming)
var fut1 = pool.addPeer(peer1, PeerType.Outgoing)
var fut2 = pool.addPeer(peer2, PeerType.Incoming)
doAssert(fut0.finished == true and fut0.failed == false)
doAssert(fut1.finished == false)
doAssert(fut2.finished == false)
@ -673,3 +682,251 @@ suiteReport "PeerPool testing suite":
result = true
check waitFor(testDeleteOnRelease()) == true
timedTest "Space tests":
var pool1 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79)
var pool2 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79,
maxIncomingPeers = 39)
var pool3 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79,
maxOutgoingPeers = 40)
var pool4 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79,
maxOutgoingPeers = 40,
maxIncomingPeers = 0)
var pool5 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79,
maxIncomingPeers = 39,
maxOutgoingPeers = 0)
var pool6 = newPeerPool[PeerTest, PeerTestID](maxPeers = 79,
maxIncomingPeers = 39,
maxOutgoingPeers = 40)
var pool7 = newPeerPool[PeerTest, PeerTestID](maxIncomingPeers = 39)
var pool8 = newPeerPool[PeerTest, PeerTestID](maxOutgoingPeers = 40)
var pool9 = newPeerPool[PeerTest, PeerTestID]()
check:
pool1.lenSpace() == 79
pool1.lenSpace({PeerType.Incoming}) == 79
pool1.lenSpace({PeerType.Outgoing}) == 79
pool2.lenSpace() == 79
pool2.lenSpace({PeerType.Incoming}) == 39
pool2.lenSpace({PeerType.Outgoing}) == 79
pool3.lenSpace() == 79
pool3.lenSpace({PeerType.Incoming}) == 79
pool3.lenSpace({PeerType.Outgoing}) == 40
pool4.lenSpace() == 40
pool4.lenSpace({PeerType.Incoming}) == 0
pool4.lenSpace({PeerType.Outgoing}) == 40
pool5.lenSpace() == 39
pool5.lenSpace({PeerType.Incoming}) == 39
pool5.lenSpace({PeerType.Outgoing}) == 0
pool6.lenSpace() == 79
pool6.lenSpace({PeerType.Incoming}) == 39
pool6.lenSpace({PeerType.Outgoing}) == 40
pool7.lenSpace() == high(int)
pool7.lenSpace({PeerType.Incoming}) == 39
pool7.lenSpace({PeerType.Outgoing}) == high(int)
pool8.lenSpace() == high(int)
pool8.lenSpace({PeerType.Incoming}) == high(int)
pool8.lenSpace({PeerType.Outgoing}) == 40
pool9.lenSpace() == high(int)
pool9.lenSpace({PeerType.Incoming}) == high(int)
pool9.lenSpace({PeerType.Outgoing}) == high(int)
# POOL 1
for i in 0 ..< 79:
if i mod 2 == 0:
check pool1.addPeerNoWait(PeerTest.init("idInc" & $i),
PeerType.Incoming) == PeerStatus.Success
else:
check pool1.addPeerNoWait(PeerTest.init("idOut" & $i),
PeerType.Outgoing) == PeerStatus.Success
check pool1.lenSpace() == 79 - (i + 1)
# POOL 2
for i in 0 ..< 39:
check:
pool2.addPeerNoWait(PeerTest.init("idInc" & $i),
PeerType.Incoming) == PeerStatus.Success
pool2.lenSpace() == 79 - (i + 1)
pool2.lenSpace({PeerType.Incoming}) == 39 - (i + 1)
pool2.lenSpace({PeerType.Outgoing}) == 79 - (i + 1)
check:
pool2.addPeerNoWait(PeerTest.init("idInc39"),
PeerType.Incoming) == PeerStatus.NoSpaceError
pool2.lenSpace({PeerType.Incoming}) == 0
for i in 39 ..< 79:
check:
pool2.addPeerNoWait(PeerTest.init("idOut" & $i),
PeerType.Outgoing) == PeerStatus.Success
pool2.addPeerNoWait(PeerTest.init("idIncSome"),
PeerType.Incoming) == PeerStatus.NoSpaceError
pool2.lenSpace() == 79 - (i + 1)
pool2.lenSpace({PeerType.Incoming}) == 0
pool2.lenSpace({PeerType.Outgoing}) == 79 - (i + 1)
check:
pool2.addPeerNoWait(PeerTest.init("idOut79"),
PeerType.Outgoing) == PeerStatus.NoSpaceError
pool2.addPeerNoWait(PeerTest.init("idInc79"),
PeerType.Incoming) == PeerStatus.NoSpaceError
pool2.lenSpace() == 0
pool2.lenSpace({PeerType.Incoming}) == 0
pool2.lenSpace({PeerType.Outgoing}) == 0
# POOL 3
for i in 0 ..< 40:
check:
pool3.addPeerNoWait(PeerTest.init("idOut" & $i),
PeerType.Outgoing) == PeerStatus.Success
pool3.lenSpace() == 79 - (i + 1)
pool3.lenSpace({PeerType.Outgoing}) == 40 - (i + 1)
pool3.lenSpace({PeerType.Incoming}) == 79 - (i + 1)
check:
pool3.addPeerNoWait(PeerTest.init("idInc40"),
PeerType.Outgoing) == PeerStatus.NoSpaceError
pool3.lenSpace({PeerType.Outgoing}) == 0
for i in 40 ..< 79:
check:
pool3.addPeerNoWait(PeerTest.init("idInc" & $i),
PeerType.Incoming) == PeerStatus.Success
pool3.addPeerNoWait(PeerTest.init("idOutSome"),
PeerType.Outgoing) == PeerStatus.NoSpaceError
pool3.lenSpace() == 79 - (i + 1)
pool3.lenSpace({PeerType.Outgoing}) == 0
pool3.lenSpace({PeerType.Incoming}) == 79 - (i + 1)
check:
pool3.addPeerNoWait(PeerTest.init("idInc79"),
PeerType.Incoming) == PeerStatus.NoSpaceError
pool3.addPeerNoWait(PeerTest.init("idOut79"),
PeerType.Outgoing) == PeerStatus.NoSpaceError
pool3.lenSpace() == 0
pool3.lenSpace({PeerType.Incoming}) == 0
pool3.lenSpace({PeerType.Outgoing}) == 0
# POOL 4
for i in 0 ..< 40:
check:
pool4.addPeerNoWait(PeerTest.init("idOut" & $i),
PeerType.Outgoing) == PeerStatus.Success
pool4.addPeerNoWait(PeerTest.init("idIncSome"),
PeerType.Incoming) == PeerStatus.NoSpaceError
pool4.lenSpace() == 40 - (i + 1)
pool4.lenSpace({PeerType.Incoming}) == 0
pool4.lenSpace({PeerType.Outgoing}) == 40 - (i + 1)
check:
pool4.addPeerNoWait(PeerTest.init("idOut40"),
PeerType.Outgoing) == PeerStatus.NoSpaceError
pool4.addPeerNoWait(PeerTest.init("idInc40"),
PeerType.Incoming) == PeerStatus.NoSpaceError
pool4.lenSpace() == 0
pool4.lenSpace({PeerType.Incoming}) == 0
pool4.lenSpace({PeerType.Outgoing}) == 0
# POOL 5
for i in 0 ..< 39:
check:
pool5.addPeerNoWait(PeerTest.init("idInc" & $i),
PeerType.Incoming) == PeerStatus.Success
pool5.addPeerNoWait(PeerTest.init("idOutSome"),
PeerType.Outgoing) == PeerStatus.NoSpaceError
pool5.lenSpace() == 39 - (i + 1)
pool5.lenSpace({PeerType.Incoming}) == 39 - (i + 1)
pool5.lenSpace({PeerType.Outgoing}) == 0
check:
pool5.addPeerNoWait(PeerTest.init("idOut39"),
PeerType.Outgoing) == PeerStatus.NoSpaceError
pool5.addPeerNoWait(PeerTest.init("idInc39"),
PeerType.Incoming) == PeerStatus.NoSpaceError
pool5.lenSpace() == 0
pool5.lenSpace({PeerType.Incoming}) == 0
pool5.lenSpace({PeerType.Outgoing}) == 0
# POOL 6
for i in 0 ..< 39:
check:
pool6.addPeerNoWait(PeerTest.init("idInc" & $i),
PeerType.Incoming) == PeerStatus.Success
pool6.addPeerNoWait(PeerTest.init("idOut" & $(i + 39)),
PeerType.Outgoing) == PeerStatus.Success
pool6.lenSpace() == 79 - (i + 1) * 2
pool6.lenSpace({PeerType.Incoming}) == 39 - (i + 1)
pool6.lenSpace({PeerType.Outgoing}) == 40 - (i + 1)
check:
pool6.addPeerNoWait(PeerTest.init("idInc39"),
PeerType.Incoming) == PeerStatus.NoSpaceError
pool6.addPeerNoWait(PeerTest.init("idOut79"),
PeerType.Outgoing) == PeerStatus.Success
pool6.addPeerNoWait(PeerTest.init("idOut80"),
PeerType.Outgoing) == PeerStatus.NoSpaceError
pool6.lenSpace() == 0
pool6.lenSpace({PeerType.Incoming}) == 0
pool6.lenSpace({PeerType.Outgoing}) == 0
# POOL 7
for i in 0 ..< 39:
check:
pool7.addPeerNoWait(PeerTest.init("idInc" & $i),
PeerType.Incoming) == PeerStatus.Success
pool7.lenSpace() == high(int) - (i + 1)
pool7.lenSpace({PeerType.Incoming}) == 39 - (i + 1)
pool7.lenSpace({PeerType.Outgoing}) == high(int) - (i + 1)
check:
pool7.addPeerNoWait(PeerTest.init("idInc39"),
PeerType.Incoming) == PeerStatus.NoSpaceError
pool7.lenSpace() == high(int) - 39
pool7.lenSpace({PeerType.Incoming}) == 0
pool7.lenSpace({PeerType.Outgoing}) == high(int) - 39
# We could not check whole high(int), so we check 10_000 items
for i in 0 ..< 10_000:
check:
pool7.addPeerNoWait(PeerTest.init("idOut" & $i),
PeerType.Outgoing) == PeerStatus.Success
pool7.lenSpace() == high(int) - 39 - (i + 1)
pool7.lenSpace({PeerType.Incoming}) == 0
pool7.lenSpace({PeerType.Outgoing}) == high(int) - 39 - (i + 1)
# POOL 8
for i in 0 ..< 40:
check:
pool8.addPeerNoWait(PeerTest.init("idOut" & $i),
PeerType.Outgoing) == PeerStatus.Success
pool8.lenSpace() == high(int) - (i + 1)
pool8.lenSpace({PeerType.Outgoing}) == 40 - (i + 1)
pool8.lenSpace({PeerType.Incoming}) == high(int) - (i + 1)
check:
pool8.addPeerNoWait(PeerTest.init("idOut40"),
PeerType.Outgoing) == PeerStatus.NoSpaceError
pool8.lenSpace() == high(int) - 40
pool8.lenSpace({PeerType.Outgoing}) == 0
pool8.lenSpace({PeerType.Incoming}) == high(int) - 40
# We could not check whole high(int), so we check 10_000 items
for i in 0 ..< 10_000:
check:
pool8.addPeerNoWait(PeerTest.init("idInc" & $i),
PeerType.Incoming) == PeerStatus.Success
pool8.lenSpace() == high(int) - 40 - (i + 1)
pool8.lenSpace({PeerType.Outgoing}) == 0
pool8.lenSpace({PeerType.Incoming}) == high(int) - 40 - (i + 1)
# POOL 9
# We could not check whole high(int), so we check 10_000 items
for i in 0 ..< 10_000:
check:
pool9.addPeerNoWait(PeerTest.init("idInc" & $i),
PeerType.Incoming) == PeerStatus.Success
pool9.addPeerNoWait(PeerTest.init("idOut" & $i),
PeerType.Outgoing) == PeerStatus.Success
pool9.lenSpace() == high(int) - (i + 1) * 2
pool9.lenSpace({PeerType.Outgoing}) == high(int) - (i + 1) * 2
pool9.lenSpace({PeerType.Incoming}) == high(int) - (i + 1) * 2