From 0e4387d1b363db847b2a7da5ce4e20beafe4953e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Tue, 28 Nov 2023 22:04:11 +0100 Subject: [PATCH] refactor: move expiry update from fetchBatched (#634) --- codex/errors.nim | 12 ++++++++++ codex/node.nim | 38 +++++++++++++++--------------- codex/node/batch.nim | 3 ++- codex/sales/states/downloading.nim | 8 ++----- tests/codex/testnode.nim | 36 +++++++++++++--------------- tests/examples.nim | 6 +++-- 6 files changed, 56 insertions(+), 47 deletions(-) diff --git a/codex/errors.nim b/codex/errors.nim index 135af278..ebb07f04 100644 --- a/codex/errors.nim +++ b/codex/errors.nim @@ -8,6 +8,8 @@ ## those terms. import pkg/stew/results +import pkg/chronos +import pkg/questionable/results export results @@ -26,3 +28,13 @@ template mapFailure*[T, V, E]( template mapFailure*[T, V](exp: Result[T, V]): Result[T, ref CatchableError] = mapFailure(exp, CodexError) + +proc allFutureResult*[T](fut: seq[Future[T]]): Future[?!void] {.async.} = + try: + await allFuturesThrowing(fut) + except CancelledError as exc: + raise exc + except CatchableError as exc: + return failure(exc.msg) + + return success() diff --git a/codex/node.nim b/codex/node.nim index 5470af5a..cd1af002 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -38,6 +38,7 @@ import ./discovery import ./contracts import ./node/batch import ./utils +import ./errors export batch @@ -126,8 +127,7 @@ proc fetchBatched*( node: CodexNodeRef, manifest: Manifest, batchSize = FetchBatch, - onBatch: BatchProc = nil, - expiry = SecondsSince1970.none): Future[?!void] {.async, gcsafe.} = + onBatch: BatchProc = nil): Future[?!void] {.async, gcsafe.} = ## Fetch manifest in batches of `batchSize` ## @@ -144,18 +144,11 @@ proc fetchBatched*( if not iter.finished: iter.next() - try: - await allFuturesThrowing(allFinished(blocks)) + if blocksErr =? (await allFutureResult(blocks)).errorOption: + return failure(blocksErr) - if expiryValue =? expiry: - await allFuturesThrowing(blocks.mapIt(node.blockStore.ensureExpiry(it.read.get.cid, expiryValue))) - - if not onBatch.isNil: - await onBatch(blocks.mapIt( it.read.get )) - except CancelledError as exc: - raise exc - except CatchableError as exc: - return failure(exc.msg) + if not onBatch.isNil and batchErr =? (await onBatch(blocks.mapIt( it.read.get ))).errorOption: + return failure(batchErr) return success() @@ -434,11 +427,18 @@ proc start*(node: CodexNodeRef) {.async.} = return failure(error) trace "Fetching block for manifest", cid - # TODO: This will probably require a call to `getBlock` either way, - # since fetching of blocks will have to be selective according - # to a combination of parameters, such as node slot position - # and dataset geometry - if fetchErr =? (await node.fetchBatched(manifest, onBatch = onBatch, expiry = some request.expiry.toSecondsSince1970)).errorOption: + let expiry = request.expiry.toSecondsSince1970 + proc expiryUpdateOnBatch(blocks: seq[bt.Block]): Future[?!void] {.async.} = + let ensureExpiryFutures = blocks.mapIt(node.blockStore.ensureExpiry(it.cid, expiry)) + if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption: + return failure(updateExpiryErr) + + if not onBatch.isNil and onBatchErr =? (await onBatch(blocks)).errorOption: + return failure(onBatchErr) + + return success() + + if fetchErr =? (await node.fetchBatched(manifest, onBatch = expiryUpdateOnBatch)).errorOption: let error = newException(CodexError, "Unable to retrieve blocks") error.parent = fetchErr return failure(error) @@ -465,7 +465,7 @@ proc start*(node: CodexNodeRef) {.async.} = try: await hostContracts.start() except CatchableError as error: - error "Unable to start host contract interactions: ", error=error.msg + error "Unable to start host contract interactions", error=error.msg node.contracts.host = HostInteractions.none if clientContracts =? node.contracts.client: diff --git a/codex/node/batch.nim b/codex/node/batch.nim index b6482989..bbe5f0dd 100644 --- a/codex/node/batch.nim +++ b/codex/node/batch.nim @@ -1,6 +1,7 @@ import pkg/chronos +import pkg/questionable/results import pkg/upraises import ../blocktype as bt type - BatchProc* = proc(blocks: seq[bt.Block]): Future[void] {.gcsafe, upraises:[].} + BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, upraises:[].} diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index 6298657c..f79f0d68 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -51,7 +51,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} reservationId = reservation.id availabilityId = reservation.availabilityId - proc onBatch(blocks: seq[bt.Block]) {.async.} = + proc onBatch(blocks: seq[bt.Block]): Future[?!void] {.async.} = # release batches of blocks as they are written to disk and # update availability size var bytes: uint = 0 @@ -59,13 +59,9 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} bytes += blk.data.len.uint trace "Releasing batch of bytes written to disk", bytes - let r = await reservations.release(reservation.id, + return await reservations.release(reservation.id, reservation.availabilityId, bytes) - # `tryGet` will raise the exception that occurred during release, if there - # was one. The exception will be caught in the closure and sent to the - # SaleErrored state. - r.tryGet() trace "Starting download" if err =? (await onStore(request, diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 19a25e20..bb3876e3 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -1,6 +1,7 @@ import std/os import std/options import std/math +import std/times import pkg/asynctest import pkg/chronos @@ -8,6 +9,7 @@ import pkg/chronicles import pkg/stew/byteutils import pkg/datastore import pkg/questionable +import pkg/questionable/results import pkg/stint import pkg/nitro @@ -29,6 +31,9 @@ import ./helpers import ./helpers/mockmarket import ./helpers/mockclock +proc toTimesDuration(d: chronos.Duration): times.Duration = + initDuration(seconds=d.seconds) + asyncchecksuite "Test Node": let (path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name @@ -129,25 +134,11 @@ asyncchecksuite "Test Node": (await node.fetchBatched( manifest, batchSize = batchSize, - proc(blocks: seq[bt.Block]) {.gcsafe, async.} = + proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, async.} = check blocks.len > 0 and blocks.len <= batchSize + return success() )).tryGet() - test "Block Batching with expiry": - let - manifest = await Manifest.fetch(chunker) - # The blocks have set default TTL, so in order to update it we have to have larger TTL - expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 123 - - (await node.fetchBatched(manifest, expiry=some expectedExpiry)).tryGet() - - for index in 0..