## Nim-Codex ## Copyright (c) 2022 Status Research & Development GmbH ## Licensed under either of ## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) ## * MIT license ([LICENSE-MIT](LICENSE-MIT)) ## at your option. ## This file may not be copied, modified, or distributed except according to ## those terms. import std/sequtils import pkg/chronos import pkg/chronicles import pkg/libp2p/cid import pkg/metrics import pkg/questionable import pkg/questionable/results import ../protobuf/presence import ../network import ../peers import ../../utils import ../../discovery import ../../stores/blockstore import ./pendingblocks logScope: topics = "codex discoveryengine" declareGauge(codexInflightDiscovery, "inflight discovery requests") const DefaultConcurrentDiscRequests = 10 DefaultConcurrentAdvertRequests = 10 DefaultDiscoveryTimeout = 1.minutes DefaultMinPeersPerBlock = 3 DefaultDiscoveryLoopSleep = 3.seconds DefaultAdvertiseLoopSleep = 30.minutes type DiscoveryEngine* = ref object of RootObj localStore*: BlockStore # Local block store for this instance peers*: PeerCtxStore # Peer context store network*: BlockExcNetwork # Network interface discovery*: Discovery # Discovery interface pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved discEngineRunning*: bool # Indicates if discovery is running concurrentAdvReqs: int # Concurrent advertise requests concurrentDiscReqs: int # Concurrent discovery requests advertiseLoop*: Future[void] # Advertise loop task handle advertiseQueue*: AsyncQueue[Cid] # Advertise queue advertiseTasks*: seq[Future[void]] # Advertise tasks discoveryLoop*: Future[void] # Discovery loop task handle discoveryQueue*: AsyncQueue[Cid] # Discovery queue discoveryTasks*: seq[Future[void]] # Discovery tasks minPeersPerBlock*: int # Max number of peers with block discoveryLoopSleep: Duration # Discovery loop sleep advertiseLoopSleep: Duration # Advertise loop sleep inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests advertiseType*: BlockType # Advertice blocks, manifests or both proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = while b.discEngineRunning: for cid in toSeq(b.pendingBlocks.wantList): try: await b.discoveryQueue.put(cid) except CatchableError as exc: trace "Exception in discovery loop", exc = exc.msg logScope: sleep = b.discoveryLoopSleep wanted = b.pendingBlocks.len trace "About to sleep discovery loop" await sleepAsync(b.discoveryLoopSleep) proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} = while b.discEngineRunning: if cids =? await b.localStore.listBlocks(blockType = b.advertiseType): trace "Begin iterating blocks..." for c in cids: if cid =? await c: await b.advertiseQueue.put(cid) await sleepAsync(50.millis) trace "Iterating blocks finished." trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep await sleepAsync(b.advertiseLoopSleep) trace "Exiting advertise task loop" proc advertiseTaskLoop(b: DiscoveryEngine) {.async.} = ## Run advertise tasks ## while b.discEngineRunning: try: let cid = await b.advertiseQueue.get() if cid in b.inFlightAdvReqs: trace "Advertise request already in progress", cid continue try: let request = b.discovery.provide(cid) b.inFlightAdvReqs[cid] = request codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) trace "Advertising block", cid, inflight = b.inFlightAdvReqs.len await request finally: b.inFlightAdvReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) trace "Advertised block", cid, inflight = b.inFlightAdvReqs.len except CatchableError as exc: trace "Exception in advertise task runner", exc = exc.msg trace "Exiting advertise task runner" proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = ## Run discovery tasks ## while b.discEngineRunning: try: let cid = await b.discoveryQueue.get() if cid in b.inFlightDiscReqs: trace "Discovery request already in progress", cid continue let haves = b.peers.peersHave(cid) trace "Current number of peers for block", cid, peers = haves.len if haves.len < b.minPeersPerBlock: trace "Discovering block", cid try: let request = b.discovery .find(cid) .wait(DefaultDiscoveryTimeout) b.inFlightDiscReqs[cid] = request codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) let peers = await request trace "Discovered peers for block", peers = peers.len, cid let dialed = await allFinished( peers.mapIt( b.network.dialPeer(it.data) )) for i, f in dialed: if f.failed: await b.discovery.removeProvider(peers[i].data.peerId) finally: b.inFlightDiscReqs.del(cid) codexInflightDiscovery.set(b.inFlightAdvReqs.len.int64) except CatchableError as exc: trace "Exception in discovery task runner", exc = exc.msg trace "Exiting discovery task runner" proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = for cid in cids: if cid notin b.discoveryQueue: try: trace "Queueing find block", cid, queue = b.discoveryQueue.len b.discoveryQueue.putNoWait(cid) except CatchableError as exc: trace "Exception queueing discovery request", exc = exc.msg proc queueProvideBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) {.inline.} = for cid in cids: if cid notin b.advertiseQueue: try: trace "Queueing provide block", cid, queue = b.discoveryQueue.len b.advertiseQueue.putNoWait(cid) except CatchableError as exc: trace "Exception queueing discovery request", exc = exc.msg proc start*(b: DiscoveryEngine) {.async.} = ## Start the discengine task ## trace "Discovery engine start" if b.discEngineRunning: warn "Starting discovery engine twice" return b.discEngineRunning = true for i in 0..