mirror of
https://github.com/status-im/nimbus-eth1.git
synced 2025-01-27 04:26:07 +00:00
Fix race condition on syncer termination (#2916)
* Cosmetics * Must not async wait inside termination `for` loop why: Async-waiting inside a `for` loop will switch to the termination process, which will uncontrollably modify the data base the for-loop iterates over. * Avoid `waitFor` in scheduler termination code why: It is reserved for the main loop
This commit is contained in:
parent
72d08030d9
commit
9a9d391217
@ -64,7 +64,7 @@ proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} =
|
||||
if nimbus.peerManager.isNil.not:
|
||||
await nimbus.peerManager.stop()
|
||||
if nimbus.beaconSyncRef.isNil.not:
|
||||
nimbus.beaconSyncRef.stop()
|
||||
await nimbus.beaconSyncRef.stop()
|
||||
if nimbus.metricsServer.isNil.not:
|
||||
await nimbus.metricsServer.stop()
|
||||
|
||||
|
@ -68,8 +68,8 @@ proc init*(
|
||||
proc start*(desc: BeaconSyncRef): bool =
|
||||
desc.startSync()
|
||||
|
||||
proc stop*(desc: BeaconSyncRef) =
|
||||
desc.stopSync()
|
||||
proc stop*(desc: BeaconSyncRef) {.async.} =
|
||||
await desc.stopSync()
|
||||
|
||||
# ------------------------------------------------------------------------------
|
||||
# End
|
||||
|
@ -287,12 +287,12 @@ proc blocksStagedImport*(
|
||||
if nBn <= ctx.chain.baseNumber:
|
||||
trace info & ": ignoring block <= base", n, iv,
|
||||
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
|
||||
nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n)
|
||||
nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n).short
|
||||
continue
|
||||
ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr:
|
||||
warn info & ": import block error", n, iv,
|
||||
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
|
||||
nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n), `error`=error
|
||||
nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n).short, `error`=error
|
||||
# Restore what is left over below
|
||||
maxImport = ctx.chain.latestNumber()
|
||||
break importLoop
|
||||
@ -319,7 +319,7 @@ proc blocksStagedImport*(
|
||||
ctx.pool.chain.forkChoice(nthHash, finHash).isOkOr:
|
||||
warn info & ": fork choice error", n, iv,
|
||||
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
|
||||
F=ctx.layout.final.bnStr, nthBn=nBn.bnStr, nthHash,
|
||||
F=ctx.layout.final.bnStr, nthBn=nBn.bnStr, nthHash=nthHash.short,
|
||||
finHash=(if finHash == nthHash: "nthHash" else: "F"), `error`=error
|
||||
# Restore what is left over below
|
||||
maxImport = ctx.chain.latestNumber()
|
||||
|
@ -160,7 +160,7 @@ proc key(peer: Peer): ENode =
|
||||
# Private functions
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc terminate[S,W](dsc: RunnerSyncRef[S,W]) =
|
||||
proc terminate[S,W](dsc: RunnerSyncRef[S,W]) {.async.} =
|
||||
## Request termination and wait for sub-tasks to finish
|
||||
mixin runRelease
|
||||
|
||||
@ -174,19 +174,18 @@ proc terminate[S,W](dsc: RunnerSyncRef[S,W]) =
|
||||
for w in dsc.buddies.nextPairs:
|
||||
if w.data.isRunning:
|
||||
w.data.worker.ctrl.stopped = true
|
||||
# Activate async job so it can finish
|
||||
try:
|
||||
waitFor sleepAsync termWaitPollingTime
|
||||
except CancelledError:
|
||||
trace "Shutdown: peer timeout was cancelled",
|
||||
peer=w.data.worker.peer, nWorkers=dsc.buddies.len
|
||||
else:
|
||||
dsc.buddies.del w.key # this is OK to delete
|
||||
# Activate async jobs so they can finish
|
||||
try:
|
||||
waitFor sleepAsync termWaitPollingTime
|
||||
except CancelledError:
|
||||
trace "Shutdown: peer timeout was cancelled", nWorkers=dsc.buddies.len
|
||||
|
||||
while dsc.daemonRunning:
|
||||
# Activate async job so it can finish
|
||||
try:
|
||||
waitFor sleepAsync termWaitPollingTime
|
||||
await sleepAsync termWaitPollingTime
|
||||
except CancelledError:
|
||||
trace "Shutdown: daemon timeout was cancelled", nWorkers=dsc.buddies.len
|
||||
|
||||
@ -482,9 +481,9 @@ proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool =
|
||||
return true
|
||||
|
||||
|
||||
proc stopSync*[S,W](dsc: RunnerSyncRef[S,W]) =
|
||||
proc stopSync*[S,W](dsc: RunnerSyncRef[S,W]) {.async.} =
|
||||
## Stop syncing and free peer handlers .
|
||||
dsc.terminate()
|
||||
await dsc.terminate()
|
||||
|
||||
|
||||
proc isRunning*[S,W](dsc: RunnerSyncRef[S,W]): bool =
|
||||
|
Loading…
x
Reference in New Issue
Block a user