Fix SyncManager backward syncing step algorithm.
This commit is contained in:
parent
52fc646fbd
commit
f35c55e590
|
@ -174,7 +174,243 @@ template headAge(): uint64 =
|
||||||
template peerStatusAge(): Duration =
|
template peerStatusAge(): Duration =
|
||||||
Moment.now() - peer.state(BeaconSync).statusLastTime
|
Moment.now() - peer.state(BeaconSync).statusLastTime
|
||||||
|
|
||||||
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
|
proc syncBackwardStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.
|
||||||
|
async.} =
|
||||||
|
let wallSlot = man.getLocalWallSlot()
|
||||||
|
let headSlot = man.getLocalHeadSlot()
|
||||||
|
var peerSlot = peer.getHeadSlot()
|
||||||
|
|
||||||
|
debug "Peer's syncing status", wall_clock_slot = wallSlot,
|
||||||
|
remote_head_slot = peerSlot, local_head_slot = headSlot,
|
||||||
|
peer_score = peer.getScore(), peer = peer, index = index,
|
||||||
|
peer_speed = peer.netKbps(), direction = man.direction,
|
||||||
|
topics = "syncman"
|
||||||
|
|
||||||
|
# Check if peer's head slot is bigger than our wall clock slot.
|
||||||
|
if peerSlot > wallSlot + man.toleranceValue:
|
||||||
|
peer.updateScore(PeerScoreHeadTooNew)
|
||||||
|
|
||||||
|
warn "Peer reports a head newer than our wall clock - clock out of sync?",
|
||||||
|
wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
|
||||||
|
local_head_slot = headSlot, peer = peer, index = index,
|
||||||
|
tolerance_value = man.toleranceValue, peer_speed = peer.netKbps(),
|
||||||
|
peer_score = peer.getScore(), direction = man.direction,
|
||||||
|
topics = "syncman"
|
||||||
|
return
|
||||||
|
|
||||||
|
# Check if we need to update peer's status information
|
||||||
|
if peerStatusAge >= StatusExpirationTime:
|
||||||
|
# Peer's status information is very old, its time to update it
|
||||||
|
man.workers[index].status = SyncWorkerStatus.UpdatingStatus
|
||||||
|
trace "Updating peer's status information", wall_clock_slot = wallSlot,
|
||||||
|
remote_head_slot = peerSlot, local_head_slot = headSlot,
|
||||||
|
peer = peer, peer_score = peer.getScore(), index = index,
|
||||||
|
peer_speed = peer.netKbps(), direction = man.direction,
|
||||||
|
topics = "syncman"
|
||||||
|
|
||||||
|
try:
|
||||||
|
let res = await peer.updateStatus()
|
||||||
|
if not(res):
|
||||||
|
peer.updateScore(PeerScoreNoStatus)
|
||||||
|
debug "Failed to get remote peer's status, exiting", peer = peer,
|
||||||
|
peer_score = peer.getScore(), peer_head_slot = peerSlot,
|
||||||
|
peer_speed = peer.netKbps(), index = index,
|
||||||
|
direction = man.direction, topics = "syncman"
|
||||||
|
return
|
||||||
|
except CatchableError as exc:
|
||||||
|
debug "Unexpected exception while updating peer's status",
|
||||||
|
peer = peer, peer_score = peer.getScore(),
|
||||||
|
peer_head_slot = peerSlot, peer_speed = peer.netKbps(),
|
||||||
|
index = index, errMsg = exc.msg, direction = man.direction,
|
||||||
|
topics = "syncman"
|
||||||
|
return
|
||||||
|
|
||||||
|
let newPeerSlot = peer.getHeadSlot()
|
||||||
|
if peerSlot >= newPeerSlot:
|
||||||
|
peer.updateScore(PeerScoreStaleStatus)
|
||||||
|
debug "Peer's status information is stale",
|
||||||
|
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
|
||||||
|
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
|
||||||
|
peer = peer, peer_score = peer.getScore(), index = index,
|
||||||
|
peer_speed = peer.netKbps(), direction = man.direction,
|
||||||
|
topics = "syncman"
|
||||||
|
else:
|
||||||
|
debug "Peer's status information updated", wall_clock_slot = wallSlot,
|
||||||
|
remote_old_head_slot = peerSlot, local_head_slot = headSlot,
|
||||||
|
remote_new_head_slot = newPeerSlot, peer = peer,
|
||||||
|
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
|
||||||
|
index = index, direction = man.direction, topics = "syncman"
|
||||||
|
peer.updateScore(PeerScoreGoodStatus)
|
||||||
|
peerSlot = newPeerSlot
|
||||||
|
|
||||||
|
if headAge <= man.maxHeadAge:
|
||||||
|
info "We are in sync with network", wall_clock_slot = wallSlot,
|
||||||
|
remote_head_slot = peerSlot, local_head_slot = headSlot,
|
||||||
|
peer = peer, peer_score = peer.getScore(), index = index,
|
||||||
|
peer_speed = peer.netKbps(), topics = "syncman"
|
||||||
|
# We clear SyncManager's `notInSyncEvent` so all the workers will become
|
||||||
|
# sleeping soon.
|
||||||
|
man.notInSyncEvent.clear()
|
||||||
|
return
|
||||||
|
|
||||||
|
let firstSlot = man.getFirstSlot()
|
||||||
|
|
||||||
|
if firstSlot == man.getLastSlot():
|
||||||
|
info "We are in sync with network", wall_clock_slot = wallSlot,
|
||||||
|
remote_head_slot = peerSlot, local_head_slot = headSlot,
|
||||||
|
peer = peer, peer_score = peer.getScore(), index = index,
|
||||||
|
first_slot = firstSlot, peer_speed = peer.netKbps(),
|
||||||
|
topics = "syncman"
|
||||||
|
# We clear SyncManager's `notInSyncEvent` so all the workers will become
|
||||||
|
# sleeping soon.
|
||||||
|
man.notInSyncEvent.clear()
|
||||||
|
return
|
||||||
|
|
||||||
|
if firstSlot > peerSlot:
|
||||||
|
debug "Peer's head slot is low; refreshing peer's status information",
|
||||||
|
wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
|
||||||
|
local_head_slot = headSlot, first_slot = firstSlot,
|
||||||
|
peer = peer, peer_score = peer.getScore(), index = index,
|
||||||
|
peer_speed = peer.netKbps(), direction = man.direction,
|
||||||
|
topics = "syncman"
|
||||||
|
|
||||||
|
man.workers[index].status = SyncWorkerStatus.UpdatingStatus
|
||||||
|
|
||||||
|
if peerStatusAge <= StatusUpdateInterval:
|
||||||
|
await sleepAsync(StatusUpdateInterval - peerStatusAge)
|
||||||
|
|
||||||
|
try:
|
||||||
|
let res = await peer.updateStatus()
|
||||||
|
if not(res):
|
||||||
|
peer.updateScore(PeerScoreNoStatus)
|
||||||
|
debug "Failed to get remote peer's status, exiting", peer = peer,
|
||||||
|
peer_score = peer.getScore(), peer_head_slot = peerSlot,
|
||||||
|
peer_speed = peer.netKbps(), index = index,
|
||||||
|
direction = man.direction, topics = "syncman"
|
||||||
|
return
|
||||||
|
except CatchableError as exc:
|
||||||
|
debug "Unexpected exception while updating peer's status",
|
||||||
|
peer = peer, peer_score = peer.getScore(),
|
||||||
|
peer_head_slot = peerSlot, peer_speed = peer.netKbps(),
|
||||||
|
index = index, errMsg = exc.msg, direction = man.direction,
|
||||||
|
topics = "syncman"
|
||||||
|
return
|
||||||
|
|
||||||
|
let newPeerSlot = peer.getHeadSlot()
|
||||||
|
if peerSlot >= newPeerSlot:
|
||||||
|
peer.updateScore(PeerScoreStaleStatus)
|
||||||
|
debug "Peer's status information is stale",
|
||||||
|
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
|
||||||
|
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
|
||||||
|
peer = peer, peer_score = peer.getScore(), index = index,
|
||||||
|
peer_speed = peer.netKbps(), direction = man.direction,
|
||||||
|
topics = "syncman"
|
||||||
|
else:
|
||||||
|
# This is not very good solution because we should not discriminate and/or
|
||||||
|
# penalize peers which are in sync process too, but their latest head is
|
||||||
|
# lower then our latest head. We should keep connections with such peers
|
||||||
|
# (so this peers are able to get in sync using our data), but we should
|
||||||
|
# not use this peers for syncing because this peers are useless for us.
|
||||||
|
# Right now we decreasing peer's score a bit, so it will not be
|
||||||
|
# disconnected due to low peer's score, but new fresh peers could replace
|
||||||
|
# peers with low latest head.
|
||||||
|
if firstSlot > newPeerSlot:
|
||||||
|
# Peer's head slot is still lower then needed.
|
||||||
|
debug "Peer's head slot is lower then local first slot",
|
||||||
|
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
|
||||||
|
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
|
||||||
|
first_slot = firstSlot, peer = peer,
|
||||||
|
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
|
||||||
|
index = index, direction = man.direction, topics = "syncman"
|
||||||
|
peer.updateScore(PeerScoreUseless)
|
||||||
|
else:
|
||||||
|
debug "Peer's status information updated", wall_clock_slot = wallSlot,
|
||||||
|
remote_old_head_slot = peerSlot, local_head_slot = headSlot,
|
||||||
|
remote_new_head_slot = newPeerSlot, peer = peer,
|
||||||
|
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
|
||||||
|
index = index, direction = man.direction, topics = "syncman"
|
||||||
|
peer.updateScore(PeerScoreGoodStatus)
|
||||||
|
peerSlot = newPeerSlot
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
man.workers[index].status = SyncWorkerStatus.Requesting
|
||||||
|
|
||||||
|
let req = man.queue.pop(peerSlot, peer)
|
||||||
|
if req.isEmpty():
|
||||||
|
# SyncQueue could return empty request in 2 cases:
|
||||||
|
# 1. There no more slots in SyncQueue to download (we are synced, but
|
||||||
|
# our ``notInSyncEvent`` is not yet cleared).
|
||||||
|
# 2. Current peer's known head slot is too low to satisfy request.
|
||||||
|
#
|
||||||
|
# To avoid endless loop we going to wait for RESP_TIMEOUT time here.
|
||||||
|
# This time is enough for all pending requests to finish and it is also
|
||||||
|
# enough for main sync loop to clear ``notInSyncEvent``.
|
||||||
|
debug "Empty request received from queue, exiting", peer = peer,
|
||||||
|
local_head_slot = headSlot, remote_head_slot = peerSlot,
|
||||||
|
queue_input_slot = man.queue.inpSlot,
|
||||||
|
queue_output_slot = man.queue.outSlot,
|
||||||
|
queue_last_slot = man.queue.finalSlot,
|
||||||
|
peer_speed = peer.netKbps(), peer_score = peer.getScore(),
|
||||||
|
index = index, direction = man.direction, topics = "syncman"
|
||||||
|
await sleepAsync(RESP_TIMEOUT)
|
||||||
|
return
|
||||||
|
|
||||||
|
debug "Creating new request for peer", wall_clock_slot = wallSlot,
|
||||||
|
remote_head_slot = peerSlot, local_head_slot = headSlot,
|
||||||
|
request_slot = req.slot, request_count = req.count,
|
||||||
|
request_step = req.step, peer = peer, peer_speed = peer.netKbps(),
|
||||||
|
peer_score = peer.getScore(), index = index,
|
||||||
|
direction = man.direction, topics = "syncman"
|
||||||
|
|
||||||
|
man.workers[index].status = SyncWorkerStatus.Downloading
|
||||||
|
|
||||||
|
try:
|
||||||
|
let blocks = await man.getBlocks(peer, req)
|
||||||
|
if blocks.isOk:
|
||||||
|
let data = blocks.get()
|
||||||
|
let smap = getShortMap(req, data)
|
||||||
|
debug "Received blocks on request", blocks_count = len(data),
|
||||||
|
blocks_map = smap, request_slot = req.slot,
|
||||||
|
request_count = req.count, request_step = req.step,
|
||||||
|
peer = peer, peer_score = peer.getScore(),
|
||||||
|
peer_speed = peer.netKbps(), index = index,
|
||||||
|
direction = man.direction, topics = "syncman"
|
||||||
|
|
||||||
|
if not(checkResponse(req, data)):
|
||||||
|
peer.updateScore(PeerScoreBadResponse)
|
||||||
|
warn "Received blocks sequence is not in requested range",
|
||||||
|
blocks_count = len(data), blocks_map = smap,
|
||||||
|
request_slot = req.slot, request_count = req.count,
|
||||||
|
request_step = req.step, peer = peer,
|
||||||
|
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
|
||||||
|
index = index, direction = man.direction, topics = "syncman"
|
||||||
|
return
|
||||||
|
|
||||||
|
# Scoring will happen in `syncUpdate`.
|
||||||
|
man.workers[index].status = SyncWorkerStatus.Queueing
|
||||||
|
await man.queue.push(req, data, proc() =
|
||||||
|
man.workers[index].status = SyncWorkerStatus.Processing)
|
||||||
|
else:
|
||||||
|
peer.updateScore(PeerScoreNoBlocks)
|
||||||
|
man.queue.push(req)
|
||||||
|
debug "Failed to receive blocks on request",
|
||||||
|
request_slot = req.slot, request_count = req.count,
|
||||||
|
request_step = req.step, peer = peer, index = index,
|
||||||
|
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
|
||||||
|
direction = man.direction, topics = "syncman"
|
||||||
|
return
|
||||||
|
|
||||||
|
except CatchableError as exc:
|
||||||
|
debug "Unexpected exception while receiving blocks",
|
||||||
|
request_slot = req.slot, request_count = req.count,
|
||||||
|
request_step = req.step, peer = peer, index = index,
|
||||||
|
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
|
||||||
|
errMsg = exc.msg, direction = man.direction, topics = "syncman"
|
||||||
|
return
|
||||||
|
|
||||||
|
proc syncForwardStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.
|
||||||
|
async.} =
|
||||||
let wallSlot = man.getLocalWallSlot()
|
let wallSlot = man.getLocalWallSlot()
|
||||||
let headSlot = man.getLocalHeadSlot()
|
let headSlot = man.getLocalHeadSlot()
|
||||||
var peerSlot = peer.getHeadSlot()
|
var peerSlot = peer.getHeadSlot()
|
||||||
|
@ -408,7 +644,11 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
|
||||||
await man.notInSyncEvent.wait()
|
await man.notInSyncEvent.wait()
|
||||||
man.workers[index].status = SyncWorkerStatus.WaitingPeer
|
man.workers[index].status = SyncWorkerStatus.WaitingPeer
|
||||||
let peer = await man.pool.acquire()
|
let peer = await man.pool.acquire()
|
||||||
await man.syncStep(index, peer)
|
case man.direction
|
||||||
|
of SyncQueueKind.Forward:
|
||||||
|
await man.syncForwardStep(index, peer)
|
||||||
|
of SyncQueueKind.Backward:
|
||||||
|
await man.syncBackwardStep(index, peer)
|
||||||
man.pool.release(peer)
|
man.pool.release(peer)
|
||||||
|
|
||||||
proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
|
proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
|
||||||
|
|
Loading…
Reference in New Issue