From f235cc66215b34e0839c3981113254e2c6def360 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 14 Nov 2022 18:01:05 -0600 Subject: [PATCH] don't spawn queue requests (#313) * don't spawn queue requests * adjust list blocks idle timer * increase timeout on failing test... --- codex/blockexchange/engine/discovery.nim | 46 +++++++++---------- .../discovery/testdiscoveryengine.nim | 2 +- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 882a29e0..c05c0034 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -13,6 +13,7 @@ import pkg/chronos import pkg/chronicles import pkg/libp2p import pkg/metrics +import pkg/questionable/results import ../protobuf/presence @@ -52,8 +53,8 @@ type advertiseQueue*: AsyncQueue[Cid] # Advertise queue advertiseTasks*: seq[Future[void]] # Advertise tasks discoveryLoop*: Future[void] # Discovery loop task handle - discoveryTasks*: seq[Future[void]] # Discovery tasks discoveryQueue*: AsyncQueue[Cid] # Discovery queue + discoveryTasks*: seq[Future[void]] # Discovery tasks minPeersPerBlock*: int # Max number of peers with block discoveryLoopSleep: Duration # Discovery loop sleep advertiseLoopSleep: Duration # Advertise loop sleep @@ -78,7 +79,9 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} = proc onBlock(cid: Cid) {.async.} = try: + trace "Listed block", cid await b.advertiseQueue.put(cid) + await sleepAsync(50.millis) # TODO: temp workaround because we're announcing all CIDs except CancelledError as exc: trace "Cancelling block listing" raise exc @@ -107,11 +110,14 @@ proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = continue try: - let request = b.discovery.provide(cid) + let + request = b.discovery.provide(cid) + b.inFlightAdvReqs[cid] = request codex_inflight_discovery.set(b.inFlightAdvReqs.len.int64) trace "Advertising block", cid, inflight = b.inFlightAdvReqs.len await request + finally: b.inFlightAdvReqs.del(cid) codex_inflight_discovery.set(b.inFlightAdvReqs.len.int64) @@ -169,34 +175,28 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = trace "Exiting discovery task runner" proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = - proc queueReq() {.async.} = - try: - for cid in cids: - if cid notin b.discoveryQueue: - trace "Queueing find block request", cid - await b.discoveryQueue.put(cid) - except CatchableError as exc: - trace "Exception queueing discovery request", exc = exc.msg - - asyncSpawn queueReq() + for cid in cids: + if cid notin b.discoveryQueue: + try: + trace "Queueing find block", cid, queue = b.discoveryQueue.len + b.discoveryQueue.putNoWait(cid) + except CatchableError as exc: + trace "Exception queueing discovery request", exc = exc.msg proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = - proc queueReq() {.async.} = - try: - for cid in cids: - if cid notin b.advertiseQueue: - trace "Queueing provide block request", cid - await b.advertiseQueue.put(cid) - except CatchableError as exc: - trace "Exception queueing discovery request", exc = exc.msg - - asyncSpawn queueReq() + for cid in cids: + if cid notin b.advertiseQueue: + try: + trace "Queueing provide block", cid, queue = b.discoveryQueue.len + b.advertiseQueue.putNoWait(cid) + except CatchableError as exc: + trace "Exception queueing discovery request", exc = exc.msg proc start*(b: DiscoveryEngine) {.async.} = ## Start the discengine task ## - trace "discovery engine start" + trace "Discovery engine start" if b.discEngineRunning: warn "Starting discovery engine twice" diff --git a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim index f8fd316f..3c5e768e 100644 --- a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim +++ b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim @@ -86,7 +86,7 @@ suite "Test Discovery Engine": await discoveryEngine.start() await allFuturesThrowing( - allFinished(toSeq(haves.values))).wait(1.seconds) + allFinished(toSeq(haves.values))).wait(5.seconds) await discoveryEngine.stop() test "Should queue discovery request":