Fix tight loop at the end of sync process. (#1731)
This commit is contained in:
parent
2bc26940d4
commit
08795b3f5d
|
@ -32,6 +32,9 @@ const
|
||||||
SyncWorkersCount* = 20
|
SyncWorkersCount* = 20
|
||||||
## Number of sync workers to spawn
|
## Number of sync workers to spawn
|
||||||
|
|
||||||
|
StatusUpdateInterval* = chronos.minutes(1)
|
||||||
|
## Minimum time between two subsequent calls to update peer's status
|
||||||
|
|
||||||
type
|
type
|
||||||
SyncFailureKind* = enum
|
SyncFailureKind* = enum
|
||||||
StatusInvalid,
|
StatusInvalid,
|
||||||
|
@ -665,6 +668,9 @@ template peerAge(): uint64 =
|
||||||
template queueAge(): uint64 =
|
template queueAge(): uint64 =
|
||||||
wallSlot - man.queue.outSlot
|
wallSlot - man.queue.outSlot
|
||||||
|
|
||||||
|
template peerStatusAge(): Duration =
|
||||||
|
Moment.now() - peer.state(BeaconSync).statusLastTime
|
||||||
|
|
||||||
func syncQueueLen*[A, B](man: SyncManager[A, B]): uint64 =
|
func syncQueueLen*[A, B](man: SyncManager[A, B]): uint64 =
|
||||||
man.queue.len
|
man.queue.len
|
||||||
|
|
||||||
|
@ -735,11 +741,60 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
|
||||||
peer.updateScore(PeerScoreGoodStatus)
|
peer.updateScore(PeerScoreGoodStatus)
|
||||||
peerSlot = newPeerSlot
|
peerSlot = newPeerSlot
|
||||||
|
|
||||||
if (peerAge <= man.maxHeadAge) and (headAge <= man.maxHeadAge):
|
if headAge <= man.maxHeadAge:
|
||||||
debug "We are in sync with peer, exiting", wall_clock_slot = wallSlot,
|
debug "We are in sync with network, exiting", wall_clock_slot = wallSlot,
|
||||||
remote_head_slot = peerSlot, local_head_slot = headSlot,
|
remote_head_slot = peerSlot, local_head_slot = headSlot,
|
||||||
peer = peer, peer_score = peer.getScore(), index = index,
|
peer = peer, peer_score = peer.getScore(), index = index,
|
||||||
peer_speed = peer.netKbps(), topics = "syncman"
|
peer_speed = peer.netKbps(), topics = "syncman"
|
||||||
|
# We clear SyncManager's `notInSyncEvent` so all the workers will become
|
||||||
|
# sleeping soon.
|
||||||
|
man.notInSyncEvent.clear()
|
||||||
|
return
|
||||||
|
|
||||||
|
if peerAge <= man.maxHeadAge:
|
||||||
|
debug "We are in sync with peer, refreshing peer's status information",
|
||||||
|
wall_clock_slot = wallSlot, remote_head_slot = peerSlot,
|
||||||
|
local_head_slot = headSlot, peer = peer, peer_score = peer.getScore(),
|
||||||
|
index = index, peer_speed = peer.netKbps(), topics = "syncman"
|
||||||
|
|
||||||
|
if peerStatusAge <= StatusUpdateInterval:
|
||||||
|
await sleepAsync(StatusUpdateInterval - peerStatusAge)
|
||||||
|
|
||||||
|
man.workers[index].status = SyncWorkerStatus.UpdatingStatus
|
||||||
|
try:
|
||||||
|
let res = await peer.updateStatus()
|
||||||
|
if not(res):
|
||||||
|
peer.updateScore(PeerScoreNoStatus)
|
||||||
|
debug "Failed to get remote peer's status, exiting", peer = peer,
|
||||||
|
peer_score = peer.getScore(), peer_head_slot = peerSlot,
|
||||||
|
peer_speed = peer.netKbps(), index = index, topics = "syncman"
|
||||||
|
let failure = SyncFailure.init(SyncFailureKind.StatusDownload, peer)
|
||||||
|
man.failures.add(failure)
|
||||||
|
return
|
||||||
|
except CatchableError as exc:
|
||||||
|
debug "Unexpected exception while updating peer's status",
|
||||||
|
peer = peer, peer_score = peer.getScore(),
|
||||||
|
peer_head_slot = peerSlot, peer_speed = peer.netKbps(),
|
||||||
|
index = index, errMsg = exc.msg, topics = "syncman"
|
||||||
|
return
|
||||||
|
|
||||||
|
let newPeerSlot = peer.getHeadSlot()
|
||||||
|
if peerSlot >= newPeerSlot:
|
||||||
|
peer.updateScore(PeerScoreStaleStatus)
|
||||||
|
debug "Peer's status information is stale",
|
||||||
|
wall_clock_slot = wallSlot, remote_old_head_slot = peerSlot,
|
||||||
|
local_head_slot = headSlot, remote_new_head_slot = newPeerSlot,
|
||||||
|
peer = peer, peer_score = peer.getScore(), index = index,
|
||||||
|
peer_speed = peer.netKbps(), topics = "syncman"
|
||||||
|
else:
|
||||||
|
debug "Peer's status information updated", wall_clock_slot = wallSlot,
|
||||||
|
remote_old_head_slot = peerSlot, local_head_slot = headSlot,
|
||||||
|
remote_new_head_slot = newPeerSlot, peer = peer,
|
||||||
|
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
|
||||||
|
index = index, topics = "syncman"
|
||||||
|
peer.updateScore(PeerScoreGoodStatus)
|
||||||
|
peerSlot = newPeerSlot
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
man.workers[index].status = SyncWorkerStatus.Requesting
|
man.workers[index].status = SyncWorkerStatus.Requesting
|
||||||
|
@ -829,9 +884,7 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
|
||||||
man.workers[index].status = SyncWorkerStatus.Sleeping
|
man.workers[index].status = SyncWorkerStatus.Sleeping
|
||||||
# This event is going to be set until we are not in sync with network
|
# This event is going to be set until we are not in sync with network
|
||||||
await man.notInSyncEvent.wait()
|
await man.notInSyncEvent.wait()
|
||||||
|
|
||||||
man.workers[index].status = SyncWorkerStatus.WaitingPeer
|
man.workers[index].status = SyncWorkerStatus.WaitingPeer
|
||||||
|
|
||||||
let peer = await man.pool.acquire()
|
let peer = await man.pool.acquire()
|
||||||
await man.syncStep(index, peer)
|
await man.syncStep(index, peer)
|
||||||
man.pool.release(peer)
|
man.pool.release(peer)
|
||||||
|
|
|
@ -37,6 +37,7 @@ type
|
||||||
forkDigest*: ForkDigest
|
forkDigest*: ForkDigest
|
||||||
|
|
||||||
BeaconSyncPeerState* = ref object
|
BeaconSyncPeerState* = ref object
|
||||||
|
statusLastTime*: chronos.Moment
|
||||||
statusMsg*: StatusMsg
|
statusMsg*: StatusMsg
|
||||||
|
|
||||||
BlockRootSlot* = object
|
BlockRootSlot* = object
|
||||||
|
@ -190,6 +191,7 @@ p2pProtocol BeaconSync(version = 1,
|
||||||
proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) =
|
proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) =
|
||||||
debug "Peer status", peer, statusMsg
|
debug "Peer status", peer, statusMsg
|
||||||
peer.state(BeaconSync).statusMsg = statusMsg
|
peer.state(BeaconSync).statusMsg = statusMsg
|
||||||
|
peer.state(BeaconSync).statusLastTime = Moment.now()
|
||||||
|
|
||||||
proc updateStatus*(peer: Peer): Future[bool] {.async.} =
|
proc updateStatus*(peer: Peer): Future[bool] {.async.} =
|
||||||
## Request `status` of remote peer ``peer``.
|
## Request `status` of remote peer ``peer``.
|
||||||
|
@ -200,12 +202,14 @@ proc updateStatus*(peer: Peer): Future[bool] {.async.} =
|
||||||
let theirFut = awaitne peer.status(ourStatus,
|
let theirFut = awaitne peer.status(ourStatus,
|
||||||
timeout = chronos.seconds(60))
|
timeout = chronos.seconds(60))
|
||||||
if theirFut.failed():
|
if theirFut.failed():
|
||||||
result = false
|
return false
|
||||||
else:
|
else:
|
||||||
let theirStatus = theirFut.read()
|
let theirStatus = theirFut.read()
|
||||||
if theirStatus.isOk:
|
if theirStatus.isOk:
|
||||||
peer.setStatusMsg(theirStatus.get)
|
peer.setStatusMsg(theirStatus.get)
|
||||||
result = true
|
return true
|
||||||
|
else:
|
||||||
|
return false
|
||||||
|
|
||||||
proc getHeadSlot*(peer: Peer): Slot {.inline.} =
|
proc getHeadSlot*(peer: Peer): Slot {.inline.} =
|
||||||
## Returns head slot for specific peer ``peer``.
|
## Returns head slot for specific peer ``peer``.
|
||||||
|
|
Loading…
Reference in New Issue