mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-05-12 14:29:39 +00:00
389 lines
13 KiB
Nim
389 lines
13 KiB
Nim
## Logos Storage
|
|
## Copyright (c) 2026 Status Research & Development GmbH
|
|
## Licensed under either of
|
|
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
## at your option.
|
|
## This file may not be copied, modified, or distributed except according to
|
|
## those terms.
|
|
|
|
{.push raises: [].}
|
|
|
|
import std/[tables, sets, monotimes, options]
|
|
|
|
import pkg/chronos
|
|
import pkg/libp2p
|
|
import pkg/metrics
|
|
import pkg/questionable
|
|
|
|
import ../../blocktype
|
|
import ../../logutils
|
|
import ../../stores/blockstore
|
|
|
|
import ./scheduler
|
|
import ./swarm
|
|
import ./downloadcontext
|
|
|
|
export scheduler, swarm, downloadcontext
|
|
|
|
# Log statements in this module are tagged with the topics below.
logScope:
  topics = "storage activedownload"

# Gauge set by `completeWantHandle`: time between creating a block request
# and completing it, in microseconds (monotonic nanosecond ticks div 1000).
declareGauge storage_block_exchange_retrieval_time_us,
  "storage blockexchange block retrieval time us"
|
|
|
|
type
  RetriesExhaustedError* = object of StorageError
    ## Error used to fail the handles of blocks whose retries ran out.

  DownloadTerminatedError* = object of StorageError
    ## Error used to fail block handles that are still pending once the
    ## download's completion future has finished.

  BlockHandle* = Future[?!Block].Raising([CancelledError])
    ## Future that resolves to the requested block or to a failure.

  BlockHandleOpaque* = Future[?!void].Raising([CancelledError])
    ## Future that resolves to success or failure without carrying a block.

  BlockReq* = object
    ## Bookkeeping for a single requested block.
    handle*: BlockHandle
      ## Completed with the block itself, or failed.
    opaqueHandle*: BlockHandleOpaque
      ## Completed without a payload, or failed.
    blockRetries*: int
      ## Retries left; initialised from `ActiveDownload.maxBlockRetries`.
    startTime*: int64
      ## `getMonoTime().ticks` taken when the request was created.

  PendingBatch* = object
    ## A batch that has been marked in flight.
    start*: uint64
      ## Batch start; also the key under which the batch is stored.
    count*: uint64
    localCount*: uint64
      ## Blocks already local when the batch was scheduled.
    peerId*: PeerId
    sentAt*: Moment
      ## `Moment.now()` at the time the batch was marked in flight.
    timeoutFuture*: Future[void]
      ## Timeout handler to cancel on completion.
    requestFuture*: Future[void]
      ## Request future to cancel on timeout.

  ActiveDownload* = ref object
    ## State of one running download.
    id*: uint64
      ## For request/response correlation - echoed in protocol.
    ctx*: DownloadContext
    blocks*: Table[BlockAddress, BlockReq]
      ## Per-download block requests.
    pendingBatches*: Table[uint64, PendingBatch]
      ## Batch start -> pending info.
    exhaustedBlocks*: HashSet[BlockAddress]
      ## Blocks that exhausted retries - failed permanently.
    maxBlockRetries*: int
      ## Initial retry budget given to every new `BlockReq`.
    retryInterval*: Duration
    cancelled*: bool
    isBackground*: bool
    fetchLocal*: bool
    completionFuture*: Future[?!void].Raising([CancelledError])
      ## Resolved once the download succeeds or fails; see
      ## `waitForComplete`.
|
|
|
|
proc waitForComplete*(
    download: ActiveDownload
): Future[?!void] {.async: (raises: [CancelledError]).} =
  ## Waits for the download's completion future and returns its result.
  let outcome = await download.completionFuture
  return outcome
|
|
|
|
proc signalCompletionIfDone(download: ActiveDownload, error: ref StorageError = nil) =
  ## Resolves `completionFuture` if it is still pending.
  ##
  ## * With a non-nil `error` the future is failed with it, and every block
  ##   request still in `download.blocks` has its unfinished handles failed
  ##   with a `DownloadTerminatedError`.
  ## * Without an error the future is completed successfully, but only when
  ##   the download context reports completion.
  ##
  ## Does nothing when the completion future has already finished.
  if download.completionFuture.finished:
    return

  if error != nil:
    download.completionFuture.complete(void.failure(error))
    let termErr = (ref DownloadTerminatedError)(msg: "Download terminated")
    for _, blockReq in download.blocks:
      # The two handles are guarded separately: `completeWantHandle` called
      # without a block finishes only the opaque handle, and completing an
      # already finished future a second time is a defect.
      if not blockReq.handle.finished:
        blockReq.handle.complete(Block.failure(termErr))
      if not blockReq.opaqueHandle.finished:
        blockReq.opaqueHandle.complete(Result[void, ref CatchableError].err(termErr))
  elif download.ctx.isComplete:
    download.completionFuture.complete(success())
