Add "drop by score" ability to PeerPool. (#917)

* Add "drop by score" ability to PeerPool.
Add tests.
Fix the SyncManager queue to start from the freshest data.

* Fix endless cycle at the end of the syncing process.
Eugene Kabanov 2020-04-23 18:31:00 +03:00 committed by GitHub
parent 7cd4b0bfae
commit be89a3c54d
5 changed files with 164 additions and 24 deletions
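
Before the per-file diffs, a minimal end-to-end sketch of the new drop-by-score mechanism. It is a sketch only, assuming the ``PeerTest``/``PeerTestID`` helpers from the PeerPool test suite at the bottom of this diff: a score-check callback is registered on the pool, and peers that fail it are rejected on add and deleted on release.

import chronos

proc scoreCheck(peer: PeerTest): bool =
  # Mirrors the beacon_node callback: negative weight fails the check.
  peer.weight >= 0

var pool = newPeerPool[PeerTest, PeerTestID]()
pool.setScoreCheck(scoreCheck)

# A peer that already fails the check is rejected on add ...
doAssert pool.addPeerNoWait(PeerTest.init("bad", -1),
                            PeerType.Incoming) == false

# ... and a peer whose score drops while acquired is deleted on release.
var peer = PeerTest.init("good", 10)
doAssert pool.addPeerNoWait(peer, PeerType.Incoming) == true
discard waitFor(pool.acquire({PeerType.Incoming}))
peer.weight = -100
pool.release(peer)
doAssert len(pool) == 0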

View File

@ -951,12 +951,7 @@ proc updateStatus*(peer: Peer): Future[bool] {.async.} =
peer.state(BeaconSync).statusMsg = theirStatus.get()
result = true
proc updateScore*(peer: Peer, score: int) =
  ## Adjust the score of ``peer`` by ``score``.
  peer.score = peer.score + score
proc runSyncLoop(node: BeaconNode) {.async.} =
proc getLocalHeadSlot(): Slot =
result = node.blockPool.head.blck.slot
@ -966,15 +961,28 @@ proc runSyncLoop(node: BeaconNode) {.async.} =
proc updateLocalBlocks(list: openarray[SignedBeaconBlock]): bool =
debug "Forward sync imported blocks", count = len(list),
local_head_slot = $getLocalHeadSlot()
local_head_slot = getLocalHeadSlot()
for blk in list:
if not(node.storeBlock(blk)):
return false
discard node.updateHead()
info "Forward sync blocks got imported sucessfully", count = $len(list),
local_head_slot = $getLocalHeadSlot()
info "Forward sync blocks got imported sucessfully", count = len(list),
local_head_slot = getLocalHeadSlot()
result = true
proc scoreCheck(peer: Peer): bool =
  if peer.score < PeerScoreLimit:
    try:
      # The score-check callback may only raise Defect (see
      # PeerScoreCheckCallback), so exceptions from ``debug`` are swallowed.
      debug "Peer score is too low, removing it from PeerPool", peer = peer,
            peer_score = peer.score, score_limit = PeerScoreLimit
    except:
      discard
    result = false
  else:
    result = true
node.network.peerPool.setScoreCheck(scoreCheck)
var syncman = newSyncManager[Peer, PeerID](
node.network.peerPool, getLocalHeadSlot, getLocalWallSlot,
updateLocalBlocks

View File

@ -145,6 +145,11 @@ const
readTimeoutErrorMsg = "Exceeded read timeout for a request"
NewPeerScore* = 200
  ## Score assigned to a newly connected peer.
PeerScoreLimit* = 0
  ## Score below which a peer will be kicked from the PeerPool.
# Metrics for tracking attestation and beacon block loss
declareCounter gossip_messages_sent,
"Number of gossip messages sent by this peer"
@ -198,6 +203,10 @@ proc `<`*(a, b: Peer): bool =
proc getScore*(a: Peer): int =
result = a.score
proc updateScore*(peer: Peer, score: int) =
  ## Adjust the score of ``peer`` by ``score``.
  peer.score = peer.score + score
proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async.} =
# TODO: How should we notify the other peer?
@ -611,6 +620,7 @@ proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} =
let res = await network.peerPool.addOutgoingPeer(peer)
if res:
peer.updateScore(NewPeerScore)
debug "Peer (outgoing) has been added to PeerPool", peer = $peer.info
peer.getFuture().addCallback(onPeerClosed)
libp2p_peers.set int64(len(network.peerPool))
@ -624,6 +634,7 @@ proc handleIncomingPeer*(peer: Peer) =
let res = network.peerPool.addIncomingPeerNoWait(peer)
if res:
peer.updateScore(NewPeerScore)
debug "Peer (incoming) has been added to PeerPool", peer = $peer.info
peer.getFuture().addCallback(onPeerClosed)
libp2p_peers.set int64(len(network.peerPool))

View File

@ -18,6 +18,8 @@ type
data: int
cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.}
PeerScoreCheckCallback*[T] = proc(peer: T): bool {.gcsafe, raises: [Defect].}
PeerPool*[A, B] = ref object
incNotEmptyEvent: AsyncEvent
outNotEmptyEvent: AsyncEvent
@ -28,6 +30,7 @@ type
registry: Table[B, PeerIndex]
storage: seq[PeerItem[A]]
cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.}
scoreCheck: PeerScoreCheckCallback[A]
maxPeersCount: int
maxIncPeersCount: int
maxOutPeersCount: int
@ -125,9 +128,9 @@ template getItem[A, B](pool: PeerPool[A, B],
pindex = pool.incQueue.pop().data
addr(pool.storage[pindex])
proc newPeerPool*[A, B](maxPeers = -1,
maxIncomingPeers = -1,
maxOutgoingPeers = -1): PeerPool[A, B] =
proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
maxOutgoingPeers = -1,
scoreCheckCb: PeerScoreCheckCallback[A] = nil): PeerPool[A, B] =
## Create new PeerPool.
##
## ``maxPeers`` - maximum number of peers allowed. All the peers which
@ -142,6 +145,10 @@ proc newPeerPool*[A, B](maxPeers = -1,
## outgoing peers exceeds this number will be rejected. By default this
## number is infinite.
##
## ``scoreCheckCb`` - callback which will be called for every released peer.
## If the callback returns ``false``, the peer will be removed from the
## PeerPool.
##
## Please note, that if ``maxPeers`` is positive non-zero value, then equation
## ``maxPeers >= maxIncomingPeers + maxOutgoingPeers`` must be ``true``.
var res = PeerPool[A, B]()
@ -161,6 +168,7 @@ proc newPeerPool*[A, B](maxPeers = -1,
res.incQueue = initHeapQueue[PeerIndex]()
res.outQueue = initHeapQueue[PeerIndex]()
res.registry = initTable[B, PeerIndex]()
res.scoreCheck = scoreCheckCb
res.storage = newSeq[PeerItem[A]]()
proc peerCmp(a, b: PeerIndex): bool {.closure, gcsafe.} =
@ -196,6 +204,16 @@ proc lenAcquired*[A, B](pool: PeerPool[A, B],
if PeerType.Outgoing in filter:
result = result + pool.acqOutPeersCount
proc checkPeerScore*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} =
  ## Returns ``true`` if the peer passes the score check (or when no
  ## callback is installed).
  if not(isNil(pool.scoreCheck)):
    result = pool.scoreCheck(peer)
  else:
    result = true
proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool =
## Remove ``peer`` from PeerPool ``pool``.
##
@ -274,6 +292,9 @@ proc addPeerNoWait*[A, B](pool: PeerPool[A, B],
## Procedure returns ``true`` on success.
mixin getKey, getFuture
if not(pool.checkPeerScore(peer)):
return false
result = false
let peerKey = getKey(peer)
@ -308,6 +329,9 @@ proc addPeer*[A, B](pool: PeerPool[A, B],
## Procedure returns ``true`` on success.
mixin getKey, getFuture
if not(pool.checkPeerScore(peer)):
return false
var res = false
let peerKey = getKey(peer)
@ -317,7 +341,6 @@ proc addPeer*[A, B](pool: PeerPool[A, B],
if peerType == PeerType.Incoming:
if pool.curIncPeersCount >= pool.maxIncPeersCount:
await pool.waitNotFullEvent(peerType)
let pindex = pool.addPeerImpl(peer, peerKey, peerType)
inc(pool.curIncPeersCount)
pool.incQueue.push(pindex)
@ -408,6 +431,8 @@ proc release*[A, B](pool: PeerPool[A, B], peer: A) =
let pindex = titem.data
var item = addr(pool.storage[pindex])
if PeerFlags.Acquired in item[].flags:
if not(pool.checkPeerScore(peer)):
item[].flags.incl(DeleteOnRelease)
item[].flags.excl(PeerFlags.Acquired)
if PeerFlags.DeleteOnRelease in item[].flags:
if item[].peerType == PeerType.Incoming:
@ -607,3 +632,8 @@ proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} =
for item in peers:
acquired.add(item)
pool.clear()
proc setScoreCheck*[A, B](pool: PeerPool[A, B],
                          scoreCheckCb: PeerScoreCheckCallback[A]) =
  ## Install ``scoreCheckCb`` as the pool's score-check callback.
  pool.scoreCheck = scoreCheckCb
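
The callback can also be supplied at construction time instead of via ``setScoreCheck``. A small sketch, again assuming the ``PeerTest``/``PeerTestID`` helpers from the test suite below:

proc scoreCheck(peer: PeerTest): bool =
  peer.weight >= 0

# Equivalent to newPeerPool() followed by setScoreCheck().
var pool = newPeerPool[PeerTest, PeerTestID](scoreCheckCb = scoreCheck)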

View File

@ -7,6 +7,18 @@ export datatypes, digest, chronos, chronicles
logScope:
topics = "syncman"
const
PeerScoreNoStatus* = -100
  ## Peer did not answer a `status` request.
PeerScoreStaleStatus* = -50
  ## Peer's `status` answer does not progress in time.
PeerScoreGoodStatus* = 50
  ## Peer's `status` answer is fine.
PeerScoreNoBlocks* = -100
  ## Peer did not respond in time to a `blocksByRange` request.
PeerScoreGoodBlocks* = 100
  ## Peer's `blocksByRange` answer is fine.
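
To make these thresholds concrete, a worked example combining the deltas above with ``NewPeerScore`` (200) and ``PeerScoreLimit`` (0) from eth2_network.nim:

# Arithmetic only; shows when the beacon_node scoreCheck starts failing.
var score = NewPeerScore        # 200 on connect
score += PeerScoreNoBlocks      # 100 after a timed-out blocksByRange
score += PeerScoreStaleStatus   #  50 after a non-progressing status
score += PeerScoreNoBlocks      # -50: below PeerScoreLimit, so the peer
                                # is rejected on add and deleted on release
doAssert score == -50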
type
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
@ -48,6 +60,7 @@ type
getLocalHeadSlot: GetSlotCallback
getLocalWallSlot: GetSlotCallback
updateLocalBlocks: UpdateLocalBlocksCallback
chunkSize: uint64
queue: SyncQueue
SyncManagerError* = object of CatchableError
@ -187,10 +200,10 @@ proc total*(sq: SyncQueue): uint64 {.inline.} =
## Returns total number of slots in queue ``sq``.
result = sq.lastSlot - sq.startSlot + 1'u64
proc progress*(sq: SyncQueue): string =
proc progress*(sq: SyncQueue): uint64 =
## Returns the progress of queue ``sq`` as a percentage.
let curSlot = sq.outSlot - sq.startSlot
result = $curSlot & "/" & $sq.total()
result = (curSlot * 100'u64) div sq.total()
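
A quick worked instance of the new percentage formula, with hypothetical slot numbers:

let startSlot = 100'u64                     # sq.startSlot
let lastSlot = 199'u64                      # sq.lastSlot
let outSlot = 150'u64                       # sq.outSlot
let total = lastSlot - startSlot + 1'u64    # 100 slots in the queue
let curSlot = outSlot - startSlot           # 50 slots already processed
doAssert (curSlot * 100'u64) div total == 50'u64  # queue is 50% complete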
proc newSyncManager*[A, B](pool: PeerPool[A, B],
getLocalHeadSlotCb: GetSlotCallback,
@ -213,6 +226,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
getLocalWallSlot: getLocalWallSlotCb,
maxHeadAge: maxHeadAge,
sleepTime: sleepTime,
chunkSize: chunkSize,
queue: queue
)
@ -255,6 +269,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
var headSlot = man.getLocalHeadSlot()
var peerSlot = peer.getHeadSlot()
man.queue.updateLastSlot(wallSlot)
debug "Peer's syncing status", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
peer_score = peer.getScore(), peer = peer, topics = "syncman"
@ -275,7 +291,7 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
peer = peer, peer_score = peer.getScore(), topics = "syncman"
let res = await peer.updateStatus()
if not(res):
peer.updateScore(-100)
peer.updateScore(PeerScoreNoStatus)
debug "Failed to get remote peer's status, exiting", peer = peer,
peer_score = peer.getScore(), peer_head_slot = peerSlot,
topics = "syncman"
@ -288,18 +304,16 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
local_head_slot = headSlot,
remote_new_head_slot = newPeerSlot,
peer = peer, peer_score = peer.getScore(), topics = "syncman"
peer.updateScore(-50)
peer.updateScore(PeerScoreStaleStatus)
break
debug "Peer's status information updated", wall_clock_slot = wallSlot,
remote_old_head_slot = peerSlot, local_head_slot = headSlot,
remote_new_head_slot = newPeerSlot, peer = peer,
peer_score = peer.getScore(), topics = "syncman"
peer.updateScore(50)
peer.updateScore(PeerScoreGoodStatus)
peerSlot = newPeerSlot
man.queue.updateLastSlot(wallSlot)
if (peerAge <= man.maxHeadAge) and (headAge <= man.maxHeadAge):
debug "We are in sync with peer, exiting", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot,
@ -314,6 +328,11 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
queue_output_slot = man.queue.outSlot,
queue_last_slot = man.queue.lastSlot,
peer_score = peer.getScore(), topics = "syncman"
# Sometimes, when syncing is almost done but the last requests are still
# pending, this can fall into an endless cycle if only a low number of
# peers is available in the PeerPool. We wait for RESP_TIMEOUT so that
# all pending requests have finished by then.
await sleepAsync(RESP_TIMEOUT)
break
debug "Creating new request for peer", wall_clock_slot = wallSlot,
@ -326,18 +345,19 @@ proc syncWorker*[A, B](man: SyncManager[A, B],
if blocks.isSome():
let data = blocks.get()
await man.queue.push(req, data)
peer.updateScore(100)
peer.updateScore(PeerScoreGoodBlocks)
debug "Received blocks on request", blocks_count = len(data),
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer,
peer_score = peer.getScore(), topics = "syncman"
else:
peer.updateScore(-100)
peer.updateScore(PeerScoreNoBlocks)
man.queue.push(req)
debug "Failed to receive blocks on request",
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer,
peer_score = peer.getScore(), topics = "syncman"
break
result = peer
finally:
@ -349,8 +369,8 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
var acquireFut: Future[A]
var wallSlot, headSlot: Slot
template workersCount(): string =
if isNil(acquireFut): $len(pending) else: $(len(pending) - 1)
template workersCount(): int =
if isNil(acquireFut): len(pending) else: (len(pending) - 1)
debug "Synchronization loop started", topics = "syncman"
@ -358,8 +378,16 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
wallSlot = man.getLocalWallSlot()
headSlot = man.getLocalHeadSlot()
var progress: uint64
if headSlot <= man.queue.lastSlot:
progress = man.queue.progress()
else:
progress = 100'u64
debug "Synchronization loop start tick", wall_head_slot = wallSlot,
local_head_slot = headSlot, queue_status = man.queue.progress(),
local_head_slot = headSlot, queue_status = progress,
queue_start_slot = man.queue.startSlot,
queue_last_slot = man.queue.lastSlot,
workers_count = workersCount(), topics = "syncman"
if headAge <= man.maxHeadAge:
@ -410,6 +438,9 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
peer_score = peer.getScore(), topics = "syncman"
man.pool.release(peer)
else:
if headSlot > man.queue.lastSlot:
man.queue = SyncQueue.init(headSlot, wallSlot, man.chunkSize,
man.updateLocalBlocks, 2)
debug "Synchronization loop starting new worker", peer = peer,
wall_head_slot = wallSlot, local_head_slot = headSlot,
peer_score = peer.getScore(), topics = "syncman"

View File

@ -537,3 +537,63 @@ suiteReport "PeerPool testing suite":
len(acqui1) == 3
len(acqui2) == 2
len(acqui3) == 1
timedTest "Score check test":
var pool = newPeerPool[PeerTest, PeerTestID]()
proc scoreCheck(peer: PeerTest): bool =
  peer.weight >= 0
var peer1 = PeerTest.init("peer1", 100)
var peer2 = PeerTest.init("peer2", 50)
var peer3 = PeerTest.init("peer3", 1)
var peer4 = PeerTest.init("peer4", -50)
var peer5 = PeerTest.init("peer5", -100)
pool.setScoreCheck(scoreCheck)
check:
pool.addPeerNoWait(peer1, PeerType.Incoming) == true
pool.addPeerNoWait(peer2, PeerType.Incoming) == true
pool.addPeerNoWait(peer3, PeerType.Outgoing) == true
pool.addPeerNoWait(peer4, PeerType.Incoming) == false
pool.addPeerNoWait(peer5, PeerType.Outgoing) == false
len(pool) == 3
lenAvailable(pool) == 3
check:
waitFor(pool.addPeer(peer4, PeerType.Incoming)) == false
waitFor(pool.addPeer(peer5, PeerType.Outgoing)) == false
len(pool) == 3
lenAvailable(pool) == 3
discard waitFor(pool.acquire({PeerType.Incoming}))
discard waitFor(pool.acquire({PeerType.Incoming}))
discard waitFor(pool.acquire({PeerType.Outgoing}))
check:
lenAvailable(pool) == 0
lenAcquired(pool) == 3
len(pool) == 3
peer3.weight -= 2
pool.release(peer3)
check:
lenAvailable(pool) == 0
lenAcquired(pool) == 2
len(pool) == 2
peer2.weight -= 100
pool.release(peer2)
check:
lenAvailable(pool) == 0
lenAcquired(pool) == 1
len(pool) == 1
peer1.weight -= 200
pool.release(peer1)
check:
lenAvailable(pool) == 0
lenAcquired(pool) == 0
len(pool) == 0