diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index d7a448162..bd7222377 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -174,7 +174,243 @@ template headAge(): uint64 = template peerStatusAge(): Duration = Moment.now() - peer.state(BeaconSync).statusLastTime -proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = +proc syncBackwardStep[A, B](man: SyncManager[A, B], index: int, peer: A) {. + async.} = + let wallSlot = man.getLocalWallSlot() + let headSlot = man.getLocalHeadSlot() + var peerSlot = peer.getHeadSlot() + + debug "Peer's syncing status", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + peer_score = peer.getScore(), peer = peer, index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + + # Check if peer's head slot is bigger than our wall clock slot. + if peerSlot > wallSlot + man.toleranceValue: + peer.updateScore(PeerScoreHeadTooNew) + + warn "Peer reports a head newer than our wall clock - clock out of sync?", + wall_clock_slot = wallSlot, remote_head_slot = peerSlot, + local_head_slot = headSlot, peer = peer, index = index, + tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(), + peer_score = peer.getScore(), direction = man.direction, + topics = "syncman" + return + + # Check if we need to update peer's status information + if peerStatusAge >= StatusExpirationTime: + # Peer's status information is very old, its time to update it + man.workers[index].status = SyncWorkerStatus.UpdatingStatus + trace "Updating peer's status information", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + + try: + let res = await peer.updateStatus() + if not(res): + peer.updateScore(PeerScoreNoStatus) + debug "Failed to get remote peer's status, exiting", peer = peer, + peer_score = peer.getScore(), peer_head_slot = peerSlot, + peer_speed = peer.netKbps(), index = index, + direction = man.direction, topics = "syncman" + return + except CatchableError as exc: + debug "Unexpected exception while updating peer's status", + peer = peer, peer_score = peer.getScore(), + peer_head_slot = peerSlot, peer_speed = peer.netKbps(), + index = index, errMsg = exc.msg, direction = man.direction, + topics = "syncman" + return + + let newPeerSlot = peer.getHeadSlot() + if peerSlot >= newPeerSlot: + peer.updateScore(PeerScoreStaleStatus) + debug "Peer's status information is stale", + wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, + local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + else: + debug "Peer's status information updated", wall_clock_slot = wallSlot, + remote_old_head_slot = peerSlot, local_head_slot = headSlot, + remote_new_head_slot = newPeerSlot, peer = peer, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + index = index, direction = man.direction, topics = "syncman" + peer.updateScore(PeerScoreGoodStatus) + peerSlot = newPeerSlot + + if headAge <= man.maxHeadAge: + info "We are in sync with network", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), topics = "syncman" + # We clear SyncManager's `notInSyncEvent` so all the workers will become + # sleeping soon. + man.notInSyncEvent.clear() + return + + let firstSlot = man.getFirstSlot() + + if firstSlot == man.getLastSlot(): + info "We are in sync with network", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + peer = peer, peer_score = peer.getScore(), index = index, + first_slot = firstSlot, peer_speed = peer.netKbps(), + topics = "syncman" + # We clear SyncManager's `notInSyncEvent` so all the workers will become + # sleeping soon. + man.notInSyncEvent.clear() + return + + if firstSlot > peerSlot: + debug "Peer's head slot is low; refreshing peer's status information", + wall_clock_slot = wallSlot, remote_head_slot = peerSlot, + local_head_slot = headSlot, first_slot = firstSlot, + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + + man.workers[index].status = SyncWorkerStatus.UpdatingStatus + + if peerStatusAge <= StatusUpdateInterval: + await sleepAsync(StatusUpdateInterval - peerStatusAge) + + try: + let res = await peer.updateStatus() + if not(res): + peer.updateScore(PeerScoreNoStatus) + debug "Failed to get remote peer's status, exiting", peer = peer, + peer_score = peer.getScore(), peer_head_slot = peerSlot, + peer_speed = peer.netKbps(), index = index, + direction = man.direction, topics = "syncman" + return + except CatchableError as exc: + debug "Unexpected exception while updating peer's status", + peer = peer, peer_score = peer.getScore(), + peer_head_slot = peerSlot, peer_speed = peer.netKbps(), + index = index, errMsg = exc.msg, direction = man.direction, + topics = "syncman" + return + + let newPeerSlot = peer.getHeadSlot() + if peerSlot >= newPeerSlot: + peer.updateScore(PeerScoreStaleStatus) + debug "Peer's status information is stale", + wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, + local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, + peer = peer, peer_score = peer.getScore(), index = index, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + else: + # This is not very good solution because we should not discriminate and/or + # penalize peers which are in sync process too, but their latest head is + # lower then our latest head. We should keep connections with such peers + # (so this peers are able to get in sync using our data), but we should + # not use this peers for syncing because this peers are useless for us. + # Right now we decreasing peer's score a bit, so it will not be + # disconnected due to low peer's score, but new fresh peers could replace + # peers with low latest head. + if firstSlot > newPeerSlot: + # Peer's head slot is still lower then needed. + debug "Peer's head slot is lower then local first slot", + wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, + local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, + first_slot = firstSlot, peer = peer, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + index = index, direction = man.direction, topics = "syncman" + peer.updateScore(PeerScoreUseless) + else: + debug "Peer's status information updated", wall_clock_slot = wallSlot, + remote_old_head_slot = peerSlot, local_head_slot = headSlot, + remote_new_head_slot = newPeerSlot, peer = peer, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + index = index, direction = man.direction, topics = "syncman" + peer.updateScore(PeerScoreGoodStatus) + peerSlot = newPeerSlot + + return + + man.workers[index].status = SyncWorkerStatus.Requesting + + let req = man.queue.pop(peerSlot, peer) + if req.isEmpty(): + # SyncQueue could return empty request in 2 cases: + # 1. There no more slots in SyncQueue to download (we are synced, but + # our ``notInSyncEvent`` is not yet cleared). + # 2. Current peer's known head slot is too low to satisfy request. + # + # To avoid endless loop we going to wait for RESP_TIMEOUT time here. + # This time is enough for all pending requests to finish and it is also + # enough for main sync loop to clear ``notInSyncEvent``. + debug "Empty request received from queue, exiting", peer = peer, + local_head_slot = headSlot, remote_head_slot = peerSlot, + queue_input_slot = man.queue.inpSlot, + queue_output_slot = man.queue.outSlot, + queue_last_slot = man.queue.finalSlot, + peer_speed = peer.netKbps(), peer_score = peer.getScore(), + index = index, direction = man.direction, topics = "syncman" + await sleepAsync(RESP_TIMEOUT) + return + + debug "Creating new request for peer", wall_clock_slot = wallSlot, + remote_head_slot = peerSlot, local_head_slot = headSlot, + request_slot = req.slot, request_count = req.count, + request_step = req.step, peer = peer, peer_speed = peer.netKbps(), + peer_score = peer.getScore(), index = index, + direction = man.direction, topics = "syncman" + + man.workers[index].status = SyncWorkerStatus.Downloading + + try: + let blocks = await man.getBlocks(peer, req) + if blocks.isOk: + let data = blocks.get() + let smap = getShortMap(req, data) + debug "Received blocks on request", blocks_count = len(data), + blocks_map = smap, request_slot = req.slot, + request_count = req.count, request_step = req.step, + peer = peer, peer_score = peer.getScore(), + peer_speed = peer.netKbps(), index = index, + direction = man.direction, topics = "syncman" + + if not(checkResponse(req, data)): + peer.updateScore(PeerScoreBadResponse) + warn "Received blocks sequence is not in requested range", + blocks_count = len(data), blocks_map = smap, + request_slot = req.slot, request_count = req.count, + request_step = req.step, peer = peer, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + index = index, direction = man.direction, topics = "syncman" + return + + # Scoring will happen in `syncUpdate`. + man.workers[index].status = SyncWorkerStatus.Queueing + await man.queue.push(req, data, proc() = + man.workers[index].status = SyncWorkerStatus.Processing) + else: + peer.updateScore(PeerScoreNoBlocks) + man.queue.push(req) + debug "Failed to receive blocks on request", + request_slot = req.slot, request_count = req.count, + request_step = req.step, peer = peer, index = index, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + direction = man.direction, topics = "syncman" + return + + except CatchableError as exc: + debug "Unexpected exception while receiving blocks", + request_slot = req.slot, request_count = req.count, + request_step = req.step, peer = peer, index = index, + peer_score = peer.getScore(), peer_speed = peer.netKbps(), + errMsg = exc.msg, direction = man.direction, topics = "syncman" + return + +proc syncForwardStep[A, B](man: SyncManager[A, B], index: int, peer: A) {. + async.} = let wallSlot = man.getLocalWallSlot() let headSlot = man.getLocalHeadSlot() var peerSlot = peer.getHeadSlot() @@ -408,7 +644,11 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} = await man.notInSyncEvent.wait() man.workers[index].status = SyncWorkerStatus.WaitingPeer let peer = await man.pool.acquire() - await man.syncStep(index, peer) + case man.direction + of SyncQueueKind.Forward: + await man.syncForwardStep(index, peer) + of SyncQueueKind.Backward: + await man.syncBackwardStep(index, peer) man.pool.release(peer) proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,