Update flare header download mechanism (#2607)

* Reverse order in staged blob lists

why:
  having the largest block number at the least header list index `0`
  makes it easier to grow the list with parent headers, i.e. with
  decreasing block numbers.
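
  For illustration, a minimal sketch of the reversed layout (hypothetical
  values; plain block numbers stand in for the RLP-encoded header blobs the
  sync code actually keeps):

    var revHdrs = @[100u64, 99u64, 98u64]  # revHdrs[0] holds the largest block number
    for parent in [97u64, 96u64]:          # parent headers arrive with decreasing numbers
      revHdrs.add parent                   # the list grows at its end, no shifting needed
    doAssert revHdrs == @[100u64, 99u64, 98u64, 97u64, 96u64]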

* Set a header response time threshold for when to ditch a peer

* Refactor extension of staged header chains record

why:
  The previous version was cobbled together as a proof of concept while
  trying out several approaches to running the download.

* TODO update

* Make debugging code independent of `release` flag

* Update import from jacek
Jordan Hrycaj 2024-09-10 11:37:49 +00:00 committed by GitHub
parent 38c58c4feb
commit 1ced684d8f
7 changed files with 183 additions and 148 deletions

View File

@ -1,3 +1,6 @@
* Update/resolve code fragments which are tagged FIXME
* Revisit timeouts when fetching header data from the network
* Check noisy and verification sections whether they are really wanted
when going into production
+ **extraTraceMessages**
+ **verifyDataStructureOk**

View File

@ -8,6 +8,7 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
{.push raises:[].}
import
@ -161,7 +162,7 @@ proc runMulti*(buddy: FlareBuddyRef) {.async.} =
else:
when extraTraceMessages:
debug info & ": failed, done", peer
debug info & ": nothing fetched, done", peer
# ------------------------------------------------------------------------------
# End

View File

@ -221,21 +221,31 @@ proc dbInitEra1*(ctx: FlareCtxRef): bool =
proc dbStashHeaders*(
ctx: FlareCtxRef;
first: BlockNumber;
rlpBlobs: openArray[Blob];
revBlobs: openArray[Blob];
) =
## Temporarily store header chain to persistent db (oblivious of the chain
## layout.) Note that headres should not be stashed if they are available
## on the `Era1` repo, i.e. if the corresponding block number is at most
## layout.) The headers should not be stashed if they are available on the
## `Era1` repo, i.e. if the corresponding block number is at most
## `ctx.pool.e1AvailMax`.
##
## The `revBlobs[]` arguments are passed in reverse order so that block
## numbers apply as
## ::
## #first -- revBlobs[^1]
## #(first+1) -- revBlobs[^2]
## ..
##
const info = "dbStashHeaders"
let kvt = ctx.db.ctx.getKvt()
for n,data in rlpBlobs:
let key = flareHeaderKey(first + n.uint)
let
kvt = ctx.db.ctx.getKvt()
last = first + revBlobs.len.uint - 1
for n,data in revBlobs:
let key = flareHeaderKey(last - n.uint)
kvt.put(key.toOpenArray, data).isOkOr:
raiseAssert info & ": put() failed: " & $$error
when extraTraceMessages:
trace info & ": headers stashed", first=first.bnStr, nHeaders=rlpBlobs.len
trace info & ": headers stashed",
iv=BnRange.new(first, last), nHeaders=revBlobs.len
proc dbPeekHeader*(ctx: FlareCtxRef; num: BlockNumber): Opt[BlockHeader] =
## Retrieve some stashed header.
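
As a worked example of the reversed indexing in `dbStashHeaders()` above (a
hedged sketch with made-up values; plain strings stand in for the RLP header
blobs and `uint64` for `BlockNumber`):

  let
    first    = 10u64
    revBlobs = @["hdr#12", "hdr#11", "hdr#10"]  # largest block number first
    last     = first + revBlobs.len.uint64 - 1  # 12
  for n, data in revBlobs:
    let key = last - n.uint64                   # keys written: 12, 11, 10
    echo key, " <- ", data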

View File

@ -27,14 +27,14 @@ const
extraTraceMessages = false # or true
## Enabled additional logging noise
verifyStagedQueueOk = not defined(release) or true
verifyDataStructureOk = false or true
## Debugging mode
# ------------------------------------------------------------------------------
# Private debugging helpers
# ------------------------------------------------------------------------------
when verifyStagedQueueOk:
when verifyDataStructureOk:
proc verifyStagedQueue(
ctx: FlareCtxRef;
info: static[string];
@ -54,7 +54,7 @@ when verifyStagedQueueOk:
while rc.isOk:
let
key = rc.value.key
nHeaders = rc.value.data.headers.len.uint
nHeaders = rc.value.data.revHdrs.len.uint
minPt = key - nHeaders + 1
unproc = ctx.unprocCovered(minPt, key)
if 0 < unproc:
@ -202,9 +202,9 @@ proc stagedCollect*(
# Request interval
ivReq = BnRange.new(ivReqMin, ivTop)
# Current length of the headers queue. This is one way to calculate
# the response length from the network.
nLhcHeaders = lhc.headers.len
# Current length of the headers queue. This is used to calculate the
# response length from the network.
nLhcHeaders = lhc.revHdrs.len
# Fetch and extend chain record
if not await buddy.fetchAndCheck(ivReq, lhc, info):
@ -223,17 +223,17 @@ proc stagedCollect*(
break
# Update remaining interval
let ivRespLen = lhc.headers.len - nLhcHeaders
if iv.minPt + ivRespLen.uint < ivTop:
let newIvTop = ivTop - ivRespLen.uint # will mostly be `ivReq.minPt-1`
when extraTraceMessages:
trace info & ": collected range", peer, iv=BnRange.new(iv.minPt, ivTop),
ivReq, ivResp=BnRange.new(newIvTop+1, ivReq.maxPt), ivRespLen,
isOpportunistic
ivTop = newIvTop
else:
let ivRespLen = lhc.revHdrs.len - nLhcHeaders
if ivTop <= iv.minPt + ivRespLen.uint or buddy.ctrl.stopped:
break
let newIvTop = ivTop - ivRespLen.uint # will mostly be `ivReq.minPt-1`
when extraTraceMessages:
trace info & ": collected range", peer, iv=BnRange.new(iv.minPt, ivTop),
ivReq, ivResp=BnRange.new(newIvTop+1, ivReq.maxPt), ivRespLen,
isOpportunistic
ivTop = newIvTop
# Store `lhcOpt` chain on the `staged` queue
let qItem = ctx.lhc.staged.insert(iv.maxPt).valueOr:
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
@ -242,10 +242,11 @@ proc stagedCollect*(
when extraTraceMessages:
trace info & ": stashed on staged queue", peer,
iv=BnRange.new(iv.maxPt - lhc.headers.len.uint + 1, iv.maxPt),
nHeaders=lhc.headers.len, isOpportunistic
nHeaders=lhc.headers.len, isOpportunistic, ctrl=buddy.ctrl.state
else:
trace info & ": stashed on staged queue", peer,
topBlock=iv.maxPt.bnStr, nHeaders=lhc.headers.len, isOpportunistic
topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len,
isOpportunistic, ctrl=buddy.ctrl.state
return true
@ -261,7 +262,7 @@ proc stagedProcess*(ctx: FlareCtxRef; info: static[string]): int =
let
least = ctx.layout.least # `L` from `README.md` (1) or `worker_desc`
iv = BnRange.new(qItem.key - qItem.data.headers.len.uint + 1, qItem.key)
iv = BnRange.new(qItem.key - qItem.data.revHdrs.len.uint + 1, qItem.key)
if iv.maxPt+1 < least:
when extraTraceMessages:
trace info & ": there is a gap", iv, L=least.bnStr, nSaved=result
@ -287,7 +288,7 @@ proc stagedProcess*(ctx: FlareCtxRef; info: static[string]): int =
break
# Store headers on database
ctx.dbStashHeaders(iv.minPt, qItem.data.headers)
ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs)
ctx.layout.least = iv.minPt
ctx.layout.leastParent = qItem.data.parentHash
let ok = ctx.dbStoreLinkedHChainsLayout()
@ -344,13 +345,13 @@ proc stagedReorg*(ctx: FlareCtxRef; info: static[string]) =
defer: walk.destroy()
var rc = walk.first
while rc.isOk:
let (key, nHeaders) = (rc.value.key, rc.value.data.headers.len.uint)
let (key, nHeaders) = (rc.value.key, rc.value.data.revHdrs.len.uint)
ctx.unprocMerge(key - nHeaders + 1, key)
rc = walk.next
# Reset `staged` queue
ctx.lhc.staged.clear()
when verifyStagedQueueOk:
when verifyDataStructureOk:
ctx.verifyStagedQueue(info, multiMode = false)
when extraTraceMessages:

View File

@ -24,6 +24,40 @@ logScope:
const extraTraceMessages = false # or true
## Enabled additional logging noise
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
# Copied from `nimbus_import`
func shortLog(a: chronos.Duration, parts = int.high): string =
## Returns string representation of Duration ``a`` as nanoseconds value.
var
res = ""
v = a.nanoseconds()
parts = parts
template f(n: string, T: Duration) =
if v >= T.nanoseconds():
res.add($(uint64(v div T.nanoseconds())))
res.add(n)
v = v mod T.nanoseconds()
dec parts
if v == 0 or parts <= 0:
return res
f("s", Second)
f("ms", Millisecond)
f("us", Microsecond)
f("ns", Nanosecond)
res
# For some reason neither `formatIt` nor `$` works as expected with logging
# the `elapsed` variable, below. This might be due to the fact that the
# `headersFetchReversed()` function is a generic one, i.e. a template.
func toStr(a: chronos.Duration): string =
a.shortLog(2)
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -36,6 +70,8 @@ proc headersFetchReversed*(
): Future[Result[seq[BlockHeader],void]]
{.async.} =
## Get a list of headers in reverse order.
const
threshold = fetchHeaderReqZombieThreshold # shortcut
let
peer = buddy.peer
useHash = (topHash != EMPTY_ROOT_HASH)
@ -56,6 +92,7 @@ proc headersFetchReversed*(
startBlock: HashOrNum(
isHash: false,
number: ivReq.maxPt))
start = Moment.now()
when extraTraceMessages:
trace trEthSendSendingGetBlockHeaders & " reverse", peer, ivReq,
@ -64,32 +101,45 @@ proc headersFetchReversed*(
# Fetch headers from peer
var resp: Option[blockHeadersObj]
try:
# There is no obvious way to set an individual timeout for this call. The
# eth/xx driver sets a global response timeout of `10s`. Due to how it is
# implemented, the `Future` returned by `peer.getBlockHeaders(req)` cannot
# reliably be used in a `withTimeout()` directive. It would rather crash
# in `rlpx` with a violated `req.timeoutAt <= Moment.now()` assertion.
resp = await peer.getBlockHeaders(req)
except TransportError as e:
`info` info & ", stop", peer, ivReq, nReq=req.maxResults, useHash,
error=($e.name), msg=e.msg
`info` info & " error", peer, ivReq, nReq=req.maxResults, useHash,
elapsed=(Moment.now() - start).toStr, error=($e.name), msg=e.msg
return err()
# Beware of peer terminating the session while fetching data
if buddy.ctrl.stopped:
return err()
# Kludge: Ban an overly slow peer for a while
let elapsed = Moment.now() - start
if threshold < elapsed:
buddy.ctrl.zombie = true # abandon slow peer
if resp.isNone:
# Evaluate result
if resp.isNone or buddy.ctrl.stopped:
when extraTraceMessages:
trace trEthRecvReceivedBlockHeaders, peer,
ivReq, nReq=req.maxResults, respose="n/a", useHash
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
nResp=0, elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state
return err()
let h: seq[BlockHeader] = resp.get.headers
if h.len == 0 or ivReq.len < h.len.uint:
when extraTraceMessages:
trace trEthRecvReceivedBlockHeaders, peer, ivReq, nReq=req.maxResults,
useHash, nResp=h.len
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
nResp=h.len, elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state
return err()
when extraTraceMessages:
trace trEthRecvReceivedBlockHeaders, peer, ivReq, nReq=req.maxResults,
useHash, ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len,
elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state
else:
if buddy.ctrl.stopped:
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len,
elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state
return ok(h)
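
Since the `Future` returned by the eth/xx driver cannot reliably be wrapped in
`withTimeout()` (see the comment above), the slow-peer check amounts to
comparing the measured round-trip time against `fetchHeaderReqZombieThreshold`.
A minimal sketch of that comparison, using chronos only (the helper name is
made up for illustration):

  import chronos

  proc tookTooLong(start: Moment): bool =
    ## True if the header response exceeded the 2s zombie threshold.
    const threshold = chronos.seconds(2)   # fetchHeaderReqZombieThreshold
    threshold < Moment.now() - start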

View File

@ -20,7 +20,7 @@ const
extraTraceMessages = false # or true
## Enabled additional logging noise
verifyLinkedHChainOk = not defined(release) # or true
verifyDataStructureOk = false # or true
## Debugging mode
when extraTraceMessages:
@ -41,90 +41,30 @@ proc `$`(w: Hash256): string =
formatIt(Hash256):
$it
when verifyLinkedHChainOk:
when verifyDataStructureOk:
proc verifyHeaderChainItem(lhc: ref LinkedHChain; info: static[string]) =
when extraTraceMessages:
trace info & ": verifying", nLhc=lhc.headers.len
trace info & ": verifying", nLhc=lhc.revHdrs.len
var
firstHdr, prvHdr: BlockHeader
topHdr, childHdr: BlockHeader
try:
firstHdr = rlp.decode(lhc.headers[0], BlockHeader)
doAssert lhc.parentHash == firstHdr.parentHash
doAssert lhc.revHdrs[0].keccakHash == lhc.hash
topHdr = rlp.decode(lhc.revHdrs[0], BlockHeader)
prvHdr = firstHdr
for n in 1 ..< lhc.headers.len:
let header = rlp.decode(lhc.headers[n], BlockHeader)
doAssert lhc.headers[n-1].keccakHash == header.parentHash
doAssert prvHdr.number + 1 == header.number
prvHdr = header
childHdr = topHdr
for n in 1 ..< lhc.revHdrs.len:
let header = rlp.decode(lhc.revHdrs[n], BlockHeader)
doAssert childHdr.number == header.number + 1
doAssert lhc.revHdrs[n].keccakHash == childHdr.parentHash
childHdr = header
doAssert lhc.headers[^1].keccakHash == lhc.hash
doAssert childHdr.parentHash == lhc.parentHash
except RlpError as e:
raiseAssert "verifyHeaderChainItem oops(" & $e.name & ") msg=" & e.msg
when extraTraceMessages:
trace info & ": verify ok",
iv=BnRange.new(firstHdr.number,prvHdr.number), nLhc=lhc.headers.len
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc newLHChain(
rev: seq[BlockHeader];
buddy: FlareBuddyRef;
blockNumber: BlockNumber;
topHash: Hash256;
info: static[string];
): Opt[ref LinkedHChain] =
## Verify list of headers while assembling them to a `LinkedHChain`
when extraTraceMessages:
trace info, nHeaders=rev.len
# Verify top block number
assert 0 < rev.len # debugging only
if rev[0].number != blockNumber:
when extraTraceMessages:
trace info & ": top block number mismatch",
number=rev[0].number.bnStr, expected=blockNumber.bnStr
return err()
# Make space for return code array
var chain = (ref LinkedHChain)(headers: newSeq[Blob](rev.len))
# Set up header with larges block number
let blob0 = rlp.encode(rev[0])
chain.headers[rev.len-1] = blob0
chain.hash = blob0.keccakHash
# Verify top block hash (if any)
if topHash != EMPTY_ROOT_HASH and chain.hash != topHash:
when extraTraceMessages:
trace info & ": top block hash mismatch",
hash=(chain.hash.data.toHex), expected=(topHash.data.toHex)
return err()
# Make sure that block headers are chained
for n in 1 ..< rev.len:
if rev[n].number + 1 != rev[n-1].number:
when extraTraceMessages:
trace info & ": #numbers mismatch", n,
parentNumber=rev[n-1].number.bnStr, number=rev[n].number.bnStr
return err()
let blob = rlp.encode(rev[n])
if rev[n-1].parentHash != blob.keccakHash:
when extraTraceMessages:
trace info & ": hash mismatch", n,
parentHash=rev[n-1].parentHash, hash=blob.keccakHash
return err()
chain.headers[rev.len-n-1] = blob
# Finalise
chain.parentHash = rev[rev.len-1].parentHash
when extraTraceMessages:
trace info & " new chain record", nChain=chain.headers.len
ok(chain)
iv=BnRange.new(childHdr.number,topHdr.number), nLhc=lhc.revHdrs.len
# ------------------------------------------------------------------------------
# Public functions
@ -133,39 +73,67 @@ proc newLHChain(
proc extendLinkedHChain*(
rev: seq[BlockHeader];
buddy: FlareBuddyRef;
blockNumber: BlockNumber;
topNumber: BlockNumber;
lhc: ref LinkedHChain; # update in place
info: static[string];
): bool =
## Returns sort of `lhc[] += rev[]` where `lhc[]` is updated in place.
when extraTraceMessages:
let
peer = buddy.peer
isOpportunistic = lhc.parentHash == EMPTY_ROOT_HASH
let peer = buddy.peer
let newLhc = rev.newLHChain(buddy, blockNumber, lhc.parentHash, info).valueOr:
# Verify top block number
assert 0 < rev.len # debugging only
if rev[0].number != topNumber:
when extraTraceMessages:
trace info & ": fetched headers unusable", peer,
blockNumber=blockNumber.bnStr, isOpportunistic
trace info & ": top block number mismatch", peer, n=0,
number=rev[0].number.bnStr, expected=topNumber.bnStr
return false
# Prepend `newLhc` before `lhc`
#
# FIXME: This must be cleaned up and optimised at some point.
#
# Make space for return code array
let offset = lhc.revHdrs.len
lhc.revHdrs.setLen(offset + rev.len)
# Set up header with largest block number
let
blob0 = rlp.encode(rev[0])
hash0 = blob0.keccakHash
lhc.revHdrs[offset] = blob0
if offset == 0:
lhc.hash = hash0
# Verify top block hash (if any)
if lhc.parentHash != EMPTY_ROOT_HASH and hash0 != lhc.parentHash:
when extraTraceMessages:
trace info & ": top hash mismatch", peer, hash0, expected=lhc.parentHash
lhc.revHdrs.setLen(offset)
return false
# Encode block headers and make sure they are chained
for n in 1 ..< rev.len:
if rev[n].number + 1 != rev[n-1].number:
when extraTraceMessages:
trace info & ": #numbers mismatch", peer, n,
parentNumber=rev[n-1].number.bnStr, number=rev[n].number.bnStr
lhc.revHdrs.setLen(offset)
return false
lhc.revHdrs[offset + n] = rlp.encode(rev[n])
let hashN = lhc.revHdrs[offset + n].keccakHash
if rev[n-1].parentHash != hashN:
when extraTraceMessages:
trace info & ": hash mismatch", peer, n,
parentHash=rev[n-1].parentHash, hashN
lhc.revHdrs.setLen(offset)
return false
# Finalise
lhc.parentHash = rev[rev.len-1].parentHash
when extraTraceMessages:
trace info & ": extending chain record", peer,
blockNumber=blockNumber.bnStr, len=lhc.headers.len,
newLen=(newLhc.headers.len + lhc.headers.len), isOpportunistic
trace info & " extended chain record", peer, topNumber=topNumber.bnStr,
offset, nLhc=lhc.revHdrs.len
if lhc.headers.len == 0:
lhc.hash = newLhc.hash
lhc.headers = newLhc.headers
else:
lhc.headers = newLhc.headers & lhc.headers
lhc.parentHash = newLhc.parentHash
when verifyLinkedHChainOk:
when verifyDataStructureOk:
lhc.verifyHeaderChainItem info
true

View File

@ -45,6 +45,10 @@ const
## smaller unprocessed slots that mostly all will be served leads to less
## fragmentation on a multi-peer downloading approach.
fetchHeaderReqZombieThreshold* = chronos.seconds(2)
## Response time allowance. If the response time for the set of headers
## exceeds this threshold, then this peer will be banned for a while.
nFetchHeadersOpportunisticly* = 8 * nFetchHeadersRequest
## Length of the request/stage batch. Several headers are consecutively
## fetched and stashed together as a single record on the staged queue.
@ -83,16 +87,14 @@ type
## Traversal descriptor
LinkedHChain* = object
## Public block items for the `LinkedHChainQueue` list, indexed by
## largest block number.
## Public block items for the `LinkedHChainQueue` list, indexed by the
## largest block number. The list `revHdrs[]` is reversed, i.e. the largest
## block number has the least index `0`. This makes it easier to grow the
## sequence with parent headers, i.e. decreasing block numbers.
##
## FIXME: `headers[]` should be reversed, i.e. `headers[0]` has the
## highest block number. This makes it natural to extend the
## sequence with parent headers at the growing end.
##
parentHash*: Hash256 ## Parent hash of `headers[0]`
headers*: seq[Blob] ## Encoded linked header chain
hash*: Hash256 ## Hash of `headers[^1]`
hash*: Hash256 ## Hash of `revHdrs[0]`
revHdrs*: seq[Blob] ## Encoded linked header chain
parentHash*: Hash256 ## Parent hash of `revHdrs[^1]`
# -------------------