From 96f26c447cc67d2797792197ec20252f9a6c1f75 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Mon, 15 Jun 2020 22:41:26 +0300 Subject: [PATCH] Replace zero-point rewind with rewind to latest finalized epoch's first slot. (#1176) * Replace zero-point rewind with rewind to latest finalized epoch's first slot. * Fix tests. * Add missing penalty for MissingParent Fix comments. --- beacon_chain/sync_manager.nim | 169 ++++++---------------------------- 1 file changed, 28 insertions(+), 141 deletions(-) diff --git a/beacon_chain/sync_manager.nim b/beacon_chain/sync_manager.nim index 88750e914..ed5535d1f 100644 --- a/beacon_chain/sync_manager.nim +++ b/beacon_chain/sync_manager.nim @@ -25,7 +25,7 @@ const ## Peer's response contains incorrect blocks. PeerScoreBadResponse* = -1000 ## Peer's response is not in requested range. - PeerScoreJokeBlocks* = -200 + PeerScoreMissingBlocks* = -200 ## Peer response contains too many empty blocks. type @@ -78,7 +78,6 @@ type debtsQueue: HeapQueue[SyncRequest[T]] debtsCount: uint64 readyQueue: HeapQueue[SyncResult[T]] - zeroPoint: Option[Slot] suspects: seq[SyncResult[T]] SyncManager*[A, B] = ref object @@ -260,34 +259,8 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], # missing" error. Lets call such peers "jokers", because they are joking # with responses. # - # To fix "joker" problem i'm going to introduce "zero-point" which will - # represent first non-empty slot in gap at the end of requested chunk. - # If SyncQueue receives chunk of blocks with gap at the end and this chunk - # will be successfully processed by `block_pool` it will set `zero_point` to - # the first uncertain (empty) slot. For example: - # - # Case 1 - # X X X X X - - # 3 4 5 6 7 8 - # - # Case2 - # X X - - - - - # 3 4 5 6 7 8 - # - # In Case 1 `zero-point` will be equal to 8, in Case 2 `zero-point` will be - # set to 5. - # - # When `zero-point` is set and the next received chunk of blocks will be - # empty, then peer produced this chunk of blocks will be added to suspect - # list. - # - # If the next chunk of blocks has at least one non-empty block and this chunk - # will be successfully processed by `block_pool`, then `zero-point` will be - # reset and suspect list will be cleared. - # - # If the `block_pool` failed to process next chunk of blocks, SyncQueue will - # perform rollback to `zero-point` and penalize all the peers in suspect list. - + # To fix "joker" problem we going to perform rollback to the latest finalized + # epoch's first slot. doAssert(chunkSize > 0'u64, "Chunk size should not be zero") result = SyncQueue[T]( startSlot: start, @@ -372,10 +345,10 @@ proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} = # without missing any blocks. There 3 sources: # 1. Debts queue. # 2. Processing queue (`inpSlot`, `outSlot`). - # 3. Requested slot `toSlot` (which can be `zero-point` slot). + # 3. Requested slot `toSlot`. # # Queue's `outSlot` is the lowest slot we added to `block_pool`, but - # `zero-point` slot can be less then `outSlot`. `debtsQueue` holds only not + # `toSlot` slot can be less then `outSlot`. `debtsQueue` holds only not # added slot requests, so it can't be bigger then `outSlot` value. var minSlot = sq.outSlot if toSlot.isSome(): @@ -441,8 +414,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], if res: continue else: - # SyncQueue reset happens (it can't be `zero-point` reset, or continous - # failure reset). We are exiting to wake up sync-worker. + # SyncQueue reset happens. We are exiting to wake up sync-worker. exitNow = true break let syncres = SyncResult[T](request: sr, data: data) @@ -460,55 +432,6 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], let item = sq.readyQueue.pop() let res = sq.syncUpdate(item.request, item.data) if res.isOk: - if sq.zeroPoint.isSome(): - if item.isEmpty(): - # If the `zeropoint` is set and response is empty, we will add this - # request to suspect list. - debug "Adding peer to suspect list", peer = item.request.item, - request_slot = item.request.slot, - request_count = item.request.count, - request_step = item.request.step, - response_count = len(item.data), topics = "syncman" - sq.suspects.add(item) - else: - # If the `zeropoint` is set and response is not empty, we will clean - # suspect list and reset `zeropoint`. - sq.suspects.setLen(0) - sq.zeroPoint = none[Slot]() - # At this point `zeropoint` is unset, but received response can have - # gap at the end. - if item.hasEndGap(): - debug "Zero-point reset and new zero-point found", - peer = item.request.item, request_slot = item.request.slot, - request_count = item.request.count, - request_step = item.request.step, - response_count = len(item.data), - blocks_map = getShortMap(item.request, item.data), - topics = "syncman" - sq.suspects.add(item) - sq.zeroPoint = some(item.getLastNonEmptySlot()) - else: - debug "Zero-point reset", peer = item.request.item, - request_slot = item.request.slot, - request_count = item.request.count, - request_step = item.request.step, - response_count = len(item.data), - blocks_map = getShortMap(item.request, item.data), - topics = "syncman" - else: - # If the `zeropoint` is not set and response has gap at the end, we - # will add first suspect to the suspect list and set `zeropoint`. - if item.hasEndGap(): - debug "New zero-point found", peer = item.request.item, - request_slot = item.request.slot, - request_count = item.request.count, - request_step = item.request.step, - response_count = len(item.data), - blocks_map = getShortMap(item.request, item.data), - topics = "syncman" - sq.suspects.add(item) - sq.zeroPoint = some(item.getLastNonEmptySlot()) - sq.outSlot = sq.outSlot + item.request.count sq.wakeupWaiters() else: @@ -522,63 +445,26 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], var resetSlot: Option[Slot] if res.error == BlockError.MissingParent: - if sq.zeroPoint.isSome(): - # If the `zeropoint` is set and we are unable to store response in - # `block_pool` we are going to revert suspicious responses list. - - # If `zeropoint` is set, suspicious list should not be empty. - var req: SyncRequest[T] - if isEmpty(sq.suspects[0]): - # If initial suspicious response is an empty list, then previous - # chunk of blocks did not have a gap at the end. So we are going to - # request suspicious response one more time without any changes. - req = sq.suspects[0].request - else: - # If initial suspicious response is not an empty list, we are going - # to request only gap at the end of the suspicious response. - let startSlot = sq.suspects[0].getLastNonEmptySlot() + 1'u64 - let lastSlot = sq.suspects[0].request.lastSlot() - req = SyncRequest.init(T, startSlot, lastSlot) - - debug "Resolve joker's problem", request_slot = req.slot, - request_count = req.count, - request_step = req.step, - suspects_count = (len(sq.suspects) - 1) - - sq.suspects[0].request.item.updateScore(PeerScoreJokeBlocks) - - sq.toDebtsQueue(req) - # We move all left suspicious responses to the debts queue. - if len(sq.suspects) > 1: - for i in 1 ..< len(sq.suspects): - sq.toDebtsQueue(sq.suspects[i].request) - sq.suspects[i].request.item.updateScore(PeerScoreJokeBlocks) - - # Reset state to the `zeropoint`. - sq.suspects.setLen(0) - resetSlot = sq.zeroPoint - sq.zeroPoint = none[Slot]() + # If we got `BlockError.MissingParent` it means that peer returns chain + # of blocks with holes or `block_pool` is in incomplete state. We going + # to rewind to the first slot at latest finalized epoch. + let req = item.request + let finalizedSlot = sq.getFirstSlotAFE() + if finalizedSlot < req.slot: + warn "Unexpected missing parent, rewind happens", + peer = req.item, rewind_to_slot = finalizedSlot, + request_slot = req.slot, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap(req, item.data) + resetSlot = some(finalizedSlot) + req.item.updateScore(PeerScoreMissingBlocks) else: - # If we got `BlockError.MissingParent` and `zero-point` is not set - # it means that peer returns chain of blocks with holes or block_pool - # in incorrect state. We going to rewind to the latest finalized - # epoch. - let req = item.request - let finalizedSlot = sq.getFirstSlotAFE() - if finalizedSlot < req.slot: - warn "Unexpected missing parent, rewind to latest finalized epoch slot", - peer = req.item, to_slot = finalizedSlot, - request_slot = req.slot, request_count = req.count, - request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data) - await sq.resetWait(some(finalizedSlot)) - else: - error "Unexpected missing parent at finalized epoch slot", - peer = req.item, to_slot = finalizedSlot, - request_slot = req.slot, request_count = req.count, - request_step = req.step, blocks_count = len(item.data), - blocks_map = getShortMap(req, item.data) - req.item.updateScore(PeerScoreBadBlocks) + error "Unexpected missing parent at finalized epoch slot", + peer = req.item, to_slot = finalizedSlot, + request_slot = req.slot, request_count = req.count, + request_step = req.step, blocks_count = len(item.data), + blocks_map = getShortMap(req, item.data) + req.item.updateScore(PeerScoreBadBlocks) elif res.error == BlockError.Invalid: let req = item.request warn "Received invalid sequence of blocks", peer = req.item, @@ -598,8 +484,9 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], sq.toDebtsQueue(item.request) if resetSlot.isSome(): await sq.resetWait(resetSlot) - debug "Zero-point reset happens", queue_input_slot = sq.inpSlot, - queue_output_slot = sq.outSlot + debug "Rewind to slot was happened", reset_slot = reset_slot.get(), + queue_input_slot = sq.inpSlot, + queue_output_slot = sq.outSlot break proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =