From 08795b3f5d5f03f8a966f201db1de682f96ab48e Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Wed, 23 Sep 2020 18:58:02 +0300 Subject: [PATCH] Fix tight loop at the end of sync process. (#1731) --- beacon_chain/sync_manager.nim | 61 +++++++++++++++++++++++++++++++--- beacon_chain/sync_protocol.nim | 8 +++-- 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/beacon_chain/sync_manager.nim b/beacon_chain/sync_manager.nim index bdf3e3032..8ac3612d9 100644 --- a/beacon_chain/sync_manager.nim +++ b/beacon_chain/sync_manager.nim @@ -32,6 +32,9 @@ const SyncWorkersCount* = 20 ## Number of sync workers to spawn + StatusUpdateInterval* = chronos.minutes(1) + ## Minimum time between two subsequent calls to update peer's status + type SyncFailureKind* = enum StatusInvalid, @@ -665,6 +668,9 @@ template peerAge(): uint64 = template queueAge(): uint64 = wallSlot - man.queue.outSlot +template peerStatusAge(): Duration = + Moment.now() - peer.state(BeaconSync).statusLastTime + func syncQueueLen*[A, B](man: SyncManager[A, B]): uint64 = man.queue.len @@ -735,11 +741,60 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = peer.updateScore(PeerScoreGoodStatus) peerSlot = newPeerSlot - if (peerAge <= man.maxHeadAge) and (headAge <= man.maxHeadAge): - debug "We are in sync with peer, exiting", wall_clock_slot = wallSlot, + if headAge <= man.maxHeadAge: + debug "We are in sync with network, exiting", wall_clock_slot = wallSlot, remote_head_slot = peerSlot, local_head_slot = headSlot, peer = peer, peer_score = peer.getScore(), index = index, peer_speed = peer.netKbps(), topics = "syncman" + # We clear SyncManager's `notInSyncEvent` so all the workers will go to + # sleep soon.
+ man.notInSyncEvent.clear() + return + + if peerAge <= man.maxHeadAge: + debug "We are in sync with peer, refreshing peer's status information", + wall_clock_slot = wallSlot, remote_head_slot = peerSlot, + local_head_slot = headSlot, peer = peer, peer_score = peer.getScore(), + index = index, peer_speed = peer.netKbps(), topics = "syncman" + + if peerStatusAge <= StatusUpdateInterval: + await sleepAsync(StatusUpdateInterval - peerStatusAge) + + man.workers[index].status = SyncWorkerStatus.UpdatingStatus + try: + let res = await peer.updateStatus() + if not(res): + peer.updateScore(PeerScoreNoStatus) + debug "Failed to get remote peer's status, exiting", peer = peer, + peer_score = peer.getScore(), peer_head_slot = peerSlot, + peer_speed = peer.netKbps(), index = index, topics = "syncman" + let failure = SyncFailure.init(SyncFailureKind.StatusDownload, peer) + man.failures.add(failure) + return + except CatchableError as exc: + debug "Unexpected exception while updating peer's status", + peer = peer, peer_score = peer.getScore(), + peer_head_slot = peerSlot, peer_speed = peer.netKbps(), + index = index, errMsg = exc.msg, topics = "syncman" + return + + let newPeerSlot = peer.getHeadSlot() + if peerSlot >= newPeerSlot: + peer.updateScore(PeerScoreStaleStatus) + debug "Peer's status information is stale", + wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, + local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), topics = "syncman" + else: + debug "Peer's status information updated", wall_clock_slot = wallSlot, + remote_old_head_slot = peerSlot, local_head_slot = headSlot, + remote_new_head_slot = newPeerSlot, peer = peer, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + index = index, topics = "syncman" + peer.updateScore(PeerScoreGoodStatus) + peerSlot = newPeerSlot + return man.workers[index].status = SyncWorkerStatus.Requesting @@ -829,9 
+884,7 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} = man.workers[index].status = SyncWorkerStatus.Sleeping # This event is going to be set until we are not in sync with network await man.notInSyncEvent.wait() - man.workers[index].status = SyncWorkerStatus.WaitingPeer - let peer = await man.pool.acquire() await man.syncStep(index, peer) man.pool.release(peer) diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index d16ed6340..289bfeaef 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -37,6 +37,7 @@ type forkDigest*: ForkDigest BeaconSyncPeerState* = ref object + statusLastTime*: chronos.Moment statusMsg*: StatusMsg BlockRootSlot* = object @@ -190,6 +191,7 @@ p2pProtocol BeaconSync(version = 1, proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) = debug "Peer status", peer, statusMsg peer.state(BeaconSync).statusMsg = statusMsg + peer.state(BeaconSync).statusLastTime = Moment.now() proc updateStatus*(peer: Peer): Future[bool] {.async.} = ## Request `status` of remote peer ``peer``. @@ -200,12 +202,14 @@ proc updateStatus*(peer: Peer): Future[bool] {.async.} = let theirFut = awaitne peer.status(ourStatus, timeout = chronos.seconds(60)) if theirFut.failed(): - result = false + return false else: let theirStatus = theirFut.read() if theirStatus.isOk: peer.setStatusMsg(theirStatus.get) - result = true + return true + else: + return false proc getHeadSlot*(peer: Peer): Slot {.inline.} = ## Returns head slot for specific peer ``peer``.