Refactor and optimize sync logs. (#3451)

* Refactor and optimize logs.

* Introduce shortLog(SyncRequest).

* Address review comment.

* make sync queue logs more consistent

Adds a few minor logging improvements:
- Fixes a typo (`was happened` -> `has happened`)
- Avoids passing `reset_slot` argument to log statement multiple times
- Uses same `rewind_to_slot` label when logging in both sync directions
- Consistent rewind point logging

Co-authored-by: cheatfate <eugene.kabanov@status.im>
This commit is contained in:
Etan Kissling 2022-03-03 09:05:33 +01:00 committed by GitHub
parent 3b20d57277
commit 3ffab01b07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 149 additions and 183 deletions

View File

@ -65,6 +65,7 @@ type
avgSyncSpeed*: float avgSyncSpeed*: float
syncStatus*: string syncStatus*: string
direction: SyncQueueKind direction: SyncQueueKind
ident*: string
SyncMoment* = object SyncMoment* = object
stamp*: chronos.Moment stamp*: chronos.Moment
@ -116,7 +117,8 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
blockVerifier: BlockVerifier, blockVerifier: BlockVerifier,
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
chunkSize = uint64(SLOTS_PER_EPOCH), chunkSize = uint64(SLOTS_PER_EPOCH),
toleranceValue = uint64(1) toleranceValue = uint64(1),
ident = "main"
): SyncManager[A, B] = ): SyncManager[A, B] =
let (getFirstSlot, getLastSlot, getSafeSlot) = case direction let (getFirstSlot, getLastSlot, getSafeSlot) = case direction
of SyncQueueKind.Forward: of SyncQueueKind.Forward:
@ -137,7 +139,8 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
chunkSize: chunkSize, chunkSize: chunkSize,
blockVerifier: blockVerifier, blockVerifier: blockVerifier,
notInSyncEvent: newAsyncEvent(), notInSyncEvent: newAsyncEvent(),
direction: direction direction: direction,
ident: ident
) )
res.initQueue() res.initQueue()
res res
@ -145,11 +148,16 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest): Future[BeaconBlocksRes] {.async.} = req: SyncRequest): Future[BeaconBlocksRes] {.async.} =
mixin beaconBlocksByRange, getScore, `==` mixin beaconBlocksByRange, getScore, `==`
logScope:
peer_score = peer.getScore()
peer_speed = peer.netKbps()
sync_ident = man.ident
direction = man.direction
topics = "syncman"
doAssert(not(req.isEmpty()), "Request must not be empty!") doAssert(not(req.isEmpty()), "Request must not be empty!")
debug "Requesting blocks from peer", peer = peer, debug "Requesting blocks from peer", request = req
slot = req.slot, slot_count = req.count, step = req.step,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
try: try:
let res = let res =
if peer.useSyncV2(): if peer.useSyncV2():
@ -160,24 +168,16 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it)))) blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it))))
if res.isErr(): if res.isErr():
debug "Error, while reading getBlocks response", debug "Error, while reading getBlocks response", request = req,
peer = peer, slot = req.slot, count = req.count, error = $res.error()
step = req.step, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman",
error = $res.error()
return return
return res return res
except CancelledError: except CancelledError:
debug "Interrupt, while waiting getBlocks response", peer = peer, debug "Interrupt, while waiting getBlocks response", request = req
slot = req.slot, slot_count = req.count, step = req.step,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
return return
except CatchableError as exc: except CatchableError as exc:
debug "Error, while waiting getBlocks response", peer = peer, debug "Error, while waiting getBlocks response", request = req,
slot = req.slot, slot_count = req.count, step = req.step, errName = exc.name, errMsg = exc.msg
errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
return return
proc remainingSlots(man: SyncManager): uint64 = proc remainingSlots(man: SyncManager): uint64 =
@ -187,17 +187,25 @@ proc remainingSlots(man: SyncManager): uint64 =
man.getFirstSlot() - man.getLastSlot() man.getFirstSlot() - man.getLastSlot()
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
logScope:
peer_score = peer.getScore()
peer_speed = peer.netKbps()
index = index
sync_ident = man.ident
topics = "syncman"
var var
headSlot = man.getLocalHeadSlot() headSlot = man.getLocalHeadSlot()
wallSlot = man.getLocalWallSlot() wallSlot = man.getLocalWallSlot()
peerSlot = peer.getHeadSlot() peerSlot = peer.getHeadSlot()
block: # Check that peer status is recent and relevant block: # Check that peer status is recent and relevant
logScope:
peer = peer
direction = man.direction
debug "Peer's syncing status", wall_clock_slot = wallSlot, debug "Peer's syncing status", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot, remote_head_slot = peerSlot, local_head_slot = headSlot
peer_score = peer.getScore(), peer = peer, index = index,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
let let
peerStatusAge = Moment.now() - peer.state(BeaconSync).statusLastTime peerStatusAge = Moment.now() - peer.state(BeaconSync).statusLastTime
@ -216,26 +224,19 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
await sleepAsync(StatusExpirationTime div 2 - peerStatusAge) await sleepAsync(StatusExpirationTime div 2 - peerStatusAge)
trace "Updating peer's status information", wall_clock_slot = wallSlot, trace "Updating peer's status information", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot, remote_head_slot = peerSlot, local_head_slot = headSlot
peer = peer, peer_score = peer.getScore(), index = index,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
try: try:
let res = await peer.updateStatus() let res = await peer.updateStatus()
if not(res): if not(res):
peer.updateScore(PeerScoreNoStatus) peer.updateScore(PeerScoreNoStatus)
debug "Failed to get remote peer's status, exiting", peer = peer, debug "Failed to get remote peer's status, exiting",
peer_score = peer.getScore(), peer_head_slot = peerSlot, peer_head_slot = peerSlot
peer_speed = peer.netKbps(), index = index,
direction = man.direction, topics = "syncman"
return return
except CatchableError as exc: except CatchableError as exc:
debug "Unexpected exception while updating peer's status", debug "Unexpected exception while updating peer's status",
peer = peer, peer_score = peer.getScore(), peer_head_slot = peerSlot, errName = exc.name, errMsg = exc.msg
peer_head_slot = peerSlot, peer_speed = peer.netKbps(),
index = index, errMsg = exc.msg, direction = man.direction,
topics = "syncman"
return return
let newPeerSlot = peer.getHeadSlot() let newPeerSlot = peer.getHeadSlot()
@ -243,16 +244,11 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
peer.updateScore(PeerScoreStaleStatus) peer.updateScore(PeerScoreStaleStatus)
debug "Peer's status information is stale", debug "Peer's status information is stale",
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot, wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot, local_head_slot = headSlot, remote_new_head_slot = newPeerSlot
peer = peer, peer_score = peer.getScore(), index = index,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
else: else:
debug "Peer's status information updated", wall_clock_slot = wallSlot, debug "Peer's status information updated", wall_clock_slot = wallSlot,
remote_old_head_slot = peerSlot, local_head_slot = headSlot, remote_old_head_slot = peerSlot, local_head_slot = headSlot,
remote_new_head_slot = newPeerSlot, peer = peer, remote_new_head_slot = newPeerSlot
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
index = index, direction = man.direction, topics = "syncman"
peer.updateScore(PeerScoreGoodStatus) peer.updateScore(PeerScoreGoodStatus)
peerSlot = newPeerSlot peerSlot = newPeerSlot
@ -268,26 +264,21 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
warn "Peer reports a head newer than our wall clock - clock out of sync?", warn "Peer reports a head newer than our wall clock - clock out of sync?",
wall_clock_slot = wallSlot, remote_head_slot = peerSlot, wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
local_head_slot = headSlot, peer = peer, index = index, local_head_slot = headSlot, tolerance_value = man.toleranceValue
tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(),
peer_score = peer.getScore(), direction = man.direction,
topics = "syncman"
return return
if man.remainingSlots() <= man.maxHeadAge: if man.remainingSlots() <= man.maxHeadAge:
logScope:
peer = peer
direction = man.direction
case man.direction case man.direction
of SyncQueueKind.Forward: of SyncQueueKind.Forward:
info "We are in sync with network", wall_clock_slot = wallSlot, info "We are in sync with network", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot, remote_head_slot = peerSlot, local_head_slot = headSlot
peer = peer, peer_score = peer.getScore(), index = index,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
of SyncQueueKind.Backward: of SyncQueueKind.Backward:
info "Backfill complete", wall_clock_slot = wallSlot, info "Backfill complete", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot, remote_head_slot = peerSlot, local_head_slot = headSlot
peer = peer, peer_score = peer.getScore(), index = index,
peer_speed = peer.netKbps(), direction = man.direction,
topics = "syncman"
# We clear SyncManager's `notInSyncEvent` so all the workers will become # We clear SyncManager's `notInSyncEvent` so all the workers will become
# sleeping soon. # sleeping soon.
@ -307,13 +298,11 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
# Right now we decreasing peer's score a bit, so it will not be # Right now we decreasing peer's score a bit, so it will not be
# disconnected due to low peer's score, but new fresh peers could replace # disconnected due to low peer's score, but new fresh peers could replace
# peers with low latest head. # peers with low latest head.
debug "Peer's head slot is lower then local head slot", debug "Peer's head slot is lower then local head slot", peer = peer,
wall_clock_slot = wallSlot, remote_head_slot = peerSlot, wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
local_last_slot = man.getLastSlot(), local_last_slot = man.getLastSlot(),
local_first_slot = man.getFirstSlot(), peer = peer, local_first_slot = man.getFirstSlot(),
peer_score = peer.getScore(), direction = man.direction
peer_speed = peer.netKbps(), index = index,
direction = man.direction, topics = "syncman"
peer.updateScore(PeerScoreUseless) peer.updateScore(PeerScoreUseless)
return return
@ -336,18 +325,13 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
local_head_slot = headSlot, remote_head_slot = peerSlot, local_head_slot = headSlot, remote_head_slot = peerSlot,
queue_input_slot = man.queue.inpSlot, queue_input_slot = man.queue.inpSlot,
queue_output_slot = man.queue.outSlot, queue_output_slot = man.queue.outSlot,
queue_last_slot = man.queue.finalSlot, queue_last_slot = man.queue.finalSlot, direction = man.direction
peer_speed = peer.netKbps(), peer_score = peer.getScore(),
index = index, direction = man.direction, topics = "syncman"
await sleepAsync(RESP_TIMEOUT) await sleepAsync(RESP_TIMEOUT)
return return
debug "Creating new request for peer", wall_clock_slot = wallSlot, debug "Creating new request for peer", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot, remote_head_slot = peerSlot, local_head_slot = headSlot,
request_slot = req.slot, request_count = req.count, request = req
request_step = req.step, peer = peer, peer_speed = peer.netKbps(),
peer_score = peer.getScore(), index = index,
direction = man.direction, topics = "syncman"
man.workers[index].status = SyncWorkerStatus.Downloading man.workers[index].status = SyncWorkerStatus.Downloading
@ -357,20 +341,13 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
let data = blocks.get() let data = blocks.get()
let smap = getShortMap(req, data) let smap = getShortMap(req, data)
debug "Received blocks on request", blocks_count = len(data), debug "Received blocks on request", blocks_count = len(data),
blocks_map = smap, request_slot = req.slot, blocks_map = smap, request = req
request_count = req.count, request_step = req.step,
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(), index = index,
direction = man.direction, topics = "syncman"
if not(checkResponse(req, data)): if not(checkResponse(req, data)):
peer.updateScore(PeerScoreBadResponse) peer.updateScore(PeerScoreBadResponse)
warn "Received blocks sequence is not in requested range", warn "Received blocks sequence is not in requested range",
blocks_count = len(data), blocks_map = smap, blocks_count = len(data), blocks_map = smap,
request_slot = req.slot, request_count = req.count, request = req
request_step = req.step, peer = peer,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
index = index, direction = man.direction, topics = "syncman"
return return
# Scoring will happen in `syncUpdate`. # Scoring will happen in `syncUpdate`.
@ -380,27 +357,24 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
else: else:
peer.updateScore(PeerScoreNoBlocks) peer.updateScore(PeerScoreNoBlocks)
man.queue.push(req) man.queue.push(req)
debug "Failed to receive blocks on request", debug "Failed to receive blocks on request", request = req
request_slot = req.slot, request_count = req.count,
request_step = req.step, peer = peer, index = index,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
direction = man.direction, topics = "syncman"
return return
except CatchableError as exc: except CatchableError as exc:
debug "Unexpected exception while receiving blocks", debug "Unexpected exception while receiving blocks", request = req,
request_slot = req.slot, request_count = req.count, errName = exc.name, errMsg = exc.msg
request_step = req.step, peer = peer, index = index,
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
errName = exc.name, errMsg = exc.msg, direction = man.direction,
topics = "syncman"
return return
proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} = proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
mixin getKey, getScore, getHeadSlot mixin getKey, getScore, getHeadSlot
debug "Starting syncing worker", index = index, direction = man.direction, logScope:
topics = "syncman" index = index
sync_ident = man.ident
direction = man.direction
topics = "syncman"
debug "Starting syncing worker"
while true: while true:
var peer: A = nil var peer: A = nil
@ -420,16 +394,14 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
true true
except CatchableError as exc: except CatchableError as exc:
debug "Unexpected exception in sync worker", debug "Unexpected exception in sync worker",
peer = peer, index = index, peer = peer, peer_score = peer.getScore(),
peer_score = peer.getScore(), peer_speed = peer.netKbps(), peer_speed = peer.netKbps(),
errName = exc.name, errMsg = exc.msg, direction = man.direction, errName = exc.name, errMsg = exc.msg
topics = "syncman"
true true
if doBreak: if doBreak:
break break
debug "Sync worker stopped", index = index, direction = man.direction, debug "Sync worker stopped"
topics = "syncman"
proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string, proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
sleeping: int, sleeping: int,
@ -465,6 +437,12 @@ proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
(map, sleeping, waiting, pending) (map, sleeping, waiting, pending)
proc guardTask[A, B](man: SyncManager[A, B]) {.async.} = proc guardTask[A, B](man: SyncManager[A, B]) {.async.} =
logScope:
index = index
sync_ident = man.ident
direction = man.direction
topics = "syncman"
var pending: array[SyncWorkersCount, Future[void]] var pending: array[SyncWorkersCount, Future[void]]
# Starting all the synchronization workers. # Starting all the synchronization workers.
@ -479,11 +457,9 @@ proc guardTask[A, B](man: SyncManager[A, B]) {.async.} =
let index = pending.find(failFuture) let index = pending.find(failFuture)
if failFuture.failed(): if failFuture.failed():
warn "Synchronization worker stopped working unexpectedly with an error", warn "Synchronization worker stopped working unexpectedly with an error",
index = index, errMsg = failFuture.error.msg, errName = failFuture.error.name, errMsg = failFuture.error.msg
direction = man.direction
else: else:
warn "Synchronization worker stopped working unexpectedly without error", warn "Synchronization worker stopped working unexpectedly without error"
index = index, direction = man.direction
let future = syncWorker[A, B](man, index) let future = syncWorker[A, B](man, index)
man.workers[index].future = future man.workers[index].future = future
@ -516,13 +492,17 @@ proc toTimeLeftString*(d: Duration): string =
res res
proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
logScope:
sync_ident = man.ident
direction = man.direction
topics = "syncman"
mixin getKey, getScore mixin getKey, getScore
var pauseTime = 0 var pauseTime = 0
var guardTaskFut = man.guardTask() var guardTaskFut = man.guardTask()
debug "Synchronization loop started", topics = "syncman", debug "Synchronization loop started"
direction = man.direction
proc averageSpeedTask() {.async.} = proc averageSpeedTask() {.async.} =
while true: while true:
@ -568,8 +548,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
pending_workers_count = pending, pending_workers_count = pending,
wall_head_slot = wallSlot, local_head_slot = headSlot, wall_head_slot = wallSlot, local_head_slot = headSlot,
pause_time = $chronos.seconds(pauseTime), pause_time = $chronos.seconds(pauseTime),
avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed, avg_sync_speed = man.avgSyncSpeed, ins_sync_speed = man.insSyncSpeed
direction = man.direction, topics = "syncman"
let let
pivot = man.progressPivot pivot = man.progressPivot
@ -607,8 +586,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
wall_head_slot = wallSlot, local_head_slot = headSlot, wall_head_slot = wallSlot, local_head_slot = headSlot,
difference = (wallSlot - headSlot), max_head_age = man.maxHeadAge, difference = (wallSlot - headSlot), max_head_age = man.maxHeadAge,
sleeping_workers_count = sleeping, sleeping_workers_count = sleeping,
waiting_workers_count = waiting, pending_workers_count = pending, waiting_workers_count = waiting, pending_workers_count = pending
direction = man.direction, topics = "syncman"
# We already synced, so we should reset all the pending workers from # We already synced, so we should reset all the pending workers from
# any state they have. # any state they have.
man.queue.clearAndWakeup() man.queue.clearAndWakeup()
@ -621,14 +599,12 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
debug "Forward synchronization process finished, sleeping", debug "Forward synchronization process finished, sleeping",
wall_head_slot = wallSlot, local_head_slot = headSlot, wall_head_slot = wallSlot, local_head_slot = headSlot,
difference = (wallSlot - headSlot), difference = (wallSlot - headSlot),
max_head_age = man.maxHeadAge, direction = man.direction, max_head_age = man.maxHeadAge
topics = "syncman"
else: else:
debug "Synchronization loop sleeping", wall_head_slot = wallSlot, debug "Synchronization loop sleeping", wall_head_slot = wallSlot,
local_head_slot = headSlot, local_head_slot = headSlot,
difference = (wallSlot - headSlot), difference = (wallSlot - headSlot),
max_head_age = man.maxHeadAge, direction = man.direction, max_head_age = man.maxHeadAge
topics = "syncman"
of SyncQueueKind.Backward: of SyncQueueKind.Backward:
# Backward syncing is going to be executed only once, so we exit loop # Backward syncing is going to be executed only once, so we exit loop
# and stop all pending tasks which belongs to this instance (sync # and stop all pending tasks which belongs to this instance (sync
@ -652,8 +628,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
debug "Backward synchronization process finished, exiting", debug "Backward synchronization process finished, exiting",
wall_head_slot = wallSlot, local_head_slot = headSlot, wall_head_slot = wallSlot, local_head_slot = headSlot,
backfill_slot = man.getLastSlot(), backfill_slot = man.getLastSlot(),
max_head_age = man.maxHeadAge, direction = man.direction, max_head_age = man.maxHeadAge
topics = "syncman"
break break
else: else:
if not(man.notInSyncEvent.isSet()): if not(man.notInSyncEvent.isSet()):
@ -666,8 +641,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
period = man.maxHeadAge, wall_head_slot = wallSlot, period = man.maxHeadAge, wall_head_slot = wallSlot,
local_head_slot = headSlot, local_head_slot = headSlot,
missing_slots = man.remainingSlots(), missing_slots = man.remainingSlots(),
progress = float(man.queue.progress()), progress = float(man.queue.progress())
topics = "syncman"
else: else:
man.notInSyncEvent.fire() man.notInSyncEvent.fire()
man.inProgress = true man.inProgress = true

