sync queue
This commit is contained in:
parent
ad64b22485
commit
d292e94560
|
@ -558,7 +558,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
|
||||||
if shouldGetDataColumns:
|
if shouldGetDataColumns:
|
||||||
let data_columns = await man.getDataColumnSidecars(peer, req)
|
let data_columns = await man.getDataColumnSidecars(peer, req)
|
||||||
if data_columns.isErr():
|
if data_columns.isErr():
|
||||||
peer.updateScore(PeerScoreNoValues)
|
# peer.updateScore(PeerScoreNoValues)
|
||||||
man.queue.push(req)
|
man.queue.push(req)
|
||||||
debug "Failed to receive data columns on request",
|
debug "Failed to receive data columns on request",
|
||||||
request = req, err = data_columns.error
|
request = req, err = data_columns.error
|
||||||
|
@ -572,7 +572,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
|
||||||
let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot)
|
let slots = mapIt(dataColumnData, it[].signed_block_header.message.slot)
|
||||||
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
|
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
|
||||||
if not(checkResponse(req, uniqueSlots)):
|
if not(checkResponse(req, uniqueSlots)):
|
||||||
peer.updateScore(PeerScoreBadResponse)
|
# peer.updateScore(PeerScoreBadResponse)
|
||||||
man.queue.push(req)
|
man.queue.push(req)
|
||||||
warn "Received data columns sequence is not in requested range",
|
warn "Received data columns sequence is not in requested range",
|
||||||
data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData),
|
data_columns_count = len(dataColumnData), data_columns_map = getShortMap(req, dataColumnData),
|
||||||
|
@ -580,13 +580,13 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
|
||||||
return
|
return
|
||||||
let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData)
|
let groupedDataColumns = groupDataColumns(req, blockData, dataColumnData)
|
||||||
if groupedDataColumns.isErr():
|
if groupedDataColumns.isErr():
|
||||||
peer.updateScore(PeerScoreNoValues)
|
# peer.updateScore(PeerScoreNoValues)
|
||||||
man.queue.push(req)
|
man.queue.push(req)
|
||||||
# warn "Received data columns is inconsistent",
|
# warn "Received data columns is inconsistent",
|
||||||
# data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error()
|
# data_columns_map = getShortMap(req, dataColumnData), request = req, msg=groupedDataColumns.error()
|
||||||
return
|
return
|
||||||
if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr):
|
if (let checkRes = groupedDataColumns.get.checkDataColumns(); checkRes.isErr):
|
||||||
peer.updateScore(PeerScoreBadResponse)
|
# peer.updateScore(PeerScoreBadResponse)
|
||||||
man.queue.push(req)
|
man.queue.push(req)
|
||||||
warn "Received data columns is invalid",
|
warn "Received data columns is invalid",
|
||||||
data_columns_count = len(dataColumnData),
|
data_columns_count = len(dataColumnData),
|
||||||
|
|
|
@ -727,42 +727,42 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||||
# Nim versions, remove workaround and move `res` into for loop
|
# Nim versions, remove workaround and move `res` into for loop
|
||||||
res: Result[void, VerifierError]
|
res: Result[void, VerifierError]
|
||||||
|
|
||||||
var i=0
|
# var i=0
|
||||||
for blk, blb in sq.blocks(item):
|
# for blk, blb in sq.blocks(item):
|
||||||
res = await sq.blockVerifier(blk[], blb, Opt.none(DataColumnSidecars), maybeFinalized)
|
# res = await sq.blockVerifier(blk[], blb, Opt.none(DataColumnSidecars), maybeFinalized)
|
||||||
inc(i)
|
# inc(i)
|
||||||
|
|
||||||
if res.isOk():
|
# if res.isOk():
|
||||||
goodBlock = some(blk[].slot)
|
# goodBlock = some(blk[].slot)
|
||||||
else:
|
# else:
|
||||||
case res.error()
|
# case res.error()
|
||||||
of VerifierError.MissingParent:
|
# of VerifierError.MissingParent:
|
||||||
missingParentSlot = some(blk[].slot)
|
# missingParentSlot = some(blk[].slot)
|
||||||
break
|
# break
|
||||||
of VerifierError.Duplicate:
|
# of VerifierError.Duplicate:
|
||||||
# Keep going, happens naturally
|
# # Keep going, happens naturally
|
||||||
discard
|
# discard
|
||||||
of VerifierError.UnviableFork:
|
# of VerifierError.UnviableFork:
|
||||||
# Keep going so as to register other unviable blocks with the
|
# # Keep going so as to register other unviable blocks with the
|
||||||
# quarantine
|
# # quarantine
|
||||||
if unviableBlock.isNone:
|
# if unviableBlock.isNone:
|
||||||
# Remember the first unviable block, so we can log it
|
# # Remember the first unviable block, so we can log it
|
||||||
unviableBlock = some((blk[].root, blk[].slot))
|
# unviableBlock = some((blk[].root, blk[].slot))
|
||||||
|
|
||||||
of VerifierError.Invalid:
|
# of VerifierError.Invalid:
|
||||||
hasInvalidBlock = true
|
# hasInvalidBlock = true
|
||||||
|
|
||||||
let req = item.request
|
# let req = item.request
|
||||||
notice "Received invalid sequence of blocks", request = req,
|
# notice "Received invalid sequence of blocks", request = req,
|
||||||
blocks_count = len(item.data),
|
# blocks_count = len(item.data),
|
||||||
blocks_map = getShortMap(req, item.data)
|
# blocks_map = getShortMap(req, item.data)
|
||||||
req.item.updateScore(PeerScoreBadValues)
|
# req.item.updateScore(PeerScoreBadValues)
|
||||||
break
|
# break
|
||||||
|
|
||||||
var counter = 0
|
var counter = 0
|
||||||
for blk, col in sq.das_blocks(item):
|
for blk, col in sq.das_blocks(item):
|
||||||
res = await sq.blockVerifier(blk[], Opt.none(BlobSidecars), col, maybeFinalized)
|
res = await sq.blockVerifier(blk[], Opt.none(BlobSidecars), col, maybeFinalized)
|
||||||
inc i
|
inc counter
|
||||||
|
|
||||||
if res.isOk:
|
if res.isOk:
|
||||||
goodBlock = some(blk[].slot)
|
goodBlock = some(blk[].slot)
|
||||||
|
@ -788,7 +788,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||||
notice "Received invalid sequence of blocks", request = req,
|
notice "Received invalid sequence of blocks", request = req,
|
||||||
blocks_count = len(item.data),
|
blocks_count = len(item.data),
|
||||||
blocks_map = getShortMap(req, item.data)
|
blocks_map = getShortMap(req, item.data)
|
||||||
req.item.updateScore(PeerScoreBadValues)
|
# req.item.updateScore(PeerScoreBadValues)
|
||||||
|
|
||||||
# When errors happen while processing blocks, we retry the same request
|
# When errors happen while processing blocks, we retry the same request
|
||||||
# with, hopefully, a different peer
|
# with, hopefully, a different peer
|
||||||
|
|
Loading…
Reference in New Issue