Add ability for SyncQueue to recover from unexpected MissingParent.
This commit is contained in:
parent
1c238a609d
commit
1fc9413c48
|
@ -542,6 +542,10 @@ proc runForwardSyncLoop(node: BeaconNode) {.async.} =
|
|||
1'u64
|
||||
result = epoch.compute_start_slot_at_epoch()
|
||||
|
||||
proc getFirstSlotAtFinalizedEpoch(): Slot {.gcsafe.} =
|
||||
let fepoch = node.blockPool.headState.data.data.finalized_checkpoint.epoch
|
||||
compute_start_slot_at_epoch(fepoch)
|
||||
|
||||
proc updateLocalBlocks(list: openarray[SignedBeaconBlock]): Result[void, BlockError] =
|
||||
debug "Forward sync imported blocks", count = len(list),
|
||||
local_head_slot = getLocalHeadSlot()
|
||||
|
@ -583,7 +587,7 @@ proc runForwardSyncLoop(node: BeaconNode) {.async.} =
|
|||
|
||||
node.syncManager = newSyncManager[Peer, PeerID](
|
||||
node.network.peerPool, getLocalHeadSlot, getLocalWallSlot,
|
||||
updateLocalBlocks,
|
||||
getFirstSlotAtFinalizedEpoch, updateLocalBlocks,
|
||||
# 4 blocks per chunk is the optimal value right now, because our current
|
||||
# syncing speed is around 4 blocks per second. So there no need to request
|
||||
# more then 4 blocks right now. As soon as `store_speed` value become
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import chronicles
|
||||
import options, deques, heapqueue, tables, strutils, sequtils, math, algorithm
|
||||
import stew/results, chronos, chronicles
|
||||
import spec/datatypes, spec/digest, peer_pool, eth2_network
|
||||
import spec/[datatypes, digest], peer_pool, eth2_network
|
||||
import eth/async_utils
|
||||
|
||||
import block_pools/block_pools_types
|
||||
|
@ -66,22 +66,18 @@ type
|
|||
SyncQueue*[T] = ref object
|
||||
inpSlot*: Slot
|
||||
outSlot*: Slot
|
||||
|
||||
startSlot*: Slot
|
||||
lastSlot: Slot
|
||||
chunkSize*: uint64
|
||||
queueSize*: int
|
||||
|
||||
counter*: uint64
|
||||
pending*: Table[uint64, SyncRequest[T]]
|
||||
|
||||
waiters: seq[SyncWaiter[T]]
|
||||
syncUpdate*: SyncUpdateCallback[T]
|
||||
|
||||
getFirstSlotAFE*: GetSlotCallback
|
||||
debtsQueue: HeapQueue[SyncRequest[T]]
|
||||
debtsCount: uint64
|
||||
readyQueue: HeapQueue[SyncResult[T]]
|
||||
|
||||
zeroPoint: Option[Slot]
|
||||
suspects: seq[SyncResult[T]]
|
||||
|
||||
|
@ -95,6 +91,7 @@ type
|
|||
toleranceValue: uint64
|
||||
getLocalHeadSlot: GetSlotCallback
|
||||
getLocalWallSlot: GetSlotCallback
|
||||
getFirstSlotAFE: GetSlotCallback
|
||||
syncUpdate: SyncUpdateCallback[A]
|
||||
chunkSize: uint64
|
||||
queue: SyncQueue[A]
|
||||
|
@ -211,6 +208,7 @@ proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} =
|
|||
proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
|
||||
start, last: Slot, chunkSize: uint64,
|
||||
updateCb: SyncUpdateCallback[T],
|
||||
fsafeCb: GetSlotCallback,
|
||||
queueSize: int = -1): SyncQueue[T] =
|
||||
## Create new synchronization queue with parameters
|
||||
##
|
||||
|
@ -296,11 +294,11 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
|
|||
chunkSize: chunkSize,
|
||||
queueSize: queueSize,
|
||||
syncUpdate: updateCb,
|
||||
getFirstSlotAFE: fsafeCb,
|
||||
waiters: newSeq[SyncWaiter[T]](),
|
||||
counter: 1'u64,
|
||||
pending: initTable[uint64, SyncRequest[T]](),
|
||||
debtsQueue: initHeapQueue[SyncRequest[T]](),
|
||||
zeroPoint: some[Slot](start),
|
||||
inpSlot: start,
|
||||
outSlot: start
|
||||
)
|
||||
|
@ -561,13 +559,25 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
|||
sq.zeroPoint = none[Slot]()
|
||||
else:
|
||||
# If we got `BlockError.MissingParent` and `zero-point` is not set
|
||||
# it means that peer returns chain of blocks with holes.
|
||||
# it means that peer returns chain of blocks with holes or block_pool
|
||||
# in incorrect state. We going to rewind to the latest finalized
|
||||
# epoch.
|
||||
let req = item.request
|
||||
warn "Received sequence of blocks with holes", peer = req.item,
|
||||
request_slot = req.slot, request_count = req.count,
|
||||
request_step = req.step, blocks_count = len(item.data),
|
||||
blocks_map = getShortMap(req, item.data)
|
||||
req.item.updateScore(PeerScoreBadBlocks)
|
||||
let finalizedSlot = sq.getFirstSlotAFE()
|
||||
if finalizedSlot < req.slot:
|
||||
warn "Unexpected missing parent, rewind to latest finalized epoch slot",
|
||||
peer = req.item, to_slot = finalizedSlot,
|
||||
request_slot = req.slot, request_count = req.count,
|
||||
request_step = req.step, blocks_count = len(item.data),
|
||||
blocks_map = getShortMap(req, item.data)
|
||||
await sq.resetWait(some(finalizedSlot))
|
||||
else:
|
||||
error "Unexpected missing parent at finalized epoch slot",
|
||||
peer = req.item, to_slot = finalizedSlot,
|
||||
request_slot = req.slot, request_count = req.count,
|
||||
request_step = req.step, blocks_count = len(item.data),
|
||||
blocks_map = getShortMap(req, item.data)
|
||||
req.item.updateScore(PeerScoreBadBlocks)
|
||||
elif res.error == BlockError.Invalid:
|
||||
let req = item.request
|
||||
warn "Received invalid sequence of blocks", peer = req.item,
|
||||
|
@ -668,6 +678,7 @@ proc speed*(start, finish: SyncMoment): float {.inline.} =
|
|||
proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
||||
getLocalHeadSlotCb: GetSlotCallback,
|
||||
getLocalWallSlotCb: GetSlotCallback,
|
||||
getFSAFECb: GetSlotCallback,
|
||||
updateLocalBlocksCb: UpdateLocalBlocksCallback,
|
||||
maxStatusAge = uint64(SLOTS_PER_EPOCH * 4),
|
||||
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
|
||||
|
@ -687,7 +698,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
|||
return res
|
||||
|
||||
let queue = SyncQueue.init(A, getLocalHeadSlotCb(), getLocalWallSlotCb(),
|
||||
chunkSize, syncUpdate, 2)
|
||||
chunkSize, syncUpdate, getFSAFECb, 2)
|
||||
|
||||
result = SyncManager[A, B](
|
||||
pool: pool,
|
||||
|
@ -695,6 +706,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
|
|||
getLocalHeadSlot: getLocalHeadSlotCb,
|
||||
syncUpdate: syncUpdate,
|
||||
getLocalWallSlot: getLocalWallSlotCb,
|
||||
getFirstSlotAFE: getFSAFECb,
|
||||
maxHeadAge: maxHeadAge,
|
||||
maxRecurringFailures: maxRecurringFailures,
|
||||
sleepTime: sleepTime,
|
||||
|
@ -1000,7 +1012,8 @@ proc sync*[A, B](man: SyncManager[A, B]) {.async.} =
|
|||
else:
|
||||
if headSlot > man.queue.lastSlot:
|
||||
man.queue = SyncQueue.init(A, headSlot, wallSlot,
|
||||
man.chunkSize, man.syncUpdate, 2)
|
||||
man.chunkSize, man.syncUpdate,
|
||||
man.getFirstSlotAFE, 2)
|
||||
debug "Synchronization loop starting new worker", peer = peer,
|
||||
wall_head_slot = wallSlot, local_head_slot = headSlot,
|
||||
peer_score = peer.getScore(), topics = "syncman"
|
||||
|
|
|
@ -13,6 +13,9 @@ proc `$`*(peer: SomeTPeer): string =
|
|||
proc updateScore(peer: SomeTPeer, score: int) =
|
||||
discard
|
||||
|
||||
proc getFirstSlotAtFinalizedEpoch(): Slot =
|
||||
Slot(0)
|
||||
|
||||
suite "SyncManager test suite":
|
||||
proc createChain(start, finish: Slot): seq[SignedBeaconBlock] =
|
||||
doAssert(start <= finish)
|
||||
|
@ -30,7 +33,8 @@ suite "SyncManager test suite":
|
|||
|
||||
test "[SyncQueue] Start and finish slots equal":
|
||||
let p1 = SomeTPeer()
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(0), 1'u64, syncUpdate)
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(0), 1'u64, syncUpdate,
|
||||
getFirstSlotAtFinalizedEpoch)
|
||||
check len(queue) == 1
|
||||
var r11 = queue.pop(Slot(0), p1)
|
||||
check len(queue) == 0
|
||||
|
@ -45,7 +49,8 @@ suite "SyncManager test suite":
|
|||
r11.slot == Slot(0) and r11.count == 1'u64 and r11.step == 1'u64
|
||||
|
||||
test "[SyncQueue] Two full requests success/fail":
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(1), 1'u64, syncUpdate)
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(1), 1'u64, syncUpdate,
|
||||
getFirstSlotAtFinalizedEpoch)
|
||||
let p1 = SomeTPeer()
|
||||
let p2 = SomeTPeer()
|
||||
check len(queue) == 2
|
||||
|
@ -72,7 +77,8 @@ suite "SyncManager test suite":
|
|||
r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64
|
||||
|
||||
test "[SyncQueue] Full and incomplete success/fail start from zero":
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 2'u64, syncUpdate)
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 2'u64, syncUpdate,
|
||||
getFirstSlotAtFinalizedEpoch)
|
||||
let p1 = SomeTPeer()
|
||||
let p2 = SomeTPeer()
|
||||
let p3 = SomeTPeer()
|
||||
|
@ -110,7 +116,8 @@ suite "SyncManager test suite":
|
|||
r33.slot == Slot(4) and r33.count == 1'u64 and r33.step == 1'u64
|
||||
|
||||
test "[SyncQueue] Full and incomplete success/fail start from non-zero":
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(1), Slot(5), 3'u64, syncUpdate)
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(1), Slot(5), 3'u64, syncUpdate,
|
||||
getFirstSlotAtFinalizedEpoch)
|
||||
let p1 = SomeTPeer()
|
||||
let p2 = SomeTPeer()
|
||||
check len(queue) == 5
|
||||
|
@ -137,7 +144,8 @@ suite "SyncManager test suite":
|
|||
r42.slot == Slot(4) and r42.count == 2'u64 and r42.step == 1'u64
|
||||
|
||||
test "[SyncQueue] Smart and stupid success/fail":
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, syncUpdate)
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, syncUpdate,
|
||||
getFirstSlotAtFinalizedEpoch)
|
||||
let p1 = SomeTPeer()
|
||||
let p2 = SomeTPeer()
|
||||
check len(queue) == 5
|
||||
|
@ -164,7 +172,8 @@ suite "SyncManager test suite":
|
|||
r52.slot == Slot(4) and r52.count == 1'u64 and r52.step == 1'u64
|
||||
|
||||
test "[SyncQueue] One smart and one stupid + debt split + empty":
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, syncUpdate)
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64, syncUpdate,
|
||||
getFirstSlotAtFinalizedEpoch)
|
||||
let p1 = SomeTPeer()
|
||||
let p2 = SomeTPeer()
|
||||
let p3 = SomeTPeer()
|
||||
|
@ -210,7 +219,7 @@ suite "SyncManager test suite":
|
|||
|
||||
var chain = createChain(Slot(0), Slot(2))
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(2), 1'u64,
|
||||
syncReceiver, 1)
|
||||
syncReceiver, getFirstSlotAtFinalizedEpoch, 1)
|
||||
let p1 = SomeTPeer()
|
||||
let p2 = SomeTPeer()
|
||||
let p3 = SomeTPeer()
|
||||
|
@ -253,7 +262,7 @@ suite "SyncManager test suite":
|
|||
|
||||
var chain = createChain(Slot(5), Slot(11))
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(11), 2'u64,
|
||||
syncReceiver, 2)
|
||||
syncReceiver, getFirstSlotAtFinalizedEpoch, 2)
|
||||
let p1 = SomeTPeer()
|
||||
let p2 = SomeTPeer()
|
||||
let p3 = SomeTPeer()
|
||||
|
@ -303,7 +312,7 @@ suite "SyncManager test suite":
|
|||
|
||||
var chain = createChain(Slot(5), Slot(18))
|
||||
var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(18), 2'u64,
|
||||
syncReceiver, 2)
|
||||
syncReceiver, getFirstSlotAtFinalizedEpoch, 2)
|
||||
let p1 = SomeTPeer()
|
||||
let p2 = SomeTPeer()
|
||||
let p3 = SomeTPeer()
|
||||
|
|
Loading…
Reference in New Issue