From be89a3c54d9625001a0d867f199be016484a397c Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Thu, 23 Apr 2020 18:31:00 +0300 Subject: [PATCH] Add "drop by score" ability to PeerPool. (#917) * Add "drop by score" ability to PeerPool. Add tests. Fix syncmanager queue to start from most fresh data. * Fix endless cycle at the end of syncing process. --- beacon_chain/beacon_node.nim | 24 +++++++++----- beacon_chain/eth2_network.nim | 11 +++++++ beacon_chain/peer_pool.nim | 38 +++++++++++++++++++--- beacon_chain/sync_manager.nim | 55 +++++++++++++++++++++++++------- tests/test_peer_pool.nim | 60 +++++++++++++++++++++++++++++++++++ 5 files changed, 164 insertions(+), 24 deletions(-) diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index f94064f9a..5f8a7625e 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -951,12 +951,7 @@ proc updateStatus*(peer: Peer): Future[bool] {.async.} = peer.state(BeaconSync).statusMsg = theirStatus.get() result = true -proc updateScore*(peer: Peer, score: int) = - ## Update peer's ``peer`` score with value ``score``. - peer.score = peer.score + score - proc runSyncLoop(node: BeaconNode) {.async.} = - proc getLocalHeadSlot(): Slot = result = node.blockPool.head.blck.slot @@ -966,15 +961,28 @@ proc runSyncLoop(node: BeaconNode) {.async.} = proc updateLocalBlocks(list: openarray[SignedBeaconBlock]): bool = debug "Forward sync imported blocks", count = len(list), - local_head_slot = $getLocalHeadSlot() + local_head_slot = getLocalHeadSlot() for blk in list: if not(node.storeBlock(blk)): return false discard node.updateHead() - info "Forward sync blocks got imported sucessfully", count = $len(list), - local_head_slot = $getLocalHeadSlot() + info "Forward sync blocks got imported sucessfully", count = len(list), + local_head_slot = getLocalHeadSlot() result = true + proc scoreCheck(peer: Peer): bool = + if peer.score < PeerScoreLimit: + try: + debug "Peer score is too low, removing it from PeerPool", peer = peer, + peer_score = peer.score, score_limit = PeerScoreLimit + except: + discard + result = false + else: + result = true + + node.network.peerPool.setScoreCheck(scoreCheck) + var syncman = newSyncManager[Peer, PeerID]( node.network.peerPool, getLocalHeadSlot, getLocalWallSlot, updateLocalBlocks diff --git a/beacon_chain/eth2_network.nim b/beacon_chain/eth2_network.nim index 7ad96d0d0..9aa88923c 100644 --- a/beacon_chain/eth2_network.nim +++ b/beacon_chain/eth2_network.nim @@ -145,6 +145,11 @@ const readTimeoutErrorMsg = "Exceeded read timeout for a request" + NewPeerScore* = 200 + ## Score which will be assigned to new connected Peer + PeerScoreLimit* = 0 + ## Score after which peer will be kicked + # Metrics for tracking attestation and beacon block loss declareCounter gossip_messages_sent, "Number of gossip messages sent by this peer" @@ -198,6 +203,10 @@ proc `<`*(a, b: Peer): bool = proc getScore*(a: Peer): int = result = a.score +proc updateScore*(peer: Peer, score: int) = + ## Update peer's ``peer`` score with value ``score``. + peer.score = peer.score + score + proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async.} = # TODO: How should we notify the other peer? @@ -611,6 +620,7 @@ proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} = let res = await network.peerPool.addOutgoingPeer(peer) if res: + peer.updateScore(NewPeerScore) debug "Peer (outgoing) has been added to PeerPool", peer = $peer.info peer.getFuture().addCallback(onPeerClosed) libp2p_peers.set int64(len(network.peerPool)) @@ -624,6 +634,7 @@ proc handleIncomingPeer*(peer: Peer) = let res = network.peerPool.addIncomingPeerNoWait(peer) if res: + peer.updateScore(NewPeerScore) debug "Peer (incoming) has been added to PeerPool", peer = $peer.info peer.getFuture().addCallback(onPeerClosed) libp2p_peers.set int64(len(network.peerPool)) diff --git a/beacon_chain/peer_pool.nim b/beacon_chain/peer_pool.nim index 1c5c1b8f1..b24a5d63c 100644 --- a/beacon_chain/peer_pool.nim +++ b/beacon_chain/peer_pool.nim @@ -18,6 +18,8 @@ type data: int cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.} + PeerScoreCheckCallback*[T] = proc(peer: T): bool {.gcsafe, raises: [Defect].} + PeerPool*[A, B] = ref object incNotEmptyEvent: AsyncEvent outNotEmptyEvent: AsyncEvent @@ -28,6 +30,7 @@ type registry: Table[B, PeerIndex] storage: seq[PeerItem[A]] cmp: proc(a, b: PeerIndex): bool {.closure, gcsafe.} + scoreCheck: PeerScoreCheckCallback[A] maxPeersCount: int maxIncPeersCount: int maxOutPeersCount: int @@ -125,9 +128,9 @@ template getItem[A, B](pool: PeerPool[A, B], pindex = pool.incQueue.pop().data addr(pool.storage[pindex]) -proc newPeerPool*[A, B](maxPeers = -1, - maxIncomingPeers = -1, - maxOutgoingPeers = -1): PeerPool[A, B] = +proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1, + maxOutgoingPeers = -1, + scoreCheckCb: PeerScoreCheckCallback[A] = nil): PeerPool[A, B] = ## Create new PeerPool. ## ## ``maxPeers`` - maximum number of peers allowed. All the peers which @@ -142,6 +145,10 @@ proc newPeerPool*[A, B](maxPeers = -1, ## outgoing peers exceeds this number will be rejected. By default this ## number if infinite. ## + ## ``scoreCheckCb`` - callback which will be called for all released peers. + ## If callback procedure returns ``false`` peer will be removed from + ## PeerPool. + ## ## Please note, that if ``maxPeers`` is positive non-zero value, then equation ## ``maxPeers >= maxIncomingPeers + maxOutgoingPeers`` must be ``true``. var res = PeerPool[A, B]() @@ -161,6 +168,7 @@ proc newPeerPool*[A, B](maxPeers = -1, res.incQueue = initHeapQueue[PeerIndex]() res.outQueue = initHeapQueue[PeerIndex]() res.registry = initTable[B, PeerIndex]() + res.scoreCheck = scoreCheckCb res.storage = newSeq[PeerItem[A]]() proc peerCmp(a, b: PeerIndex): bool {.closure, gcsafe.} = @@ -196,6 +204,16 @@ proc lenAcquired*[A, B](pool: PeerPool[A, B], if PeerType.Outgoing in filter: result = result + pool.acqOutPeersCount +proc checkPeerScore*[A, B](pool: PeerPool[A, B], peer: A): bool {.inline.} = + ## Returns ``true`` if peer passing score check. + if not(isNil(pool.scoreCheck)): + if pool.scoreCheck(peer): + result = true + else: + result = false + else: + result = true + proc deletePeer*[A, B](pool: PeerPool[A, B], peer: A, force = false): bool = ## Remove ``peer`` from PeerPool ``pool``. ## @@ -274,6 +292,9 @@ proc addPeerNoWait*[A, B](pool: PeerPool[A, B], ## Procedure returns ``true`` on success. mixin getKey, getFuture + if not(pool.checkPeerScore(peer)): + return false + result = false let peerKey = getKey(peer) @@ -308,6 +329,9 @@ proc addPeer*[A, B](pool: PeerPool[A, B], ## Procedure returns ``true`` on success. mixin getKey, getFuture + if not(pool.checkPeerScore(peer)): + return false + var res = false let peerKey = getKey(peer) @@ -317,7 +341,6 @@ proc addPeer*[A, B](pool: PeerPool[A, B], if peerType == PeerType.Incoming: if pool.curIncPeersCount >= pool.maxIncPeersCount: await pool.waitNotFullEvent(peerType) - let pindex = pool.addPeerImpl(peer, peerKey, peerType) inc(pool.curIncPeersCount) pool.incQueue.push(pindex) @@ -408,6 +431,8 @@ proc release*[A, B](pool: PeerPool[A, B], peer: A) = let pindex = titem.data var item = addr(pool.storage[pindex]) if PeerFlags.Acquired in item[].flags: + if not(pool.checkPeerScore(peer)): + item[].flags.incl(DeleteOnRelease) item[].flags.excl(PeerFlags.Acquired) if PeerFlags.DeleteOnRelease in item[].flags: if item[].peerType == PeerType.Incoming: @@ -607,3 +632,8 @@ proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} = for item in peers: acquired.add(item) pool.clear() + +proc setScoreCheck*[A, B](pool: PeerPool[A, B], + scoreCheckCb: PeerScoreCheckCallback[A]) = + ## Add ScoreCheck callback. + pool.scoreCheck = scoreCheckCb diff --git a/beacon_chain/sync_manager.nim b/beacon_chain/sync_manager.nim index fb9efd226..6079e56c0 100644 --- a/beacon_chain/sync_manager.nim +++ b/beacon_chain/sync_manager.nim @@ -7,6 +7,18 @@ export datatypes, digest, chronos, chronicles logScope: topics = "syncman" +const + PeerScoreNoStatus* = -100 + ## Peer did not answer `status` request. + PeerScoreStaleStatus* = -50 + ## Peer's `status` answer do not progress in time. + PeerScoreGoodStatus* = 50 + ## Peer's `status` answer is fine. + PeerScoreNoBlocks* = -100 + ## Peer did not respond in time on `blocksByRange` request. + PeerScoreGoodBlocks* = 100 + ## Peer' `blocksByRange` answer is fine. + type GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].} @@ -48,6 +60,7 @@ type getLocalHeadSlot: GetSlotCallback getLocalWallSlot: GetSlotCallback updateLocalBlocks: UpdateLocalBlocksCallback + chunkSize: uint64 queue: SyncQueue SyncManagerError* = object of CatchableError @@ -187,10 +200,10 @@ proc total*(sq: SyncQueue): uint64 {.inline.} = ## Returns total number of slots in queue ``sq``. result = sq.lastSlot - sq.startSlot + 1'u64 -proc progress*(sq: SyncQueue): string = +proc progress*(sq: SyncQueue): uint64 = ## Returns queue's ``sq`` progress string. let curSlot = sq.outSlot - sq.startSlot - result = $curSlot & "/" & $sq.total() + result = (curSlot * 100'u64) div sq.total() proc newSyncManager*[A, B](pool: PeerPool[A, B], getLocalHeadSlotCb: GetSlotCallback, @@ -213,6 +226,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], getLocalWallSlot: getLocalWallSlotCb, maxHeadAge: maxHeadAge, sleepTime: sleepTime, + chunkSize: chunkSize, queue: queue ) @@ -255,6 +269,8 @@ proc syncWorker*[A, B](man: SyncManager[A, B], var headSlot = man.getLocalHeadSlot() var peerSlot = peer.getHeadSlot() + man.queue.updateLastSlot(wallSlot) + debug "Peer's syncing status", wall_clock_slot = wallSlot, remote_head_slot = peerSlot, local_head_slot = headSlot, peer_score = peer.getScore(), peer = peer, topics = "syncman" @@ -275,7 +291,7 @@ proc syncWorker*[A, B](man: SyncManager[A, B], peer = peer, peer_score = peer.getScore(), topics = "syncman" let res = await peer.updateStatus() if not(res): - peer.updateScore(-100) + peer.updateScore(PeerScoreNoStatus) debug "Failed to get remote peer's status, exiting", peer = peer, peer_score = peer.getScore(), peer_head_slot = peerSlot, topics = "syncman" @@ -288,18 +304,16 @@ proc syncWorker*[A, B](man: SyncManager[A, B], local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, peer = peer, peer_score = peer.getScore(), topics = "syncman" - peer.updateScore(-50) + peer.updateScore(PeerScoreStaleStatus) break debug "Peer's status information updated", wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, peer = peer, peer_score = peer.getScore(), topics = "syncman" - peer.updateScore(50) + peer.updateScore(PeerScoreGoodStatus) peerSlot = newPeerSlot - man.queue.updateLastSlot(wallSlot) - if (peerAge <= man.maxHeadAge) and (headAge <= man.maxHeadAge): debug "We are in sync with peer, exiting", wall_clock_slot = wallSlot, remote_head_slot = peerSlot, local_head_slot = headSlot, @@ -314,6 +328,11 @@ proc syncWorker*[A, B](man: SyncManager[A, B], queue_output_slot = man.queue.outSlot, queue_last_slot = man.queue.lastSlot, peer_score = peer.getScore(), topics = "syncman" + # Sometimes when syncing is almost done but last requests are still + # pending, this can fall into endless cycle, when low number of peers + # are available in PeerPool. We going to wait for RESP_TIMEOUT time, + # so all pending requests should be finished at this moment. + await sleepAsync(RESP_TIMEOUT) break debug "Creating new request for peer", wall_clock_slot = wallSlot, @@ -326,18 +345,19 @@ proc syncWorker*[A, B](man: SyncManager[A, B], if blocks.isSome(): let data = blocks.get() await man.queue.push(req, data) - peer.updateScore(100) + peer.updateScore(PeerScoreGoodBlocks) debug "Received blocks on request", blocks_count = len(data), request_slot = req.slot, request_count = req.count, request_step = req.step, peer = peer, peer_score = peer.getScore(), topics = "syncman" else: - peer.updateScore(-100) + peer.updateScore(PeerScoreNoBlocks) man.queue.push(req) debug "Failed to receive blocks on request", request_slot = req.slot, request_count = req.count, request_step = req.step, peer = peer, peer_score = peer.getScore(), topics = "syncman" + break result = peer finally: @@ -349,8 +369,8 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} = var acquireFut: Future[A] var wallSlot, headSlot: Slot - template workersCount(): string = - if isNil(acquireFut): $len(pending) else: $(len(pending) - 1) + template workersCount(): int = + if isNil(acquireFut): len(pending) else: (len(pending) - 1) debug "Synchronization loop started", topics = "syncman" @@ -358,8 +378,16 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} = wallSlot = man.getLocalWallSlot() headSlot = man.getLocalHeadSlot() + var progress: uint64 + if headSlot <= man.queue.lastSlot: + progress = man.queue.progress() + else: + progress = 100'u64 + debug "Synchronization loop start tick", wall_head_slot = wallSlot, - local_head_slot = headSlot, queue_status = man.queue.progress(), + local_head_slot = headSlot, queue_status = progress, + queue_start_slot = man.queue.startSlot, + queue_last_slot = man.queue.lastSlot, workers_count = workersCount(), topics = "syncman" if headAge <= man.maxHeadAge: @@ -410,6 +438,9 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} = peer_score = peer.getScore(), topics = "syncman" man.pool.release(peer) else: + if headSlot > man.queue.lastSlot: + man.queue = SyncQueue.init(headSlot, wallSlot, man.chunkSize, + man.updateLocalBlocks, 2) debug "Synchronization loop starting new worker", peer = peer, wall_head_slot = wallSlot, local_head_slot = headSlot, peer_score = peer.getScore(), topics = "syncman" diff --git a/tests/test_peer_pool.nim b/tests/test_peer_pool.nim index a970e43cf..76e49c862 100644 --- a/tests/test_peer_pool.nim +++ b/tests/test_peer_pool.nim @@ -537,3 +537,63 @@ suiteReport "PeerPool testing suite": len(acqui1) == 3 len(acqui2) == 2 len(acqui3) == 1 + + timedTest "Score check test": + var pool = newPeerPool[PeerTest, PeerTestID]() + proc scoreCheck(peer: PeerTest): bool = + if peer.weight >= 0: + result = true + else: + result = false + var peer1 = PeerTest.init("peer1", 100) + var peer2 = PeerTest.init("peer2", 50) + var peer3 = PeerTest.init("peer3", 1) + var peer4 = PeerTest.init("peer4", -50) + var peer5 = PeerTest.init("peer5", -100) + + pool.setScoreCheck(scoreCheck) + + check: + pool.addPeerNoWait(peer1, PeerType.Incoming) == true + pool.addPeerNoWait(peer2, PeerType.Incoming) == true + pool.addPeerNoWait(peer3, PeerType.Outgoing) == true + pool.addPeerNoWait(peer4, PeerType.Incoming) == false + pool.addPeerNoWait(peer5, PeerType.Outgoing) == false + len(pool) == 3 + lenAvailable(pool) == 3 + + check: + waitFor(pool.addPeer(peer4, PeerType.Incoming)) == false + waitFor(pool.addPeer(peer5, PeerType.Outgoing)) == false + len(pool) == 3 + lenAvailable(pool) == 3 + + discard waitFor(pool.acquire({PeerType.Incoming})) + discard waitFor(pool.acquire({PeerType.Incoming})) + discard waitFor(pool.acquire({PeerType.Outgoing})) + + check: + lenAvailable(pool) == 0 + lenAcquired(pool) == 3 + len(pool) == 3 + + peer3.weight -= 2 + pool.release(peer3) + check: + lenAvailable(pool) == 0 + lenAcquired(pool) == 2 + len(pool) == 2 + + peer2.weight -= 100 + pool.release(peer2) + check: + lenAvailable(pool) == 0 + lenAcquired(pool) == 1 + len(pool) == 1 + + peer1.weight -= 200 + pool.release(peer1) + check: + lenAvailable(pool) == 0 + lenAcquired(pool) == 0 + len(pool) == 0