refactor: move expiry update from fetchBatched (#634)

This commit is contained in:
Adam Uhlíř 2023-11-28 22:04:11 +01:00 committed by GitHub
parent 22c31046a7
commit 0e4387d1b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 56 additions and 47 deletions

View File

@ -8,6 +8,8 @@
## those terms.
import pkg/stew/results
import pkg/chronos
import pkg/questionable/results
export results
@ -26,3 +28,13 @@ template mapFailure*[T, V, E](
template mapFailure*[T, V](exp: Result[T, V]): Result[T, ref CatchableError] =
mapFailure(exp, CodexError)
proc allFutureResult*[T](fut: seq[Future[T]]): Future[?!void] {.async.} =
try:
await allFuturesThrowing(fut)
except CancelledError as exc:
raise exc
except CatchableError as exc:
return failure(exc.msg)
return success()

View File

@ -38,6 +38,7 @@ import ./discovery
import ./contracts
import ./node/batch
import ./utils
import ./errors
export batch
@ -126,8 +127,7 @@ proc fetchBatched*(
node: CodexNodeRef,
manifest: Manifest,
batchSize = FetchBatch,
onBatch: BatchProc = nil,
expiry = SecondsSince1970.none): Future[?!void] {.async, gcsafe.} =
onBatch: BatchProc = nil): Future[?!void] {.async, gcsafe.} =
## Fetch manifest in batches of `batchSize`
##
@ -144,18 +144,11 @@ proc fetchBatched*(
if not iter.finished:
iter.next()
try:
await allFuturesThrowing(allFinished(blocks))
if blocksErr =? (await allFutureResult(blocks)).errorOption:
return failure(blocksErr)
if expiryValue =? expiry:
await allFuturesThrowing(blocks.mapIt(node.blockStore.ensureExpiry(it.read.get.cid, expiryValue)))
if not onBatch.isNil:
await onBatch(blocks.mapIt( it.read.get ))
except CancelledError as exc:
raise exc
except CatchableError as exc:
return failure(exc.msg)
if not onBatch.isNil and batchErr =? (await onBatch(blocks.mapIt( it.read.get ))).errorOption:
return failure(batchErr)
return success()
@ -434,11 +427,18 @@ proc start*(node: CodexNodeRef) {.async.} =
return failure(error)
trace "Fetching block for manifest", cid
# TODO: This will probably require a call to `getBlock` either way,
# since fetching of blocks will have to be selective according
# to a combination of parameters, such as node slot position
# and dataset geometry
if fetchErr =? (await node.fetchBatched(manifest, onBatch = onBatch, expiry = some request.expiry.toSecondsSince1970)).errorOption:
let expiry = request.expiry.toSecondsSince1970
proc expiryUpdateOnBatch(blocks: seq[bt.Block]): Future[?!void] {.async.} =
let ensureExpiryFutures = blocks.mapIt(node.blockStore.ensureExpiry(it.cid, expiry))
if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption:
return failure(updateExpiryErr)
if not onBatch.isNil and onBatchErr =? (await onBatch(blocks)).errorOption:
return failure(onBatchErr)
return success()
if fetchErr =? (await node.fetchBatched(manifest, onBatch = expiryUpdateOnBatch)).errorOption:
let error = newException(CodexError, "Unable to retrieve blocks")
error.parent = fetchErr
return failure(error)
@ -465,7 +465,7 @@ proc start*(node: CodexNodeRef) {.async.} =
try:
await hostContracts.start()
except CatchableError as error:
error "Unable to start host contract interactions: ", error=error.msg
error "Unable to start host contract interactions", error=error.msg
node.contracts.host = HostInteractions.none
if clientContracts =? node.contracts.client:

View File

@ -1,6 +1,7 @@
import pkg/chronos
import pkg/questionable/results
import pkg/upraises
import ../blocktype as bt
type
BatchProc* = proc(blocks: seq[bt.Block]): Future[void] {.gcsafe, upraises:[].}
BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, upraises:[].}

View File

@ -51,7 +51,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
reservationId = reservation.id
availabilityId = reservation.availabilityId
proc onBatch(blocks: seq[bt.Block]) {.async.} =
proc onBatch(blocks: seq[bt.Block]): Future[?!void] {.async.} =
# release batches of blocks as they are written to disk and
# update availability size
var bytes: uint = 0
@ -59,13 +59,9 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
bytes += blk.data.len.uint
trace "Releasing batch of bytes written to disk", bytes
let r = await reservations.release(reservation.id,
return await reservations.release(reservation.id,
reservation.availabilityId,
bytes)
# `tryGet` will raise the exception that occurred during release, if there
# was one. The exception will be caught in the closure and sent to the
# SaleErrored state.
r.tryGet()
trace "Starting download"
if err =? (await onStore(request,

View File

@ -1,6 +1,7 @@
import std/os
import std/options
import std/math
import std/times
import pkg/asynctest
import pkg/chronos
@ -8,6 +9,7 @@ import pkg/chronicles
import pkg/stew/byteutils
import pkg/datastore
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/nitro
@ -29,6 +31,9 @@ import ./helpers
import ./helpers/mockmarket
import ./helpers/mockclock
proc toTimesDuration(d: chronos.Duration): times.Duration =
initDuration(seconds=d.seconds)
asyncchecksuite "Test Node":
let
(path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name
@ -129,25 +134,11 @@ asyncchecksuite "Test Node":
(await node.fetchBatched(
manifest,
batchSize = batchSize,
proc(blocks: seq[bt.Block]) {.gcsafe, async.} =
proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, async.} =
check blocks.len > 0 and blocks.len <= batchSize
return success()
)).tryGet()
test "Block Batching with expiry":
let
manifest = await Manifest.fetch(chunker)
# The blocks have set default TTL, so in order to update it we have to have larger TTL
expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 123
(await node.fetchBatched(manifest, expiry=some expectedExpiry)).tryGet()
for index in 0..<manifest.blocksCount:
let blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet
let expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet
let expiry = await localStoreMetaDs.get(expiryKey)
check (expiry.tryGet).toSecondsSince1970 == expectedExpiry
test "Store and retrieve Data Stream":
let
stream = BufferStream.new()
@ -252,7 +243,7 @@ asyncchecksuite "Test Node - host contracts":
# Setup Host Contracts and dependencies
let market = MockMarket.new()
sales = Sales.new(market, clock, localStore, 0)
sales = Sales.new(market, clock, localStore)
let hostContracts = some HostInteractions.new(clock, sales)
node.contracts = (ClientInteractions.none, hostContracts, ValidatorInteractions.none)
@ -296,13 +287,20 @@ asyncchecksuite "Test Node - host contracts":
let onStore = !sales.onStore
var request = StorageRequest.example
request.content.cid = manifestCid
request.expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.u256
var fetchedBytes: uint = 0
let onBatch = proc(blocks: seq[bt.Block]) {.async.} =
let onBatch = proc(blocks: seq[bt.Block]): Future[?!void] {.async.} =
for blk in blocks:
fetchedBytes += blk.data.len.uint
return success()
(await onStore(request, 0.u256, onBatch)).tryGet()
check fetchedBytes == 2291520
for index in 0..<manifest.blocksCount:
let blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet
let expiryKey = (createBlockExpirationMetadataKey(blk.cid)).tryGet
let expiry = await localStoreMetaDs.get(expiryKey)
check (expiry.tryGet).toSecondsSince1970 == request.expiry.toSecondsSince1970

View File

@ -2,10 +2,12 @@ import std/random
import std/sequtils
import std/times
import std/typetraits
import pkg/codex/contracts/requests
import pkg/codex/sales/slotqueue
import pkg/stint
import pkg/codex/stores
import pkg/stint
proc exampleString*(length: int): string =
let chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
result = newString(length) # Create a new empty string with a given length
@ -46,7 +48,7 @@ proc example*(_: type StorageRequest): StorageRequest =
cid: "zb2rhheVmk3bLks5MgzTqyznLu1zqGH5jrfTA1eAZXrjx7Vob",
merkleRoot: array[32, byte].example
),
expiry: (getTime() + initDuration(hours=1)).toUnix.u256,
expiry: (getTime() + 1.hours).toUnix.u256,
nonce: Nonce.example
)