mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-03 22:13:12 +00:00
* cleanup imports and logs * add BlockHandle type * revert deps * refactor: async error handling and future tracking improvements - Update async procedures to use explicit raises annotation - Modify TrackedFutures to handle futures with no raised exceptions - Replace `asyncSpawn` with explicit future tracking - Update test suites to use `unittest2` - Standardize error handling across network and async components - Remove deprecated error handling patterns This commit introduces a more robust approach to async error handling and future management, improving type safety and reducing potential runtime errors. * bump nim-serde * remove asyncSpawn * rework background downloads and prefetch * imporove logging * refactor: enhance async procedures with error handling and raise annotations * misc cleanup * misc * refactor: implement allFinishedFailed to aggregate future results with success and failure tracking * refactor: update error handling in reader procedures to raise ChunkerError and CancelledError * refactor: improve error handling in wantListHandler and accountHandler procedures * refactor: simplify LPStreamReadError creation by consolidating parameters * refactor: enhance error handling in AsyncStreamWrapper to catch unexpected errors * refactor: enhance error handling in advertiser and discovery loops to improve resilience * misc * refactor: improve code structure and readability * remove cancellation from addSlotToQueue * refactor: add assertion for unexpected errors in local store checks * refactor: prevent tracking of finished futures and improve test assertions * refactor: improve error handling in local store checks * remove usage of msgDetail * feat: add initial implementation of discovery engine and related components * refactor: improve task scheduling logic by removing unnecessary break statement * break after scheduling a task * make taskHandler cancelable * refactor: update async handlers to raise CancelledError * refactor(advertiser): streamline error 
handling and improve task flow in advertise loops * fix: correct spelling of "divisible" in error messages and comments * refactor(discovery): simplify discovery task loop and improve error handling * refactor(engine): filter peers before processing in cancelBlocks procedure
175 lines
5.2 KiB
Nim
175 lines
5.2 KiB
Nim
## Nim-Codex
|
|
## Copyright (c) 2022 Status Research & Development GmbH
|
|
## Licensed under either of
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
## at your option.
|
|
## This file may not be copied, modified, or distributed except according to
|
|
## those terms.
|
|
|
|
import std/sequtils
|
|
|
|
import pkg/chronos
|
|
import pkg/libp2p/cid
|
|
import pkg/libp2p/multicodec
|
|
import pkg/metrics
|
|
import pkg/questionable
|
|
import pkg/questionable/results
|
|
|
|
import ./pendingblocks
|
|
|
|
import ../protobuf/presence
|
|
import ../network
|
|
import ../peers
|
|
|
|
import ../../utils
|
|
import ../../utils/trackedfutures
|
|
import ../../discovery
|
|
import ../../stores/blockstore
|
|
import ../../logutils
|
|
import ../../manifest
|
|
|
|
# Logging scope for this module: sets the `topics` value below.
logScope:
  topics = "codex discoveryengine"

# Gauge set to the size of the in-flight discovery request table.
declareGauge(codex_inflight_discovery, "inflight discovery requests")

const
  # Default for the `concurrentDiscReqs` argument of `DiscoveryEngine.new`.
  DefaultConcurrentDiscRequests = 10
  # Timeout applied to a single discovery lookup.
  DefaultDiscoveryTimeout = minutes(1)
  # Default for the `minPeersPerBlock` argument of `DiscoveryEngine.new`.
  DefaultMinPeersPerBlock = 3
  # Default for the `discoveryLoopSleep` argument of `DiscoveryEngine.new`.
  DefaultDiscoveryLoopSleep = seconds(3)
|
|
|
|
type DiscoveryEngine* = ref object of RootObj
  ## State shared by the discovery queue loop and the discovery task loops
  ## defined in this module.
  localStore*: BlockStore # Local block store for this instance
  peers*: PeerCtxStore # Peer context store
  network*: BlockExcNetwork # Network interface
  discovery*: Discovery # Discovery interface
  pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
  discEngineRunning*: bool # Indicates if discovery is running
  concurrentDiscReqs: int # Concurrent discovery requests (number of task loops)
  discoveryLoop*: Future[void].Raising([]) # Discovery loop task handle
  discoveryQueue*: AsyncQueue[Cid] # Discovery queue
  trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
  # Min number of peers with block: discovery is only requested for a CID
  # when fewer known peers than this have it.
  minPeersPerBlock*: int
  discoveryLoopSleep: Duration # Discovery loop sleep
  # Inflight discovery requests, keyed by the CID being looked up
  inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]]
|
|
|
|
proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
  ## While the engine is running, repeatedly push every CID from the pending
  ## blocks want list onto the discovery queue, sleeping for
  ## `discoveryLoopSleep` between passes. Cancellation ends the loop.
  ##

  try:
    while b.discEngineRunning:
      # Snapshot the want list before awaiting on the queue.
      let wanted = toSeq(b.pendingBlocks.wantListBlockCids)
      for wantedCid in wanted:
        await b.discoveryQueue.put(wantedCid)

      await sleepAsync(b.discoveryLoopSleep)
  except CancelledError:
    trace "Discovery loop cancelled"
