From 9a9d39121753af6935e1e27bd86ba8957234cef5 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Fri, 6 Dec 2024 12:11:40 +0000 Subject: [PATCH] Fix race condition on syncer termination (#2916) * Cosmetics * Must not async wait inside termination `for` loop why: Async-waiting inside a `for` loop will switch to the termination process, which will uncontrollably modify the for-loop database. * Avoid `waitFor` in scheduler termination code why: Is reserved for main loop --- nimbus/nimbus_desc.nim | 2 +- nimbus/sync/beacon.nim | 4 ++-- nimbus/sync/beacon/worker/blocks_staged.nim | 6 +++--- nimbus/sync/sync_sched.nim | 19 +++++++++---------- 4 files changed, 15 insertions(+), 16 deletions(-) diff --git a/nimbus/nimbus_desc.nim b/nimbus/nimbus_desc.nim index cb2f51264..a7116bd23 100644 --- a/nimbus/nimbus_desc.nim +++ b/nimbus/nimbus_desc.nim @@ -64,7 +64,7 @@ proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} = if nimbus.peerManager.isNil.not: await nimbus.peerManager.stop() if nimbus.beaconSyncRef.isNil.not: - nimbus.beaconSyncRef.stop() + await nimbus.beaconSyncRef.stop() if nimbus.metricsServer.isNil.not: await nimbus.metricsServer.stop() diff --git a/nimbus/sync/beacon.nim b/nimbus/sync/beacon.nim index 463ca070c..b912edf0b 100644 --- a/nimbus/sync/beacon.nim +++ b/nimbus/sync/beacon.nim @@ -68,8 +68,8 @@ proc init*( proc start*(desc: BeaconSyncRef): bool = desc.startSync() -proc stop*(desc: BeaconSyncRef) = - desc.stopSync() +proc stop*(desc: BeaconSyncRef) {.async.} = + await desc.stopSync() # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/beacon/worker/blocks_staged.nim b/nimbus/sync/beacon/worker/blocks_staged.nim index b898d8a37..bdcb9e24d 100644 --- a/nimbus/sync/beacon/worker/blocks_staged.nim +++ b/nimbus/sync/beacon/worker/blocks_staged.nim @@ -287,12 +287,12 @@ proc blocksStagedImport*( if nBn <= ctx.chain.baseNumber: trace info & ": ignoring block <= base", n, iv, 
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, - nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n) + nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n).short continue ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr: warn info & ": import block error", n, iv, B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, - nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n), `error`=error + nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n).short, `error`=error # Restore what is left over below maxImport = ctx.chain.latestNumber() break importLoop @@ -319,7 +319,7 @@ proc blocksStagedImport*( ctx.pool.chain.forkChoice(nthHash, finHash).isOkOr: warn info & ": fork choice error", n, iv, B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, - F=ctx.layout.final.bnStr, nthBn=nBn.bnStr, nthHash, + F=ctx.layout.final.bnStr, nthBn=nBn.bnStr, nthHash=nthHash.short, finHash=(if finHash == nthHash: "nthHash" else: "F"), `error`=error # Restore what is left over below maxImport = ctx.chain.latestNumber() diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim index c6eb0cc5d..9ee2b7f4a 100644 --- a/nimbus/sync/sync_sched.nim +++ b/nimbus/sync/sync_sched.nim @@ -160,7 +160,7 @@ proc key(peer: Peer): ENode = # Private functions # ------------------------------------------------------------------------------ -proc terminate[S,W](dsc: RunnerSyncRef[S,W]) = +proc terminate[S,W](dsc: RunnerSyncRef[S,W]) {.async.} = ## Request termination and wait for sub-tasks to finish mixin runRelease @@ -174,19 +174,18 @@ proc terminate[S,W](dsc: RunnerSyncRef[S,W]) = for w in dsc.buddies.nextPairs: if w.data.isRunning: w.data.worker.ctrl.stopped = true - # Activate async job so it can finish - try: - waitFor sleepAsync termWaitPollingTime - except CancelledError: - trace "Shutdown: peer timeout was cancelled", - peer=w.data.worker.peer, nWorkers=dsc.buddies.len else: dsc.buddies.del w.key # this is OK to delete + # Activate async jobs so they can finish + 
try: + await sleepAsync termWaitPollingTime + except CancelledError: + trace "Shutdown: peer timeout was cancelled", nWorkers=dsc.buddies.len while dsc.daemonRunning: # Activate async job so it can finish try: - waitFor sleepAsync termWaitPollingTime + await sleepAsync termWaitPollingTime except CancelledError: trace "Shutdown: daemon timeout was cancelled", nWorkers=dsc.buddies.len @@ -482,9 +481,9 @@ proc startSync*[S,W](dsc: RunnerSyncRef[S,W]): bool = return true -proc stopSync*[S,W](dsc: RunnerSyncRef[S,W]) = +proc stopSync*[S,W](dsc: RunnerSyncRef[S,W]) {.async.} = ## Stop syncing and free peer handlers . - dsc.terminate() + await dsc.terminate() proc isRunning*[S,W](dsc: RunnerSyncRef[S,W]): bool =