diff --git a/nimbus/sync/flare/TODO.md b/nimbus/sync/flare/TODO.md index 57cd3c821..75c9639a0 100644 --- a/nimbus/sync/flare/TODO.md +++ b/nimbus/sync/flare/TODO.md @@ -1,3 +1,6 @@ * Update/resolve code fragments which are tagged FIXME -* Revisit timeouts when fetching header data from the network +* Check noisy and verification sections whether they are really wanted + when going into production + + **extraTraceMessages** + + **verifyDataStructureOk** diff --git a/nimbus/sync/flare/worker.nim b/nimbus/sync/flare/worker.nim index 6a9cf7747..85d67d11a 100644 --- a/nimbus/sync/flare/worker.nim +++ b/nimbus/sync/flare/worker.nim @@ -8,6 +8,7 @@ # at your option. This file may not be copied, modified, or distributed # except according to those terms. + {.push raises:[].} import @@ -161,7 +162,7 @@ proc runMulti*(buddy: FlareBuddyRef) {.async.} = else: when extraTraceMessages: - debug info & ": failed, done", peer + debug info & ": nothing fetched, done", peer # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/flare/worker/db.nim b/nimbus/sync/flare/worker/db.nim index cd186e379..0afad6642 100644 --- a/nimbus/sync/flare/worker/db.nim +++ b/nimbus/sync/flare/worker/db.nim @@ -221,21 +221,31 @@ proc dbInitEra1*(ctx: FlareCtxRef): bool = proc dbStashHeaders*( ctx: FlareCtxRef; first: BlockNumber; - rlpBlobs: openArray[Blob]; + revBlobs: openArray[Blob]; ) = ## Temporarily store header chain to persistent db (oblivious of the chain - ## layout.) Note that headres should not be stashed if they are available - ## on the `Era1` repo, i.e. if the corresponding block number is at most + ## layout.) The headers should not be stashed if they are available on the + ## `Era1` repo, i.e. if the corresponding block number is at most ## `ctx.pool.e1AvailMax`. ## + ## The `revBlobs[]` arguments are passed in reverse order so that block + ## numbers apply as + ## :: + ## #first -- revBlobs[^1] + ## #(first+1) -- revBlobs[^2] + ## .. + ## const info = "dbStashHeaders" - let kvt = ctx.db.ctx.getKvt() - for n,data in rlpBlobs: - let key = flareHeaderKey(first + n.uint) + let + kvt = ctx.db.ctx.getKvt() + last = first + revBlobs.len.uint - 1 + for n,data in revBlobs: + let key = flareHeaderKey(last - n.uint) kvt.put(key.toOpenArray, data).isOkOr: raiseAssert info & ": put() failed: " & $$error when extraTraceMessages: - trace info & ": headers stashed", first=first.bnStr, nHeaders=rlpBlobs.len + trace info & ": headers stashed", + iv=BnRange.new(first, last), nHeaders=revBlobs.len proc dbPeekHeader*(ctx: FlareCtxRef; num: BlockNumber): Opt[BlockHeader] = ## Retrieve some stashed header. diff --git a/nimbus/sync/flare/worker/staged.nim b/nimbus/sync/flare/worker/staged.nim index 0a9c2a80b..068003472 100644 --- a/nimbus/sync/flare/worker/staged.nim +++ b/nimbus/sync/flare/worker/staged.nim @@ -27,14 +27,14 @@ const extraTraceMessages = false # or true ## Enabled additional logging noise - verifyStagedQueueOk = not defined(release) or true + verifyDataStructureOk = false or true ## Debugging mode # ------------------------------------------------------------------------------ # Private debugging helpers # ------------------------------------------------------------------------------ -when verifyStagedQueueOk: +when verifyDataStructureOk: proc verifyStagedQueue( ctx: FlareCtxRef; info: static[string]; @@ -54,7 +54,7 @@ when verifyStagedQueueOk: while rc.isOk: let key = rc.value.key - nHeaders = rc.value.data.headers.len.uint + nHeaders = rc.value.data.revHdrs.len.uint minPt = key - nHeaders + 1 unproc = ctx.unprocCovered(minPt, key) if 0 < unproc: @@ -202,9 +202,9 @@ proc stagedCollect*( # Request interval ivReq = BnRange.new(ivReqMin, ivTop) - # Current length of the headers queue. This is one way to calculate - # the response length from the network. - nLhcHeaders = lhc.headers.len + # Current length of the headers queue. This is used to calculate the + # response length from the network. + nLhcHeaders = lhc.revHdrs.len # Fetch and extend chain record if not await buddy.fetchAndCheck(ivReq, lhc, info): @@ -223,17 +223,17 @@ proc stagedCollect*( break # Update remaining interval - let ivRespLen = lhc.headers.len - nLhcHeaders - if iv.minPt + ivRespLen.uint < ivTop: - let newIvTop = ivTop - ivRespLen.uint # will mostly be `ivReq.minPt-1` - when extraTraceMessages: - trace info & ": collected range", peer, iv=BnRange.new(iv.minPt, ivTop), - ivReq, ivResp=BnRange.new(newIvTop+1, ivReq.maxPt), ivRespLen, - isOpportunistic - ivTop = newIvTop - else: + let ivRespLen = lhc.revHdrs.len - nLhcHeaders + if ivTop <= iv.minPt + ivRespLen.uint or buddy.ctrl.stopped: break + let newIvTop = ivTop - ivRespLen.uint # will mostly be `ivReq.minPt-1` + when extraTraceMessages: + trace info & ": collected range", peer, iv=BnRange.new(iv.minPt, ivTop), + ivReq, ivResp=BnRange.new(newIvTop+1, ivReq.maxPt), ivRespLen, + isOpportunistic + ivTop = newIvTop + # Store `lhcOpt` chain on the `staged` queue let qItem = ctx.lhc.staged.insert(iv.maxPt).valueOr: raiseAssert info & ": duplicate key on staged queue iv=" & $iv @@ -242,10 +242,11 @@ proc stagedCollect*( when extraTraceMessages: trace info & ": stashed on staged queue", peer, iv=BnRange.new(iv.maxPt - lhc.headers.len.uint + 1, iv.maxPt), - nHeaders=lhc.headers.len, isOpportunistic + nHeaders=lhc.headers.len, isOpportunistic, ctrl=buddy.ctrl.state else: trace info & ": stashed on staged queue", peer, - topBlock=iv.maxPt.bnStr, nHeaders=lhc.headers.len, isOpportunistic + topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len, + isOpportunistic, ctrl=buddy.ctrl.state return true @@ -261,7 +262,7 @@ proc stagedProcess*(ctx: FlareCtxRef; info: static[string]): int = let least = ctx.layout.least # `L` from `README.md` (1) or `worker_desc` - iv = BnRange.new(qItem.key - qItem.data.headers.len.uint + 1, qItem.key) + iv = BnRange.new(qItem.key - qItem.data.revHdrs.len.uint + 1, qItem.key) if iv.maxPt+1 < least: when extraTraceMessages: trace info & ": there is a gap", iv, L=least.bnStr, nSaved=result @@ -287,7 +288,7 @@ proc stagedProcess*(ctx: FlareCtxRef; info: static[string]): int = break # Store headers on database - ctx.dbStashHeaders(iv.minPt, qItem.data.headers) + ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs) ctx.layout.least = iv.minPt ctx.layout.leastParent = qItem.data.parentHash let ok = ctx.dbStoreLinkedHChainsLayout() @@ -344,13 +345,13 @@ proc stagedReorg*(ctx: FlareCtxRef; info: static[string]) = defer: walk.destroy() var rc = walk.first while rc.isOk: - let (key, nHeaders) = (rc.value.key, rc.value.data.headers.len.uint) + let (key, nHeaders) = (rc.value.key, rc.value.data.revHdrs.len.uint) ctx.unprocMerge(key - nHeaders + 1, key) rc = walk.next # Reset `staged` queue ctx.lhc.staged.clear() - when verifyStagedQueueOk: + when verifyDataStructureOk: ctx.verifyStagedQueue(info, multiMode = false) when extraTraceMessages: diff --git a/nimbus/sync/flare/worker/staged/headers.nim b/nimbus/sync/flare/worker/staged/headers.nim index dad1b4b71..a921ab5da 100644 --- a/nimbus/sync/flare/worker/staged/headers.nim +++ b/nimbus/sync/flare/worker/staged/headers.nim @@ -24,6 +24,40 @@ logScope: const extraTraceMessages = false # or true ## Enabled additional logging noise +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +# Copied from `nimbus_import` +func shortLog(a: chronos.Duration, parts = int.high): string = + ## Returns string representation of Duration ``a`` as nanoseconds value. + var + res = "" + v = a.nanoseconds() + parts = parts + + template f(n: string, T: Duration) = + if v >= T.nanoseconds(): + res.add($(uint64(v div T.nanoseconds()))) + res.add(n) + v = v mod T.nanoseconds() + dec parts + if v == 0 or parts <= 0: + return res + + f("s", Second) + f("ms", Millisecond) + f("us", Microsecond) + f("ns", Nanosecond) + + res + +# For some reason neither `formatIt` nor `$` works as expected with logging +# the `elapsed` variable, below. This might be due to the fact that the +# `headersFetchReversed()` function is a generic one, i.e. a template. +func toStr(a: chronos.Duration): string = + a.shortLog(2) + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -36,6 +70,8 @@ proc headersFetchReversed*( ): Future[Result[seq[BlockHeader],void]] {.async.} = ## Get a list of headers in reverse order. + const + threshold = fetchHeaderReqZombieThreshold # shortcut let peer = buddy.peer useHash = (topHash != EMPTY_ROOT_HASH) @@ -56,6 +92,7 @@ proc headersFetchReversed*( startBlock: HashOrNum( isHash: false, number: ivReq.maxPt)) + start = Moment.now() when extraTraceMessages: trace trEthSendSendingGetBlockHeaders & " reverse", peer, ivReq, @@ -64,32 +101,45 @@ proc headersFetchReversed*( # Fetch headers from peer var resp: Option[blockHeadersObj] try: + # There is no obvious way to set an individual timeout for this call. The + # eth/xx driver sets a global response timeout to `10s`. By how it is + # implemented, the `Future` returned by `peer.getBlockHeaders(req)` cannot + # reliably be used in a `withTimeout()` directive. It would rather crash + # in `rplx` with a violated `req.timeoutAt <= Moment.now()` assertion. resp = await peer.getBlockHeaders(req) except TransportError as e: - `info` info & ", stop", peer, ivReq, nReq=req.maxResults, useHash, - error=($e.name), msg=e.msg + `info` info & " error", peer, ivReq, nReq=req.maxResults, useHash, + elapsed=(Moment.now() - start).toStr, error=($e.name), msg=e.msg return err() - # Beware of peer terminating the session while fetching data - if buddy.ctrl.stopped: - return err() + # Kludge: Ban an overly slow peer for a while + let elapsed = Moment.now() - start + if threshold < elapsed: + buddy.ctrl.zombie = true # abandon slow peer - if resp.isNone: + # Evaluate result + if resp.isNone or buddy.ctrl.stopped: when extraTraceMessages: - trace trEthRecvReceivedBlockHeaders, peer, - ivReq, nReq=req.maxResults, respose="n/a", useHash + trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash, + nResp=0, elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state return err() let h: seq[BlockHeader] = resp.get.headers if h.len == 0 or ivReq.len < h.len.uint: when extraTraceMessages: - trace trEthRecvReceivedBlockHeaders, peer, ivReq, nReq=req.maxResults, - useHash, nResp=h.len + trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash, + nResp=h.len, elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state return err() when extraTraceMessages: - trace trEthRecvReceivedBlockHeaders, peer, ivReq, nReq=req.maxResults, - useHash, ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len + trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash, + ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len, + elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state + else: + if buddy.ctrl.stopped: + trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash, + ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len, + elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state return ok(h) diff --git a/nimbus/sync/flare/worker/staged/linked_hchain.nim b/nimbus/sync/flare/worker/staged/linked_hchain.nim index 51d54e0ae..ddfb3bf99 100644 --- a/nimbus/sync/flare/worker/staged/linked_hchain.nim +++ b/nimbus/sync/flare/worker/staged/linked_hchain.nim @@ -20,7 +20,7 @@ const extraTraceMessages = false # or true ## Enabled additional logging noise - verifyLinkedHChainOk = not defined(release) # or true + verifyDataStructureOk = false # or true ## Debugging mode when extraTraceMessages: @@ -41,90 +41,30 @@ proc `$`(w: Hash256): string = formatIt(Hash256): $it -when verifyLinkedHChainOk: +when verifyDataStructureOk: proc verifyHeaderChainItem(lhc: ref LinkedHChain; info: static[string]) = when extraTraceMessages: - trace info & ": verifying", nLhc=lhc.headers.len + trace info & ": verifying", nLhc=lhc.revHdrs.len var - firstHdr, prvHdr: BlockHeader + topHdr, childHdr: BlockHeader try: - firstHdr = rlp.decode(lhc.headers[0], BlockHeader) - doAssert lhc.parentHash == firstHdr.parentHash + doAssert lhc.revHdrs[0].keccakHash == lhc.hash + topHdr = rlp.decode(lhc.revHdrs[0], BlockHeader) - prvHdr = firstHdr - for n in 1 ..< lhc.headers.len: - let header = rlp.decode(lhc.headers[n], BlockHeader) - doAssert lhc.headers[n-1].keccakHash == header.parentHash - doAssert prvHdr.number + 1 == header.number - prvHdr = header + childHdr = topHdr + for n in 1 ..< lhc.revHdrs.len: + let header = rlp.decode(lhc.revHdrs[n], BlockHeader) + doAssert childHdr.number == header.number + 1 + doAssert lhc.revHdrs[n].keccakHash == childHdr.parentHash + childHdr = header - doAssert lhc.headers[^1].keccakHash == lhc.hash + doAssert childHdr.parentHash == lhc.parentHash except RlpError as e: raiseAssert "verifyHeaderChainItem oops(" & $e.name & ") msg=" & e.msg when extraTraceMessages: trace info & ": verify ok", - iv=BnRange.new(firstHdr.number,prvHdr.number), nLhc=lhc.headers.len - -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - -proc newLHChain( - rev: seq[BlockHeader]; - buddy: FlareBuddyRef; - blockNumber: BlockNumber; - topHash: Hash256; - info: static[string]; - ): Opt[ref LinkedHChain] = - ## Verify list of headers while assembling them to a `LinkedHChain` - when extraTraceMessages: - trace info, nHeaders=rev.len - - # Verify top block number - assert 0 < rev.len # debugging only - if rev[0].number != blockNumber: - when extraTraceMessages: - trace info & ": top block number mismatch", - number=rev[0].number.bnStr, expected=blockNumber.bnStr - return err() - - # Make space for return code array - var chain = (ref LinkedHChain)(headers: newSeq[Blob](rev.len)) - - # Set up header with larges block number - let blob0 = rlp.encode(rev[0]) - chain.headers[rev.len-1] = blob0 - chain.hash = blob0.keccakHash - - # Verify top block hash (if any) - if topHash != EMPTY_ROOT_HASH and chain.hash != topHash: - when extraTraceMessages: - trace info & ": top block hash mismatch", - hash=(chain.hash.data.toHex), expected=(topHash.data.toHex) - return err() - - # Make sure that block headers are chained - for n in 1 ..< rev.len: - if rev[n].number + 1 != rev[n-1].number: - when extraTraceMessages: - trace info & ": #numbers mismatch", n, - parentNumber=rev[n-1].number.bnStr, number=rev[n].number.bnStr - return err() - let blob = rlp.encode(rev[n]) - if rev[n-1].parentHash != blob.keccakHash: - when extraTraceMessages: - trace info & ": hash mismatch", n, - parentHash=rev[n-1].parentHash, hash=blob.keccakHash - return err() - chain.headers[rev.len-n-1] = blob - - # Finalise - chain.parentHash = rev[rev.len-1].parentHash - - when extraTraceMessages: - trace info & " new chain record", nChain=chain.headers.len - ok(chain) + iv=BnRange.new(childHdr.number,topHdr.number), nLhc=lhc.revHdrs.len # ------------------------------------------------------------------------------ # Public functions @@ -133,39 +73,67 @@ proc newLHChain( proc extendLinkedHChain*( rev: seq[BlockHeader]; buddy: FlareBuddyRef; - blockNumber: BlockNumber; + topNumber: BlockNumber; lhc: ref LinkedHChain; # update in place info: static[string]; ): bool = - + ## Returns sort of `lhc[] += rev[]` where `lhc[]` is updated in place. when extraTraceMessages: - let - peer = buddy.peer - isOpportunistic = lhc.parentHash == EMPTY_ROOT_HASH + let peer = buddy.peer - let newLhc = rev.newLHChain(buddy, blockNumber, lhc.parentHash, info).valueOr: + # Verify top block number + assert 0 < rev.len # debugging only + if rev[0].number != topNumber: when extraTraceMessages: - trace info & ": fetched headers unusable", peer, - blockNumber=blockNumber.bnStr, isOpportunistic + trace info & ": top block number mismatch", peer, n=0, + number=rev[0].number.bnStr, expected=topNumber.bnStr return false - # Prepend `newLhc` before `lhc` - # - # FIXME: This must be cleaned up and optimised at some point. - # + # Make space for return code array + let offset = lhc.revHdrs.len + lhc.revHdrs.setLen(offset + rev.len) + + # Set up header with largest block number + let + blob0 = rlp.encode(rev[0]) + hash0 = blob0.keccakHash + lhc.revHdrs[offset] = blob0 + if offset == 0: + lhc.hash = hash0 + + # Verify top block hash (if any) + if lhc.parentHash != EMPTY_ROOT_HASH and hash0 != lhc.parentHash: + when extraTraceMessages: + trace info & ": top hash mismatch", peer, hash0, expected=lhc.parentHash + lhc.revHdrs.setLen(offset) + return false + + # Encode block headers and make sure they are chained + for n in 1 ..< rev.len: + if rev[n].number + 1 != rev[n-1].number: + when extraTraceMessages: + trace info & ": #numbers mismatch", peer, n, + parentNumber=rev[n-1].number.bnStr, number=rev[n].number.bnStr + lhc.revHdrs.setLen(offset) + return false + + lhc.revHdrs[offset + n] = rlp.encode(rev[n]) + let hashN = lhc.revHdrs[offset + n].keccakHash + if rev[n-1].parentHash != hashN: + when extraTraceMessages: + trace info & ": hash mismatch", peer, n, + parentHash=rev[n-1].parentHash, hashN + lhc.revHdrs.setLen(offset) + return false + + # Finalise + lhc.parentHash = rev[rev.len-1].parentHash + when extraTraceMessages: - trace info & ": extending chain record", peer, - blockNumber=blockNumber.bnStr, len=lhc.headers.len, - newLen=(newLhc.headers.len + lhc.headers.len), isOpportunistic + trace info & " extended chain record", peer, topNumber=topNumber.bnStr, + offset, nLhc=lhc.revHdrs.len - if lhc.headers.len == 0: - lhc.hash = newLhc.hash - lhc.headers = newLhc.headers - else: - lhc.headers = newLhc.headers & lhc.headers - lhc.parentHash = newLhc.parentHash - - when verifyLinkedHChainOk: + when verifyDataStructureOk: lhc.verifyHeaderChainItem info true diff --git a/nimbus/sync/flare/worker_desc.nim b/nimbus/sync/flare/worker_desc.nim index 099f6e66e..49f7e2933 100644 --- a/nimbus/sync/flare/worker_desc.nim +++ b/nimbus/sync/flare/worker_desc.nim @@ -45,6 +45,10 @@ const ## smaller unprocessed slots that mostly all will be served leads to less ## fragmentation on a multi-peer downloading approach. + fetchHeaderReqZombieThreshold* = chronos.seconds(2) + ## Response time allowance. If the response time for the set of headers + ## exceeds this threshold, then this peer will be banned for a while. + nFetchHeadersOpportunisticly* = 8 * nFetchHeadersRequest ## Length of the request/stage batch. Several headers are consecutively ## fetched and stashed together as a single record on the staged queue. @@ -83,16 +87,14 @@ type ## Traversal descriptor LinkedHChain* = object - ## Public block items for the `LinkedHChainQueue` list, indexed by - ## largest block number. + ## Public block items for the `LinkedHChainQueue` list, indexed by the + ## largest block number. The list `revHdrs[]` is reversed, i.e. the largest + ## block number has the least index `0`. This makes it easier to grow the + ## sequence with parent headers, i.e. decreasing block numbers. ## - ## FIXME: `headers[]` should be reversed, i.e. `headers[0]` has the - ## highest block number. This makes it natural to extend the - ## sequence with parent headers at the growing end. - ## - parentHash*: Hash256 ## Parent hash of `headers[0]` - headers*: seq[Blob] ## Encoded linked header chain - hash*: Hash256 ## Hash of `headers[^1]` + hash*: Hash256 ## Hash of `headers[0]` + revHdrs*: seq[Blob] ## Encoded linked header chain + parentHash*: Hash256 ## Parent hash of `headers[^1]` # -------------------