|
|
|
|
proc treeCid*(download: ActiveDownload): Cid =
  ## Tree CID of the manifest held by the download context.
  return download.ctx.md.manifest.treeCid
|
|
|
|
proc manifestCid*(download: ActiveDownload): Cid =
  ## Manifest CID held by the download context.
  return download.ctx.md.manifestCid
|
|
|
|
proc makeBlockAddress*(download: ActiveDownload, index: uint64): BlockAddress =
  ## Builds the address of block `index` under this download's tree CID.
  let root = download.treeCid
  BlockAddress(treeCid: root, index: int(index))
|
|
|
|
proc getOrCreateBlockReq(download: ActiveDownload, address: BlockAddress): BlockReq =
  ## Returns the request registered for `address`, creating and registering
  ## a fresh one (with the full retry budget and the current monotonic time)
  ## when none exists.
  ##
  ## A newly created request removes itself from `download.blocks` when its
  ## block handle finishes or is cancelled.
  download.blocks.withValue(address, existing):
    return existing[]

  let req = BlockReq(
    handle: BlockHandle.init("ActiveDownload.getWantHandle"),
    opaqueHandle: BlockHandleOpaque.init("ActiveDownload.getWantHandleOpaque"),
    blockRetries: download.maxBlockRetries,
    startTime: getMonoTime().ticks,
  )
  download.blocks[address] = req

  let blockFut = req.handle

  proc dropEntry(data: pointer) {.raises: [].} =
    # Forget the request; `del` is a no-op if it is already gone.
    download.blocks.del(address)

  blockFut.addCallback(dropEntry)
  blockFut.cancelCallback = proc(data: pointer) {.raises: [].} =
    # On cancellation of a still-pending handle, unregister the completion
    # callback and drop the entry right away.
    if not blockFut.finished:
      blockFut.removeCallback(dropEntry)
      dropEntry(nil)

  return req
|
|
|
|
proc getWantHandle*(
    download: ActiveDownload, address: BlockAddress
): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
  ## Returns the block handle for `address`, registering a request if there
  ## is none yet.
  ##
  ## If the download's completion future has already finished, a handle that
  ## is still pending is failed immediately with `DownloadTerminatedError`
  ## so the caller does not wait on it.
  let blkReq = download.getOrCreateBlockReq(address)
  if download.completionFuture.finished and not blkReq.handle.finished:
    let err = (ref DownloadTerminatedError)(msg: "Download terminated")
    blkReq.handle.complete(Block.failure(err))
    # The opaque handle may already be finished on its own (see
    # `completeWantHandle` without a block); completing it twice is a defect.
    if not blkReq.opaqueHandle.finished:
      blkReq.opaqueHandle.complete(Result[void, ref CatchableError].err(err))
  blkReq.handle
