Fix syncing problem introduced by light forward syncing PR. (#6714)

* Fix blobs check response algorithm and add more logging information.

* Add MAX_BLOBS_PER_BLOCK check.
Add tests.

* Adopt AllTests.

* Address review comments.

* One more missed lenu64.
This commit is contained in:
Eugene Kabanov 2024-11-19 07:35:09 +02:00 committed by GitHub
parent 63fe01fb86
commit bd04dcc3dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 170 additions and 68 deletions

View File

@ -993,12 +993,13 @@ OK: 7/7 Fail: 0/7 Skip: 0/7
+ [SyncQueue#Forward] Start and finish slots equal OK + [SyncQueue#Forward] Start and finish slots equal OK
+ [SyncQueue#Forward] Two full requests success/fail OK + [SyncQueue#Forward] Two full requests success/fail OK
+ [SyncQueue#Forward] getRewindPoint() test OK + [SyncQueue#Forward] getRewindPoint() test OK
+ [SyncQueue] checkBlobsResponse() test OK
+ [SyncQueue] checkResponse() test OK + [SyncQueue] checkResponse() test OK
+ [SyncQueue] contains() test OK + [SyncQueue] contains() test OK
+ [SyncQueue] getLastNonEmptySlot() test OK + [SyncQueue] getLastNonEmptySlot() test OK
+ [SyncQueue] hasEndGap() test OK + [SyncQueue] hasEndGap() test OK
``` ```
OK: 24/24 Fail: 0/24 Skip: 0/24 OK: 25/25 Fail: 0/25 Skip: 0/25
## Type helpers ## Type helpers
```diff ```diff
+ BeaconBlock OK + BeaconBlock OK
@ -1142,4 +1143,4 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
OK: 9/9 Fail: 0/9 Skip: 0/9 OK: 9/9 Fail: 0/9 Skip: 0/9
---TOTAL--- ---TOTAL---
OK: 775/780 Fail: 0/780 Skip: 5/780 OK: 776/781 Fail: 0/781 Skip: 5/781

View File

@ -510,12 +510,14 @@ proc syncStep[A, B](
blocks_map = getShortMap(req, blockData), request = req blocks_map = getShortMap(req, blockData), request = req
let slots = mapIt(blockData, it[].slot) let slots = mapIt(blockData, it[].slot)
if not(checkResponse(req, slots)): checkResponse(req, slots).isOkOr:
peer.updateScore(PeerScoreBadResponse) peer.updateScore(PeerScoreBadResponse)
man.queue.push(req) man.queue.push(req)
warn "Received blocks sequence is not in requested range", warn "Incorrect blocks sequence received",
blocks_count = len(blockData), blocks_count = len(blockData),
blocks_map = getShortMap(req, blockData), request = req blocks_map = getShortMap(req, blockData),
request = req,
reason = error
return return
let shouldGetBlobs = let shouldGetBlobs =
@ -547,13 +549,14 @@ proc syncStep[A, B](
if len(blobData) > 0: if len(blobData) > 0:
let slots = mapIt(blobData, it[].signed_block_header.message.slot) let slots = mapIt(blobData, it[].signed_block_header.message.slot)
if not(checkResponse(req, slots)): checkBlobsResponse(req, slots).isOkOr:
peer.updateScore(PeerScoreBadResponse) peer.updateScore(PeerScoreBadResponse)
man.queue.push(req) man.queue.push(req)
warn "Received blobs sequence is not in requested range", warn "Incorrect blobs sequence received",
blobs_count = len(blobData), blobs_count = len(blobData),
blobs_map = getShortMap(req, blobData), blobs_map = getShortMap(req, blobData),
request = req request = req,
reason = error
return return
let groupedBlobs = groupBlobs(blockData, blobData).valueOr: let groupedBlobs = groupBlobs(blockData, blobData).valueOr:
peer.updateScore(PeerScoreNoValues) peer.updateScore(PeerScoreNoValues)
@ -562,14 +565,14 @@ proc syncStep[A, B](
blobs_map = getShortMap(req, blobData), blobs_map = getShortMap(req, blobData),
request = req, msg = error request = req, msg = error
return return
if (let checkRes = groupedBlobs.checkBlobs(); checkRes.isErr): groupedBlobs.checkBlobs().isOkOr:
peer.updateScore(PeerScoreBadResponse) peer.updateScore(PeerScoreBadResponse)
man.queue.push(req) man.queue.push(req)
warn "Received blobs sequence is invalid", warn "Received blobs verification failed",
blobs_count = len(blobData), blobs_count = len(blobData),
blobs_map = getShortMap(req, blobData), blobs_map = getShortMap(req, blobData),
request = req, request = req,
msg = checkRes.error reason = error
return return
Opt.some(groupedBlobs) Opt.some(groupedBlobs)
else: else:

View File

@ -139,19 +139,20 @@ proc cmp*[T](a, b: SyncRequest[T]): int =
cmp(uint64(a.slot), uint64(b.slot)) cmp(uint64(a.slot), uint64(b.slot))
proc checkResponse*[T](req: SyncRequest[T], proc checkResponse*[T](req: SyncRequest[T],
data: openArray[Slot]): bool = data: openArray[Slot]): Result[void, cstring] =
if len(data) == 0: if len(data) == 0:
# Impossible to verify empty response. # Impossible to verify empty response.
return true return ok()
if uint64(len(data)) > req.count: if lenu64(data) > req.count:
# Number of blocks in response should be less or equal to number of # Number of blocks in response should be less or equal to number of
# requested blocks. # requested blocks.
return false return err("Too many blocks received")
var slot = req.slot var
var rindex = 0'u64 slot = req.slot
var dindex = 0 rindex = 0'u64
dindex = 0
while (rindex < req.count) and (dindex < len(data)): while (rindex < req.count) and (dindex < len(data)):
if slot < data[dindex]: if slot < data[dindex]:
@ -159,14 +160,43 @@ proc checkResponse*[T](req: SyncRequest[T],
elif slot == data[dindex]: elif slot == data[dindex]:
inc(dindex) inc(dindex)
else: else:
return false return err("Incorrect order or duplicate blocks found")
slot += 1'u64 slot += 1'u64
rindex += 1'u64 rindex += 1'u64
if dindex == len(data): if dindex != len(data):
return true return err("Some of the blocks are outside the requested range")
ok()
proc checkBlobsResponse*[T](req: SyncRequest[T],
data: openArray[Slot]): Result[void, cstring] =
if len(data) == 0:
# Impossible to verify empty response.
return ok()
if lenu64(data) > (req.count * MAX_BLOBS_PER_BLOCK):
# Number of blobs in response should be less or equal to number of
# requested (blocks * MAX_BLOBS_PER_BLOCK).
return err("Too many blobs received")
var
pslot = data[0]
counter = 0'u64
for slot in data:
if (slot < req.slot) or (slot >= req.slot + req.count):
return err("Some of the blobs are not in requested range")
if slot < pslot:
return err("Incorrect order")
if slot == pslot:
inc(counter)
if counter > MAX_BLOBS_PER_BLOCK:
return err("Number of blobs in the block exceeds the limit")
else: else:
return false counter = 1'u64
pslot = slot
ok()
proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot, proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot,
finish: Slot, t2: typedesc[T]): SyncRequest[T] = finish: Slot, t2: typedesc[T]): SyncRequest[T] =

View File

@ -1060,58 +1060,126 @@ suite "SyncManager test suite":
test "[SyncQueue] checkResponse() test": test "[SyncQueue] checkResponse() test":
let let
chain = createChain(Slot(10), Slot(20))
r1 = SyncRequest[SomeTPeer](slot: Slot(11), count: 1'u64) r1 = SyncRequest[SomeTPeer](slot: Slot(11), count: 1'u64)
r21 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64) r2 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64)
r3 = SyncRequest[SomeTPeer](slot: Slot(11), count: 3'u64) r3 = SyncRequest[SomeTPeer](slot: Slot(11), count: 3'u64)
let slots = mapIt(chain, it[].slot) check:
checkResponse(r1, [Slot(11)]).isOk() == true
checkResponse(r1, @[]).isOk() == true
checkResponse(r1, @[Slot(11), Slot(11)]).isOk() == false
checkResponse(r1, [Slot(10)]).isOk() == false
checkResponse(r1, [Slot(12)]).isOk() == false
checkResponse(r2, [Slot(11)]).isOk() == true
checkResponse(r2, [Slot(12)]).isOk() == true
checkResponse(r2, @[]).isOk() == true
checkResponse(r2, [Slot(11), Slot(12)]).isOk() == true
checkResponse(r2, [Slot(12)]).isOk() == true
checkResponse(r2, [Slot(11), Slot(12), Slot(13)]).isOk() == false
checkResponse(r2, [Slot(10), Slot(11)]).isOk() == false
checkResponse(r2, [Slot(10)]).isOk() == false
checkResponse(r2, [Slot(12), Slot(11)]).isOk() == false
checkResponse(r2, [Slot(12), Slot(13)]).isOk() == false
checkResponse(r2, [Slot(13)]).isOk() == false
checkResponse(r2, [Slot(11), Slot(11)]).isOk() == false
checkResponse(r2, [Slot(12), Slot(12)]).isOk() == false
checkResponse(r3, @[Slot(11)]).isOk() == true
checkResponse(r3, @[Slot(12)]).isOk() == true
checkResponse(r3, @[Slot(13)]).isOk() == true
checkResponse(r3, @[Slot(11), Slot(12)]).isOk() == true
checkResponse(r3, @[Slot(11), Slot(13)]).isOk() == true
checkResponse(r3, @[Slot(12), Slot(13)]).isOk() == true
checkResponse(r3, @[Slot(11), Slot(13), Slot(12)]).isOk() == false
checkResponse(r3, @[Slot(12), Slot(13), Slot(11)]).isOk() == false
checkResponse(r3, @[Slot(13), Slot(12), Slot(11)]).isOk() == false
checkResponse(r3, @[Slot(13), Slot(11)]).isOk() == false
checkResponse(r3, @[Slot(13), Slot(12)]).isOk() == false
checkResponse(r3, @[Slot(12), Slot(11)]).isOk() == false
checkResponse(r3, @[Slot(11), Slot(11), Slot(11)]).isOk() == false
checkResponse(r3, @[Slot(11), Slot(12), Slot(12)]).isOk() == false
checkResponse(r3, @[Slot(11), Slot(13), Slot(13)]).isOk() == false
checkResponse(r3, @[Slot(12), Slot(13), Slot(13)]).isOk() == false
checkResponse(r3, @[Slot(12), Slot(12), Slot(12)]).isOk() == false
checkResponse(r3, @[Slot(13), Slot(13), Slot(13)]).isOk() == false
checkResponse(r3, @[Slot(11), Slot(11)]).isOk() == false
checkResponse(r3, @[Slot(12), Slot(12)]).isOk() == false
checkResponse(r3, @[Slot(13), Slot(13)]).isOk() == false
test "[SyncQueue] checkBlobsResponse() test":
let
r1 = SyncRequest[SomeTPeer](slot: Slot(11), count: 1'u64)
r2 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64)
r3 = SyncRequest[SomeTPeer](slot: Slot(11), count: 3'u64)
d1 = Slot(11).repeat(MAX_BLOBS_PER_BLOCK)
d2 = Slot(12).repeat(MAX_BLOBS_PER_BLOCK)
d3 = Slot(13).repeat(MAX_BLOBS_PER_BLOCK)
check: check:
checkResponse(r1, @[slots[1]]) == true checkBlobsResponse(r1, [Slot(11)]).isOk() == true
checkResponse(r1, @[]) == true checkBlobsResponse(r1, @[]).isOk() == true
checkResponse(r1, @[slots[1], slots[1]]) == false checkBlobsResponse(r1, [Slot(11), Slot(11)]).isOk() == true
checkResponse(r1, @[slots[0]]) == false checkBlobsResponse(r1, [Slot(11), Slot(11), Slot(11)]).isOk() == true
checkResponse(r1, @[slots[2]]) == false checkBlobsResponse(r1, d1).isOk() == true
checkBlobsResponse(r1, d1 & @[Slot(11)]).isOk() == false
checkBlobsResponse(r1, [Slot(10)]).isOk() == false
checkBlobsResponse(r1, [Slot(12)]).isOk() == false
checkResponse(r21, @[slots[1]]) == true checkBlobsResponse(r2, [Slot(11)]).isOk() == true
checkResponse(r21, @[]) == true checkBlobsResponse(r2, [Slot(12)]).isOk() == true
checkResponse(r21, @[slots[1], slots[2]]) == true checkBlobsResponse(r2, @[]).isOk() == true
checkResponse(r21, @[slots[2]]) == true checkBlobsResponse(r2, [Slot(11), Slot(12)]).isOk() == true
checkResponse(r21, @[slots[1], slots[2], slots[3]]) == false checkBlobsResponse(r2, [Slot(11), Slot(11)]).isOk() == true
checkResponse(r21, @[slots[0], slots[1]]) == false checkBlobsResponse(r2, [Slot(12), Slot(12)]).isOk() == true
checkResponse(r21, @[slots[0]]) == false checkBlobsResponse(r2, d1).isOk() == true
checkResponse(r21, @[slots[2], slots[1]]) == false checkBlobsResponse(r2, d2).isOk() == true
checkResponse(r21, @[slots[2], slots[1]]) == false checkBlobsResponse(r2, d1 & d2).isOk() == true
checkResponse(r21, @[slots[2], slots[3]]) == false checkBlobsResponse(r2, [Slot(11), Slot(12), Slot(11)]).isOk() == false
checkResponse(r21, @[slots[3]]) == false checkBlobsResponse(r2, [Slot(12), Slot(11)]).isOk() == false
checkBlobsResponse(r2, d1 & @[Slot(11)]).isOk() == false
checkBlobsResponse(r2, d2 & @[Slot(12)]).isOk() == false
checkBlobsResponse(r2, @[Slot(11)] & d2 & @[Slot(12)]).isOk() == false
checkBlobsResponse(r2, d1 & d2 & @[Slot(12)]).isOk() == false
checkBlobsResponse(r2, d2 & d1).isOk() == false
checkResponse(r21, @[slots[1], slots[1]]) == false checkBlobsResponse(r3, [Slot(11)]).isOk() == true
checkResponse(r21, @[slots[2], slots[2]]) == false checkBlobsResponse(r3, [Slot(12)]).isOk() == true
checkBlobsResponse(r3, [Slot(13)]).isOk() == true
checkResponse(r3, @[slots[1]]) == true checkBlobsResponse(r3, @[]).isOk() == true
checkResponse(r3, @[slots[2]]) == true checkBlobsResponse(r3, [Slot(11), Slot(12)]).isOk() == true
checkResponse(r3, @[slots[3]]) == true checkBlobsResponse(r3, [Slot(11), Slot(11)]).isOk() == true
checkResponse(r3, @[slots[1], slots[2]]) == true checkBlobsResponse(r3, [Slot(12), Slot(12)]).isOk() == true
checkResponse(r3, @[slots[1], slots[3]]) == true checkBlobsResponse(r3, [Slot(11), Slot(13)]).isOk() == true
checkResponse(r3, @[slots[2], slots[3]]) == true checkBlobsResponse(r3, [Slot(12), Slot(13)]).isOk() == true
checkResponse(r3, @[slots[1], slots[3], slots[2]]) == false checkBlobsResponse(r3, [Slot(13), Slot(13)]).isOk() == true
checkResponse(r3, @[slots[2], slots[3], slots[1]]) == false checkBlobsResponse(r3, d1).isOk() == true
checkResponse(r3, @[slots[3], slots[2], slots[1]]) == false checkBlobsResponse(r3, d2).isOk() == true
checkResponse(r3, @[slots[3], slots[1]]) == false checkBlobsResponse(r3, d3).isOk() == true
checkResponse(r3, @[slots[3], slots[2]]) == false checkBlobsResponse(r3, d1 & d2).isOk() == true
checkResponse(r3, @[slots[2], slots[1]]) == false checkBlobsResponse(r3, d1 & d3).isOk() == true
checkBlobsResponse(r3, d2 & d3).isOk() == true
checkResponse(r3, @[slots[1], slots[1], slots[1]]) == false checkBlobsResponse(r3, [Slot(11), Slot(12), Slot(11)]).isOk() == false
checkResponse(r3, @[slots[1], slots[2], slots[2]]) == false checkBlobsResponse(r3, [Slot(11), Slot(13), Slot(12)]).isOk() == false
checkResponse(r3, @[slots[1], slots[3], slots[3]]) == false checkBlobsResponse(r3, [Slot(12), Slot(13), Slot(11)]).isOk() == false
checkResponse(r3, @[slots[2], slots[3], slots[3]]) == false checkBlobsResponse(r3, [Slot(12), Slot(11)]).isOk() == false
checkResponse(r3, @[slots[1], slots[1], slots[1]]) == false checkBlobsResponse(r3, [Slot(13), Slot(12)]).isOk() == false
checkResponse(r3, @[slots[2], slots[2], slots[2]]) == false checkBlobsResponse(r3, [Slot(13), Slot(11)]).isOk() == false
checkResponse(r3, @[slots[3], slots[3], slots[3]]) == false checkBlobsResponse(r3, d1 & @[Slot(11)]).isOk() == false
checkResponse(r3, @[slots[1], slots[1]]) == false checkBlobsResponse(r3, d2 & @[Slot(12)]).isOk() == false
checkResponse(r3, @[slots[2], slots[2]]) == false checkBlobsResponse(r3, d3 & @[Slot(13)]).isOk() == false
checkResponse(r3, @[slots[3], slots[3]]) == false checkBlobsResponse(r3, @[Slot(11)] & d2 & @[Slot(12)]).isOk() == false
checkBlobsResponse(r3, @[Slot(12)] & d3 & @[Slot(13)]).isOk() == false
checkBlobsResponse(r3, @[Slot(11)] & d3 & @[Slot(13)]).isOk() == false
checkBlobsResponse(r2, d1 & d2 & @[Slot(12)]).isOk() == false
checkBlobsResponse(r2, d1 & d3 & @[Slot(13)]).isOk() == false
checkBlobsResponse(r2, d2 & d3 & @[Slot(13)]).isOk() == false
checkBlobsResponse(r2, d2 & d1).isOk() == false
checkBlobsResponse(r2, d3 & d2).isOk() == false
checkBlobsResponse(r2, d3 & d1).isOk() == false
test "[SyncManager] groupBlobs() test": test "[SyncManager] groupBlobs() test":
var var