View File

@ -8,7 +8,7 @@
{.push raises: [Defect].} {.push raises: [Defect].}
import std/[options, heapqueue, tables, strutils, sequtils, math, algorithm] import std/[options, heapqueue, tables, strutils, sequtils, math, algorithm]
import stew/results, chronos, chronicles import stew/[results, base10], chronos, chronicles
import import
../spec/datatypes/[base, phase0, altair], ../spec/datatypes/[base, phase0, altair],
../spec/eth2_apis/rpc_types, ../spec/eth2_apis/rpc_types,
@ -34,7 +34,7 @@ type
Forward, Backward Forward, Backward
SyncRequest*[T] = object SyncRequest*[T] = object
kind: SyncQueueKind kind*: SyncQueueKind
index*: uint64 index*: uint64
slot*: Slot slot*: Slot
count*: uint64 count*: uint64
@ -70,11 +70,22 @@ type
readyQueue: HeapQueue[SyncResult[T]] readyQueue: HeapQueue[SyncResult[T]]
rewind: Option[RewindPoint] rewind: Option[RewindPoint]
blockVerifier: BlockVerifier blockVerifier: BlockVerifier
ident*: string
SyncManagerError* = object of CatchableError SyncManagerError* = object of CatchableError
BeaconBlocksRes* = NetRes[seq[ref ForkedSignedBeaconBlock]] BeaconBlocksRes* = NetRes[seq[ref ForkedSignedBeaconBlock]]
chronicles.formatIt SyncQueueKind: $it chronicles.formatIt SyncQueueKind: toLowerAscii($it)
template shortLog*[T](req: SyncRequest[T]): string =
Base10.toString(uint64(req.slot)) & ":" &
Base10.toString(req.count) & "@" &
Base10.toString(req.index)
chronicles.expandIt SyncRequest:
`it` = shortLog(it)
peer = shortLog(it.item)
direction = toLowerAscii($it.kind)
proc getShortMap*[T](req: SyncRequest[T], proc getShortMap*[T](req: SyncRequest[T],
data: openArray[ref ForkedSignedBeaconBlock]): string = data: openArray[ref ForkedSignedBeaconBlock]): string =
@ -168,7 +179,8 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
start, final: Slot, chunkSize: uint64, start, final: Slot, chunkSize: uint64,
getSafeSlotCb: GetSlotCallback, getSafeSlotCb: GetSlotCallback,
blockVerifier: BlockVerifier, blockVerifier: BlockVerifier,
syncQueueSize: int = -1): SyncQueue[T] = syncQueueSize: int = -1,
ident: string = "main"): SyncQueue[T] =
## Create new synchronization queue with parameters ## Create new synchronization queue with parameters
## ##
## ``start`` and ``last`` are starting and finishing Slots. ## ``start`` and ``last`` are starting and finishing Slots.
@ -230,7 +242,8 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
debtsQueue: initHeapQueue[SyncRequest[T]](), debtsQueue: initHeapQueue[SyncRequest[T]](),
inpSlot: start, inpSlot: start,
outSlot: start, outSlot: start,
blockVerifier: blockVerifier blockVerifier: blockVerifier,
ident: ident
) )
proc `<`*[T](a, b: SyncRequest[T]): bool = proc `<`*[T](a, b: SyncRequest[T]): bool =
@ -379,6 +392,11 @@ proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
safeSlot: Slot): Slot = safeSlot: Slot): Slot =
logScope:
sync_ident = sq.ident
direction = sq.kind
topics = "syncman"
case sq.kind case sq.kind
of SyncQueueKind.Forward: of SyncQueueKind.Forward:
# Calculate the latest finalized epoch. # Calculate the latest finalized epoch.
@ -413,8 +431,7 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
finalized_slot = safeSlot, fail_slot = failSlot, finalized_slot = safeSlot, fail_slot = failSlot,
finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
rewind_epoch_count = rewind.epochCount, rewind_epoch_count = rewind.epochCount,
finalized_epoch = finalizedEpoch, direction = sq.kind, finalized_epoch = finalizedEpoch
topics = "syncman"
0'u64 0'u64
else: else:
# `MissingParent` happened at different slot so we going to rewind for # `MissingParent` happened at different slot so we going to rewind for
@ -424,8 +441,7 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
finalized_slot = safeSlot, fail_slot = failSlot, finalized_slot = safeSlot, fail_slot = failSlot,
finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
rewind_epoch_count = rewind.epochCount, rewind_epoch_count = rewind.epochCount,
finalized_epoch = finalizedEpoch, direction = sq.kind, finalized_epoch = finalizedEpoch
topics = "syncman"
0'u64 0'u64
else: else:
1'u64 1'u64
@ -435,8 +451,7 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
warn "Сould not rewind further than the last finalized epoch", warn "Сould not rewind further than the last finalized epoch",
finalized_slot = safeSlot, fail_slot = failSlot, finalized_slot = safeSlot, fail_slot = failSlot,
finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
finalized_epoch = finalizedEpoch, direction = sq.kind, finalized_epoch = finalizedEpoch
topics = "syncman"
0'u64 0'u64
else: else:
1'u64 1'u64
@ -445,8 +460,7 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
warn "Unable to continue syncing, please restart the node", warn "Unable to continue syncing, please restart the node",
finalized_slot = safeSlot, fail_slot = failSlot, finalized_slot = safeSlot, fail_slot = failSlot,
finalized_epoch = finalizedEpoch, fail_epoch = failEpoch, finalized_epoch = finalizedEpoch, fail_epoch = failEpoch,
finalized_epoch = finalizedEpoch, direction = sq.kind, finalized_epoch = finalizedEpoch
topics = "syncman"
# Calculate the rewind epoch, which will be equal to last rewind point or # Calculate the rewind epoch, which will be equal to last rewind point or
# finalizedEpoch # finalizedEpoch
let rewindEpoch = let rewindEpoch =
@ -467,8 +481,7 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
# latest stored block. # latest stored block.
if failSlot == safeSlot: if failSlot == safeSlot:
warn "Unable to continue syncing, please restart the node", warn "Unable to continue syncing, please restart the node",
safe_slot = safeSlot, fail_slot = failSlot, direction = sq.kind, safe_slot = safeSlot, fail_slot = failSlot
topics = "syncman"
safeSlot safeSlot
iterator blocks*[T](sq: SyncQueue[T], iterator blocks*[T](sq: SyncQueue[T],
@ -505,6 +518,10 @@ proc notInRange[T](sq: SyncQueue[T], sr: SyncRequest[T]): bool =
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[ref ForkedSignedBeaconBlock], data: seq[ref ForkedSignedBeaconBlock],
processingCb: ProcessingCallback = nil) {.async.} = processingCb: ProcessingCallback = nil) {.async.} =
logScope:
sync_ident = sq.ident
topics = "syncman"
## Push successful result to queue ``sq``. ## Push successful result to queue ``sq``.
mixin updateScore mixin updateScore
@ -553,15 +570,11 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
else: else:
let rewindSlot = sq.getRewindPoint(sq.outSlot, sq.getSafeSlot()) let rewindSlot = sq.getRewindPoint(sq.outSlot, sq.getSafeSlot())
warn "Got incorrect sync result in queue, rewind happens", warn "Got incorrect sync result in queue, rewind happens",
request_slot = sq.readyQueue[0].request.slot,
request_count = sq.readyQueue[0].request.count,
request_step = sq.readyQueue[0].request.step,
blocks_map = getShortMap(sq.readyQueue[0].request, blocks_map = getShortMap(sq.readyQueue[0].request,
sq.readyQueue[0].data), sq.readyQueue[0].data),
blocks_count = len(sq.readyQueue[0].data), blocks_count = len(sq.readyQueue[0].data),
output_slot = sq.outSlot, input_slot = sq.inpSlot, output_slot = sq.outSlot, input_slot = sq.inpSlot,
peer = sq.readyQueue[0].request.item, rewind_to_slot = rewindSlot, rewind_to_slot = rewindSlot, request = sq.readyQueue[0].request
direction = sq.readyQueue[0].request.kind, topics = "syncman"
await sq.resetWait(some(rewindSlot)) await sq.resetWait(some(rewindSlot))
break break
@ -601,11 +614,9 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
hasInvalidBlock = true hasInvalidBlock = true
let req = item.request let req = item.request
warn "Received invalid sequence of blocks", peer = req.item, warn "Received invalid sequence of blocks", request = req,
request_slot = req.slot, request_count = req.count, blocks_count = len(item.data),
request_step = req.step, blocks_count = len(item.data), blocks_map = getShortMap(req, item.data)
blocks_map = getShortMap(req, item.data),
direction = req.kind, topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks) req.item.updateScore(PeerScoreBadBlocks)
break break
@ -623,29 +634,22 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
sq.wakeupWaiters() sq.wakeupWaiters()
else: else:
debug "Block pool rejected peer's response", peer = item.request.item, debug "Block pool rejected peer's response", request = item.request,
request_slot = item.request.slot,
request_count = item.request.count,
request_step = item.request.step,
blocks_map = getShortMap(item.request, item.data), blocks_map = getShortMap(item.request, item.data),
blocks_count = len(item.data), blocks_count = len(item.data),
ok = hasOkBlock, ok = hasOkBlock,
unviable = unviableBlock.isSome(), unviable = unviableBlock.isSome(),
missing_parent = missingParentSlot.isSome(), missing_parent = missingParentSlot.isSome()
direction = item.request.kind, topics = "syncman"
# We need to move failed response to the debts queue. # We need to move failed response to the debts queue.
sq.toDebtsQueue(item.request) sq.toDebtsQueue(item.request)
if unviableBlock.isSome: if unviableBlock.isSome:
let req = item.request let req = item.request
notice "Received blocks from an unviable fork", notice "Received blocks from an unviable fork", request = req,
blockRoot = unviableBlock.get()[0], blockRoot = unviableBlock.get()[0],
blockSlot = unviableBlock.get()[1], peer = req.item, blockSlot = unviableBlock.get()[1],
request_slot = req.slot, request_count = req.count, blocks_count = len(item.data),
request_step = req.step, blocks_count = len(item.data), blocks_map = getShortMap(req, item.data)
blocks_map = getShortMap(req, item.data),
direction = req.kind, topics = "syncman"
req.item.updateScore(PeerScoreUnviableFork) req.item.updateScore(PeerScoreUnviableFork)
if missingParentSlot.isSome: if missingParentSlot.isSome:
@ -664,60 +668,45 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
if safeSlot < req.slot: if safeSlot < req.slot:
let rewindSlot = sq.getRewindPoint(failSlot, safeSlot) let rewindSlot = sq.getRewindPoint(failSlot, safeSlot)
warn "Unexpected missing parent, rewind happens", warn "Unexpected missing parent, rewind happens",
peer = req.item, rewind_to_slot = rewindSlot, request = req, rewind_to_slot = rewindSlot,
rewind_epoch_count = sq.rewind.get().epochCount, rewind_point = sq.rewind, finalized_slot = safeSlot,
rewind_fail_slot = failSlot, blocks_count = len(item.data),
finalized_slot = safeSlot, blocks_map = getShortMap(req, item.data)
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data),
direction = req.kind, topics = "syncman"
resetSlot = some(rewindSlot) resetSlot = some(rewindSlot)
req.item.updateScore(PeerScoreMissingBlocks) req.item.updateScore(PeerScoreMissingBlocks)
else: else:
error "Unexpected missing parent at finalized epoch slot", error "Unexpected missing parent at finalized epoch slot",
peer = req.item, to_slot = safeSlot, request = req, rewind_to_slot = safeSlot,
request_slot = req.slot, request_count = req.count, blocks_count = len(item.data),
request_step = req.step, blocks_count = len(item.data), blocks_map = getShortMap(req, item.data)
blocks_map = getShortMap(req, item.data),
direction = req.kind, topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks) req.item.updateScore(PeerScoreBadBlocks)
of SyncQueueKind.Backward: of SyncQueueKind.Backward:
if safeSlot > req.slot: if safeSlot > req.slot:
let rewindSlot = sq.getRewindPoint(failSlot, safeSlot) let rewindSlot = sq.getRewindPoint(failSlot, safeSlot)
# It's quite common peers give us fewer blocks than we ask for # It's quite common peers give us fewer blocks than we ask for
info "Gap in block range response, rewinding", info "Gap in block range response, rewinding", request = req,
peer = req.item, rewind_to_slot = rewindSlot, rewind_to_slot = rewindSlot, rewind_fail_slot = failSlot,
rewind_fail_slot = failSlot, finalized_slot = safeSlot, blocks_count = len(item.data),
finalized_slot = safeSlot, blocks_map = getShortMap(req, item.data)
request_slot = req.slot, request_count = req.count,
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data),
direction = req.kind, topics = "syncman"
resetSlot = some(rewindSlot) resetSlot = some(rewindSlot)
req.item.updateScore(PeerScoreMissingBlocks) req.item.updateScore(PeerScoreMissingBlocks)
else: else:
error "Unexpected missing parent at safe slot", error "Unexpected missing parent at safe slot", request = req,
peer = req.item, to_slot = safeSlot, to_slot = safeSlot, blocks_count = len(item.data),
request_slot = req.slot, request_count = req.count, blocks_map = getShortMap(req, item.data)
request_step = req.step, blocks_count = len(item.data),
blocks_map = getShortMap(req, item.data),
direction = req.kind, topics = "syncman"
req.item.updateScore(PeerScoreBadBlocks) req.item.updateScore(PeerScoreBadBlocks)
if resetSlot.isSome(): if resetSlot.isSome():
await sq.resetWait(resetSlot) await sq.resetWait(resetSlot)
case sq.kind case sq.kind
of SyncQueueKind.Forward: of SyncQueueKind.Forward:
debug "Rewind to slot was happened", reset_slot = reset_slot.get(), debug "Rewind to slot has happened", reset_slot = resetSlot.get(),
queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot, queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
rewind_epoch_count = sq.rewind.get().epochCount, rewind_point = sq.rewind, direction = sq.kind
rewind_fail_slot = sq.rewind.get().failSlot,
reset_slot = resetSlot, direction = sq.kind, topics = "syncman"
of SyncQueueKind.Backward: of SyncQueueKind.Backward:
debug "Rewind to slot was happened", reset_slot = reset_slot.get(), debug "Rewind to slot has happened", reset_slot = resetSlot.get(),
queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot, queue_input_slot = sq.inpSlot, queue_output_slot = sq.outSlot,
reset_slot = resetSlot, direction = sq.kind, topics = "syncman" direction = sq.kind
break break

View File

@ -14,6 +14,9 @@ type
proc `$`(peer: SomeTPeer): string = proc `$`(peer: SomeTPeer): string =
"SomeTPeer" "SomeTPeer"
template shortLog(peer: SomeTPeer): string =
$peer
proc updateScore(peer: SomeTPeer, score: int) = proc updateScore(peer: SomeTPeer, score: int) =
discard discard