|
|
|
|
proc getWantHandleOpaque*(
    download: ActiveDownload, address: BlockAddress
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
  ## Returns the payload-free handle for `address`, registering a request if
  ## there is none yet.
  # NOTE(review): unlike `getWantHandle`, this does not fail the handle when
  # the download has already finished - confirm that this is intended.
  let req = download.getOrCreateBlockReq(address)
  req.opaqueHandle
|
|
|
|
proc completeWantHandle*(
    download: ActiveDownload, address: BlockAddress, blk: Option[Block] = none(Block)
): bool {.raises: [].} =
  ## Completes the pending request for `address`.
  ##
  ## * With a block: completes the block handle with it and, if still
  ##   pending, the opaque handle. Returns `false` when the block handle was
  ##   already finished.
  ## * Without a block: completes only the opaque handle. Returns `false`
  ##   when it was already finished.
  ##
  ## Returns `false` as well when no request is registered for `address`.
  ## On every successful completion the retrieval-time gauge is updated.

  proc recordRetrievalTime(startTime: int64) =
    # Monotonic ticks are nanoseconds; the gauge is kept in microseconds.
    let
      stopTime = getMonoTime().ticks
      retrievalDurationUs = (stopTime - startTime) div 1000
    storage_block_exchange_retrieval_time_us.set(retrievalDurationUs)

  download.blocks.withValue(address, blockReq):
    if blk.isSome:
      if blockReq[].handle.finished:
        trace "Want handle already completed", address
        return false

      blockReq[].handle.complete(success(blk.get))
      # An earlier call without a block may have finished the opaque handle
      # already; completing a finished future again is a defect.
      if not blockReq[].opaqueHandle.finished:
        blockReq[].opaqueHandle.complete(success())
      recordRetrievalTime(blockReq[].startTime)
      return true
    else:
      if blockReq[].opaqueHandle.finished:
        return false

      blockReq[].opaqueHandle.complete(success())
      recordRetrievalTime(blockReq[].startTime)
      return true
  do:
    trace "No pending want handle found", address
    return false
|
|
|
|
proc failWantHandle(
    download: ActiveDownload, address: BlockAddress, error: ref StorageError
) {.raises: [].} =
  ## Fails the handles of the request registered for `address` with `error`.
  ## Does nothing when no request is registered.
  ##
  ## Each handle is guarded on its own: `completeWantHandle` called without a
  ## block finishes only the opaque handle, so the block handle can still be
  ## pending while the opaque one is done. Completing a finished future a
  ## second time is a defect.
  download.blocks.withValue(address, blockReq):
    if not blockReq[].handle.finished:
      blockReq[].handle.complete(Block.failure(error))
    if not blockReq[].opaqueHandle.finished:
      blockReq[].opaqueHandle.complete(Result[void, ref CatchableError].err(error))
|
|
|
|
func retries*(download: ActiveDownload, address: BlockAddress): int =
  ## Retries left for `address`; 0 when no request is registered for it.
  download.blocks.withValue(address, req):
    return req[].blockRetries
  return 0
|
|
|
|
func decRetries*(download: ActiveDownload, address: BlockAddress) =
  ## Takes one retry away from the request registered for `address`.
  ## Does nothing when there is no such request.
  download.blocks.withValue(address, req):
    dec req[].blockRetries
|
|
|
|
func retriesExhausted*(download: ActiveDownload, address: BlockAddress): bool =
  ## True when a request is registered for `address` and it has no retries
  ## left; false when there is no request.
  download.blocks.withValue(address, req):
    return req[].blockRetries <= 0
  return false
|
|
|
|
proc decrementBlockRetries*(
    download: ActiveDownload, addresses: seq[BlockAddress]
): seq[BlockAddress] =
  ## Takes one retry away from every address that has a registered request
  ## and returns, in input order, those left with no retries. Addresses
  ## without a request are skipped.
  var spent: seq[BlockAddress]
  for address in addresses:
    download.blocks.withValue(address, pending):
      dec pending[].blockRetries
      if pending[].blockRetries < 1:
        spent.add address
  return spent
|
|
|
|
proc failExhaustedBlocks*(download: ActiveDownload, addresses: seq[BlockAddress]) =
  ## Marks `addresses` as permanently failed: each one is added to
  ## `exhaustedBlocks` and counted as received on the context, its handles
  ## are failed with a `RetriesExhaustedError`, and its request is removed.
  ## Finally the download itself is signalled with that error.
  ##
  ## An empty `addresses` is a no-op.
  if addresses.len > 0:
    for exhausted in addresses:
      download.exhaustedBlocks.incl(exhausted)
      download.ctx.received += 1

    let retriesErr = (ref RetriesExhaustedError)(
      msg: "Block retries exhausted after " & $download.maxBlockRetries & " attempts"
    )
    for exhausted in addresses:
      download.failWantHandle(exhausted, retriesErr)
      download.blocks.del(exhausted)

    download.signalCompletionIfDone(retriesErr)
|
|
|
|
proc failLocalMissing*(download: ActiveDownload, address: BlockAddress) =
  ## Fails the request for `address` and the download itself with a
  ## `BlockNotFoundError`.
  let notFound =
    (ref BlockNotFoundError)(msg: "Block not found locally: " & $address)
  failWantHandle(download, address, notFound)
  signalCompletionIfDone(download, notFound)
|
|
|
|
proc isBlockExhausted*(download: ActiveDownload, address: BlockAddress): bool =
  ## True when `address` is recorded in `exhaustedBlocks`.
  download.exhaustedBlocks.contains(address)
|
|
|
|
proc getBlockAddressesForRange*(
    download: ActiveDownload, start: uint64, count: uint64
): seq[BlockAddress] =
  ## Addresses in `start ..< start + count` that currently have a registered
  ## block request, in ascending index order.
  var found: seq[BlockAddress]
  for index in start ..< start + count:
    let candidate = download.makeBlockAddress(index)
    if download.blocks.hasKey(candidate):
      found.add candidate
  return found
|
|
|
|
func contains*(download: ActiveDownload, address: BlockAddress): bool =
  ## True when a block request is registered for `address`; enables
  ## `address in download`.
  download.blocks.hasKey(address)
|
|
|
|
proc markBlockReturned*(download: ActiveDownload) =
  ## Forwards to `markBlockReturned` on the download context.
  markBlockReturned(download.ctx)
|
|
|
|
proc markBatchInFlight*(
    download: ActiveDownload,
    start: uint64,
    count: uint64,
    localCount: uint64,
    peerId: PeerId,
    timeoutFuture: Future[void] = nil,
) =
  ## Records a batch as in flight under the key `start`, stamped with the
  ## current time. An existing entry for `start` is overwritten. The
  ## request future is left unset; see `setBatchRequestFuture`.
  let batch = PendingBatch(
    start: start,
    count: count,
    localCount: localCount,
    peerId: peerId,
    sentAt: Moment.now(),
    timeoutFuture: timeoutFuture,
  )
  download.pendingBatches[start] = batch
|
|
|
|
proc setBatchTimeoutFuture*(
    download: ActiveDownload, start: uint64, timeoutFuture: Future[void]
) =
  ## Stores `timeoutFuture` on the pending batch keyed by `start`.
  ## Does nothing when no such batch is pending.
  download.pendingBatches.withValue(start, batch):
    batch[].timeoutFuture = timeoutFuture
|
|
|
|
proc setBatchRequestFuture*(
    download: ActiveDownload, start: uint64, requestFuture: Future[void]
) =
  ## Stores `requestFuture` on the pending batch keyed by `start`.
  ## Does nothing when no such batch is pending.
  download.pendingBatches.withValue(start, batch):
    batch[].requestFuture = requestFuture
|
|
|
|
proc completeBatchLocal*(download: ActiveDownload, start: uint64, count: uint64) =
  ## Marks the batch at `start` complete on the scheduler, reports `count`
  ## blocks and zero bytes as received, then signals completion if the
  ## download is done.
  markComplete(download.ctx.scheduler, start)
  markBatchReceived(download.ctx, start, count, 0)
  signalCompletionIfDone(download)
|
|
|
|
proc completeBatch*(
    download: ActiveDownload,
    start: uint64,
    blocksDeliveryCount: uint64,
    totalBytes: uint64,
) =
  ## Finishes the batch keyed by `start`: cancels its unfinished timeout
  ## future, drops it from `pendingBatches`, marks it complete on the
  ## scheduler and reports the received blocks (delivered plus those that
  ## were already local) and bytes. Signals completion if the download is
  ## done.
  var alreadyLocal = 0'u64
  download.pendingBatches.withValue(start, batch):
    alreadyLocal = batch[].localCount
    let timeout = batch[].timeoutFuture
    if not (timeout.isNil or timeout.finished):
      timeout.cancelSoon()
  # Deleting a key that is not present is a no-op.
  download.pendingBatches.del(start)

  download.ctx.scheduler.markComplete(start)
  download.ctx.markBatchReceived(start, alreadyLocal + blocksDeliveryCount, totalBytes)
  download.signalCompletionIfDone()
|
|
|
|
proc requeueBatch*(
    download: ActiveDownload, start: uint64, count: uint64, front: bool = false
) =
  ## Drops the pending batch keyed by `start` (cancelling its unfinished
  ## timeout future) and hands the range back to the scheduler, at the
  ## front of its queue when `front` is set and at the back otherwise.
  download.pendingBatches.withValue(start, batch):
    let timeout = batch[].timeoutFuture
    if not (timeout.isNil or timeout.finished):
      timeout.cancelSoon()
  # Deleting a key that is not present is a no-op.
  download.pendingBatches.del(start)

  if not front:
    download.ctx.scheduler.requeueBack(start, count)
  else:
    download.ctx.scheduler.requeueFront(start, count)
|
|
|
|
proc partialCompleteBatch*(
    download: ActiveDownload,
    originalStart: uint64,
    originalCount: uint64,
    receivedBlocksCount: uint64,
    missingRanges: seq[tuple[start: uint64, count: uint64]],
    totalBytes: uint64,
) =
  ## Handles a batch that was only partly delivered: drops the pending
  ## entry keyed by `originalStart` (cancelling its unfinished timeout
  ## future), passes the missing ranges to the scheduler, and reports the
  ## received blocks (delivered plus those already local) and bytes.
  ## Signals completion if the download is done.
  var alreadyLocal = 0'u64
  download.pendingBatches.withValue(originalStart, batch):
    alreadyLocal = batch[].localCount
    let timeout = batch[].timeoutFuture
    if not (timeout.isNil or timeout.finished):
      timeout.cancelSoon()
  # Deleting a key that is not present is a no-op.
  download.pendingBatches.del(originalStart)

  var stillMissing = newSeqOfCap[BlockBatch](missingRanges.len)
  for gap in missingRanges:
    stillMissing.add((start: gap.start, count: gap.count))
  download.ctx.scheduler.partialComplete(originalStart, stillMissing)

  download.ctx.markBatchReceived(
    originalStart, alreadyLocal + receivedBlocksCount, totalBytes
  )
  download.signalCompletionIfDone()
|
|
|
|
proc isDownloadComplete*(download: ActiveDownload): bool =
  ## Whether the download context reports completion.
  return isComplete(download.ctx)
|
|
|
|
proc hasWorkRemaining*(download: ActiveDownload): bool =
  ## True while the scheduler is not empty.
  let idle = download.ctx.scheduler.isEmpty()
  return not idle
|
|
|
|
proc pendingBatchCount*(download: ActiveDownload): int =
  ## Number of batches currently recorded as in flight.
  return len(download.pendingBatches)
|
|
|
|
proc handlePeerFailure*(download: ActiveDownload, peerId: PeerId) =
  ## Requeues, at the front of the scheduler, every pending batch that was
  ## assigned to `peerId`.
  # Collect first: `requeueBatch` removes entries from `pendingBatches`,
  # which must not happen while the table is being iterated.
  var orphaned: seq[tuple[start: uint64, count: uint64]]
  for key, pending in download.pendingBatches:
    if pending.peerId == peerId:
      orphaned.add((start: key, count: pending.count))

  for entry in orphaned:
    download.requeueBatch(entry.start, entry.count, front = true)

  trace "Requeued batches from failed peer", peer = peerId, batches = orphaned.len
|
|
|
|
proc getSwarm(download: ActiveDownload): Swarm =
  ## The swarm held by the download context.
  return download.ctx.swarm
|
|
|
|
proc updatePeerAvailability*(
    download: ActiveDownload, peerId: PeerId, availability: BlockAvailability
) =
  ## Updates the availability of a peer the swarm already knows; otherwise
  ## adds the peer to the swarm with that availability.
  if download.ctx.swarm.getPeer(peerId).isSome:
    download.ctx.swarm.updatePeerAvailability(peerId, availability)
  else:
    discard download.ctx.swarm.addPeer(peerId, availability)
|
|
|
|
proc addPeerIfAbsent*(
    download: ActiveDownload, peerId: PeerId, availability: BlockAvailability
): bool =
  ## Adds `peerId` to the swarm when it is not tracked yet and returns
  ## `true` (new peer added, send WantHave). For a peer that is already
  ## tracked nothing is added, and the result is `true` unless its recorded
  ## availability kind is `bakComplete`.
  let known = download.ctx.swarm.getPeer(peerId)
  if known.isNone:
    discard download.ctx.swarm.addPeer(peerId, availability)
    return true

  # peer already tracked, skip if bakComplete
  return known.get().availability.kind != bakComplete
|
|
|
|
proc handleBatchRetry*(
    download: ActiveDownload, start: uint64, count: uint64, waitTime: Duration
) {.async: (raises: [CancelledError]).} =
  ## Charges one retry to every block of the batch that still has a
  ## registered request, fails the blocks that ran out of retries, puts the
  ## batch at the back of the scheduler queue and then sleeps for
  ## `waitTime`.
  let
    pendingAddresses = download.getBlockAddressesForRange(start, count)
    exhausted = download.decrementBlockRetries(pendingAddresses)

  if exhausted.len != 0:
    warn "Block retries exhausted",
      treeCid = download.treeCid, exhaustedCount = exhausted.len
    download.failExhaustedBlocks(exhausted)

  download.requeueBatch(start, count, front = false)
  await sleepAsync(waitTime)
|