|
|
|
|
proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
  ## Run discovery tasks: take CIDs off the discovery queue and, when fewer
  ## than `minPeersPerBlock` known peers have the block, look up providers,
  ## dial them and remove the providers whose dial failed.
  ##

  try:
    while b.discEngineRunning:
      let cid = await b.discoveryQueue.get()

      if cid in b.inFlightDiscReqs:
        trace "Discovery request already in progress", cid
        continue

      # Enough known peers already have this block; skip the lookup.
      if b.peers.peersHave(cid).len >= b.minPeersPerBlock:
        continue

      let request = b.discovery.find(cid)
      b.inFlightDiscReqs[cid] = request
      codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)

      try:
        if (await request.withTimeout(DefaultDiscoveryTimeout)) and
            peers =? (await request).catch:
          let dialResults =
            await allFinished(peers.mapIt(b.network.dialPeer(it.data)))

          for idx, dialFut in dialResults:
            if dialFut.failed:
              await b.discovery.removeProvider(peers[idx].data.peerId)
      finally:
        # Always drop the in-flight entry and refresh the gauge, whether the
        # lookup finished, timed out or was cancelled.
        b.inFlightDiscReqs.del(cid)
        codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)
  except CancelledError:
    trace "Discovery task cancelled"
    return

  info "Exiting discovery task runner"
|
|
|
|
proc queueFindBlocksReq*(b: DiscoveryEngine, cids: seq[Cid]) =
  ## Enqueue each CID in `cids` for discovery without waiting, skipping CIDs
  ## that are already in the discovery queue. A failure to enqueue is logged
  ## as a warning and does not stop processing of the remaining CIDs.
  ##

  for cid in cids:
    if cid in b.discoveryQueue:
      continue

    try:
      b.discoveryQueue.putNoWait(cid)
    except CatchableError as err:
      warn "Exception queueing discovery request", exc = err.msg
|
|
|
|
proc start*(b: DiscoveryEngine) {.async: (raises: []).} =
  ## Start the discovery engine: spawn `concurrentDiscReqs` discovery task
  ## loops plus the queue loop, and register all of them with
  ## `trackedFutures`. Calling it on a running engine only logs a warning.
  ##

  trace "Discovery engine starting"

  if b.discEngineRunning:
    warn "Starting discovery engine twice"
    return

  b.discEngineRunning = true

  for _ in 0 ..< b.concurrentDiscReqs:
    b.trackedFutures.track(b.discoveryTaskLoop())

  b.discoveryLoop = b.discoveryQueueLoop()
  b.trackedFutures.track(b.discoveryLoop)

  trace "Discovery engine started"
|
|
|
|
proc stop*(b: DiscoveryEngine) {.async: (raises: []).} =
  ## Stop the discovery engine: clear the running flag and cancel every
  ## future registered with `trackedFutures`. Calling it on an engine that
  ## is not running only logs a warning.
  ##

  trace "Discovery engine stop"

  if b.discEngineRunning:
    b.discEngineRunning = false
    trace "Stopping discovery loop and tasks"
    await b.trackedFutures.cancelTracked()
    trace "Discovery loop and tasks stopped"

    trace "Discovery engine stopped"
  else:
    warn "Stopping discovery engine without starting it"
|
|
|
|
proc new*(
    T: type DiscoveryEngine,
    localStore: BlockStore,
    peers: PeerCtxStore,
    network: BlockExcNetwork,
    discovery: Discovery,
    pendingBlocks: PendingBlocksManager,
    concurrentDiscReqs = DefaultConcurrentDiscRequests,
    discoveryLoopSleep = DefaultDiscoveryLoopSleep,
    minPeersPerBlock = DefaultMinPeersPerBlock,
): DiscoveryEngine =
  ## Create a discovery engine instance. The engine is returned in the
  ## stopped state, with an empty in-flight request table and a discovery
  ## queue created with `concurrentDiscReqs` as its size argument.
  ##

  # Freshly constructed members, built in the same order as before.
  let
    queue = newAsyncQueue[Cid](concurrentDiscReqs)
    tracker = TrackedFutures.new()
    inFlight = initTable[Cid, Future[seq[SignedPeerRecord]]]()

  DiscoveryEngine(
    localStore: localStore,
    peers: peers,
    network: network,
    discovery: discovery,
    pendingBlocks: pendingBlocks,
    concurrentDiscReqs: concurrentDiscReqs,
    discoveryQueue: queue,
    trackedFutures: tracker,
    inFlightDiscReqs: inFlight,
    discoveryLoopSleep: discoveryLoopSleep,
    minPeersPerBlock: minPeersPerBlock,
  )
|