From 0ab4b1cf9ed801ab75d199e1c8b7032f3293f692 Mon Sep 17 00:00:00 2001 From: Tomasz Bekas Date: Wed, 5 Jun 2024 19:09:04 +0200 Subject: [PATCH] Expiry per dataset --- README.md | 9 +- codex/codex.nim | 20 +- codex/conf.nim | 31 +- codex/namespaces.nim | 2 + codex/node.nim | 59 ++-- codex/stores/blockstore.nim | 27 +- codex/stores/cachestore.nim | 24 +- codex/stores/keyutils.nim | 8 + codex/stores/maintenance.nim | 333 +++++++++++++++++--- codex/stores/networkstore.nim | 45 +-- codex/stores/repostore/operations.nim | 14 +- codex/stores/repostore/store.nim | 78 +---- codex/stores/repostore/types.nim | 17 +- tests/codex/helpers.nim | 5 + tests/codex/helpers/mockrepostore.nim | 1 - tests/codex/node/helpers.nim | 5 +- tests/codex/node/testcontracts.nim | 32 +- tests/codex/node/testnode.nim | 2 +- tests/codex/stores/repostore/testcoders.nim | 1 - tests/codex/stores/testmaintenance.nim | 216 +++++-------- tests/codex/stores/testrepostore.nim | 176 +---------- tests/integration/testblockexpiration.nim | 16 +- 22 files changed, 473 insertions(+), 648 deletions(-) diff --git a/README.md b/README.md index 22cbe219..70803d36 100644 --- a/README.md +++ b/README.md @@ -88,12 +88,9 @@ The following options are available: --api-bindaddr The REST API bind address [=127.0.0.1]. -p, --api-port The REST Api port [=8080]. --repo-kind Backend for main repo store (fs, sqlite) [=fs]. - -q, --storage-quota The size of the total storage quota dedicated to the node [=8589934592]. - -t, --block-ttl Default block timeout in seconds - 0 disables the ttl [=$DefaultBlockTtl]. - --block-mi Time interval in seconds - determines frequency of block maintenance cycle: how - often blocks are checked for expiration and cleanup - [=$DefaultBlockMaintenanceInterval]. - --block-mn Number of blocks to check every maintenance cycle [=1000]. + -q, --storage-quota The size in bytes of the total storage quota dedicated to the node [=8589934592]. + -t, --default-ttl Default dataset expiry [=1d]. 
+ --maintenance-interval Determines how frequently datasets are checked for expiration and cleanup [=5m]. -c, --cache-size The size of the block cache, 0 disables the cache - might help on slow hardrives [=0]. diff --git a/codex/codex.nim b/codex/codex.nim index 0b9182fb..5a377ca5 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -54,7 +54,7 @@ type restServer: RestServerRef codexNode: CodexNodeRef repoStore: RepoStore - maintenance: BlockMaintainer + maintenance: DatasetMaintainer taskpool: Taskpool CodexPrivateKey* = libp2p.PrivateKey # alias @@ -246,6 +246,9 @@ proc new*( wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) + metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace) + .expect("Should create metadata store!") + repoData = case config.repoKind of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5) .expect("Should create repo file data store!")) @@ -256,21 +259,21 @@ proc new*( repoStore = RepoStore.new( repoDs = repoData, - metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace) - .expect("Should create metadata store!"), - quotaMaxBytes = config.storageQuota, - blockTtl = config.blockTtl) + metaDs = metaDs, + quotaMaxBytes = config.storageQuota) - maintenance = BlockMaintainer.new( + maintenance = DatasetMaintainer.new( repoStore, - interval = config.blockMaintenanceInterval, - numberOfBlocksPerInterval = config.blockMaintenanceNumberOfBlocks) + metaDs, + defaultExpiry = config.defaultExpiry, + interval = config.maintenanceInterval) peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks) engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) store = NetworkStore.new(engine, repoStore) + prover = if config.prover: if not fileAccessible($config.circomR1cs, {AccessFlags.Read}) and endsWith($config.circomR1cs, ".r1cs"): @@ -306,6 +309,7 
@@ proc new*( codexNode = CodexNodeRef.new( switch = switch, networkStore = store, + maintenance = maintenance, engine = engine, prover = prover, discovery = discovery, diff --git a/codex/conf.nim b/codex/conf.nim index fb7548c7..306f9185 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -42,9 +42,8 @@ export units, net, codextypes, logutils export DefaultQuotaBytes, - DefaultBlockTtl, - DefaultBlockMaintenanceInterval, - DefaultNumberOfBlocksToMaintainPerInterval + DefaultDefaultExpiry, + DefaultMaintenanceInterval proc defaultDataDir*(): string = let dataDir = when defined(windows): @@ -209,24 +208,18 @@ type name: "storage-quota" abbr: "q" }: NBytes - blockTtl* {. - desc: "Default block timeout in seconds - 0 disables the ttl" - defaultValue: DefaultBlockTtl - defaultValueDesc: $DefaultBlockTtl - name: "block-ttl" + defaultExpiry* {. + desc: "Default dataset expiry in seconds" + defaultValue: DefaultDefaultExpiry + defaultValueDesc: $DefaultDefaultExpiry + name: "default-ttl" abbr: "t" }: Duration - blockMaintenanceInterval* {. - desc: "Time interval in seconds - determines frequency of block maintenance cycle: how often blocks are checked for expiration and cleanup" - defaultValue: DefaultBlockMaintenanceInterval - defaultValueDesc: $DefaultBlockMaintenanceInterval - name: "block-mi" }: Duration - - blockMaintenanceNumberOfBlocks* {. - desc: "Number of blocks to check every maintenance cycle" - defaultValue: DefaultNumberOfBlocksToMaintainPerInterval - defaultValueDesc: $DefaultNumberOfBlocksToMaintainPerInterval - name: "block-mn" }: int + maintenanceInterval* {. + desc: "Maintenance interval in seconds - determines frequency of maintenance cycle: how often datasets are checked for expiration and cleanup" + defaultValue: DefaultMaintenanceInterval + defaultValueDesc: $DefaultMaintenanceInterval + name: "maintenance-interval" }: Duration cacheSize* {. 
desc: "The size of the block cache, 0 disables the cache - might help on slow hardrives" diff --git a/codex/namespaces.nim b/codex/namespaces.nim index 42f5684a..8b8c7403 100644 --- a/codex/namespaces.nim +++ b/codex/namespaces.nim @@ -18,6 +18,8 @@ const CodexMetaNamespace & "/ttl" CodexBlockProofNamespace* = # Cid and Proof CodexMetaNamespace & "/proof" + CodexDatasetMetadataNamespace* = # Dataset + CodexMetaNamespace & "/dataset" CodexDhtNamespace* = "dht" # Dht namespace CodexDhtProvidersNamespace* = # Dht providers namespace CodexDhtNamespace & "/providers" diff --git a/codex/node.nim b/codex/node.nim index e5156a69..8742c568 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -65,6 +65,7 @@ type switch: Switch networkId: PeerId networkStore: NetworkStore + maintenance: DatasetMaintainer engine: BlockExcEngine prover: ?Prover discovery: Discovery @@ -155,17 +156,8 @@ proc updateExpiry*( trace "Unable to fetch manifest for cid", manifestCid return failure(error) - try: - let - ensuringFutures = Iter[int].new(0.. k / $index) + +proc createDatasetMetadataKey*(treeCid: Cid): ?!Key = + DatasetMetadataKey / $treeCid + +proc createDatasetMetadataQueryKey*(): ?!Key = + let queryString = ? (DatasetMetadataKey / "*") + Key.init(queryString) \ No newline at end of file diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index 63c6ba40..74e38518 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -10,92 +10,323 @@ ## Store maintenance module ## Looks for and removes expired blocks from blockstores. 
+import std/sequtils + import pkg/chronos +import pkg/chronicles +import pkg/libp2p/cid +import pkg/serde/json +import pkg/datastore +import pkg/datastore/typedds import pkg/questionable import pkg/questionable/results -import ./repostore +import ./blockstore +import ./keyutils +import ./queryiterhelper import ../utils/timer import ../utils/asynciter +import ../utils/json +import ../utils/trackedfutures import ../clock import ../logutils import ../systemclock +logScope: + topics = "codex maintenance" + const - DefaultBlockMaintenanceInterval* = 10.minutes - DefaultNumberOfBlocksToMaintainPerInterval* = 1000 + DefaultDefaultExpiry* = 24.hours + DefaultMaintenanceInterval* = 5.minutes + DefaultBatchSize* = 1000 + + # if no progress was observed for this amount of time + # we're going to retry deletion of the dataset + DefaultRetryDelay* = 15.minutes type - BlockMaintainer* = ref object of RootObj - repoStore: RepoStore + DatasetMaintainer* = ref object of RootObj + blockStore: BlockStore + metaDs: TypedDatastore interval: Duration + defaultExpiry: Duration + batchSize: int + retryDelay: Duration timer: Timer clock: Clock - numberOfBlocksPerInterval: int - offset: int + trackedFutures: TrackedFutures + + Checkpoint* {.serialize.} = object + timestamp*: SecondsSince1970 + progress*: Natural + + DatasetMetadata* {.serialize.} = object + expiry*: SecondsSince1970 + leavesCount*: Natural + manifestsCids*: seq[Cid] + checkpoint*: Checkpoint + + MissingKey* = object of CodexError proc new*( - T: type BlockMaintainer, - repoStore: RepoStore, - interval: Duration, - numberOfBlocksPerInterval = 100, + T: type DatasetMaintainer, + blockStore: BlockStore, + metaDs: Datastore, + defaultExpiry = DefaultDefaultExpiry, + interval = DefaultMaintenanceInterval, + batchSize = DefaultBatchSize, + retryDelay = DefaultRetryDelay, timer = Timer.new(), - clock: Clock = SystemClock.new() -): BlockMaintainer = - ## Create new BlockMaintainer instance + clock: Clock = SystemClock.new(), + 
trackedFutures = TrackedFutures.new() +): DatasetMaintainer = + ## Create new DatasetMaintainer instance ## ## Call `start` to begin looking for for expired blocks ## - BlockMaintainer( - repoStore: repoStore, + DatasetMaintainer( + blockStore: blockStore, + metaDs: TypedDatastore.init(metaDs), + defaultExpiry: defaultExpiry, interval: interval, - numberOfBlocksPerInterval: numberOfBlocksPerInterval, + batchSize: batchSize, + retryDelay: retryDelay, timer: timer, clock: clock, - offset: 0) + trackedFutures: trackedFutures) -proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.} = - if isErr (await self.repoStore.delBlock(cid)): - trace "Unable to delete block from repoStore" +proc encode(t: Checkpoint): seq[byte] = serializer.toJson(t).toBytes() +proc decode(T: type Checkpoint, bytes: seq[byte]): ?!T = T.fromJson(bytes) -proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} = - if be.expiry < self.clock.now: - await self.deleteExpiredBlock(be.cid) - else: - inc self.offset +proc encode(t: DatasetMetadata): seq[byte] = serializer.toJson(t).toBytes() +proc decode(T: type DatasetMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes) -proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} = - let expirations = await self.repoStore.getBlockExpirations( - maxNumber = self.numberOfBlocksPerInterval, - offset = self.offset +proc trackExpiry*( + self: DatasetMaintainer, + treeCid: Cid, + leavesCount: Natural, + expiry: SecondsSince1970, + manifestsCids: seq[Cid] = @[] +): Future[?!void] {.async.} = + # Starts tracking expiry of a given dataset + # + + trace "Tracking an expiry of a dataset", treeCid, leavesCount, expiry + + without key =? createDatasetMetadataKey(treeCid), err: + return failure(err) + + await modify[DatasetMetadata](self.metaDs, key, + proc (maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} = + var md: DatasetMetadata + + if currDatasetMd =? 
maybeCurrDatasetMd: + md.expiry = max(currDatasetMd.expiry, expiry) + + if currDatasetMd.leavesCount != leavesCount: + raise newException(CatchableError, "DatasetMetadata for treeCid " & $treeCid & " is already stored with leavesCount " & + $currDatasetMd.leavesCount & ", cannot override it with leavesCount " & $leavesCount) + + md.leavesCount = leavesCount + md.manifestsCids = (currDatasetMd.manifestsCids & manifestsCids).deduplicate + md.checkpoint = Checkpoint(progress: 0, timestamp: 0) + else: + md.expiry = expiry + md.leavesCount = leavesCount + md.manifestsCids = manifestsCids + md.checkpoint = Checkpoint(progress: 0, timestamp: 0) + + md.some ) - without iter =? expirations, err: - trace "Unable to obtain blockExpirations iterator from repoStore" - return +proc trackExpiry*( + self: DatasetMaintainer, + treeCid: Cid, + leavesCount: Natural, + manifestsCids: seq[Cid] = @[] +): Future[?!void] {.async.} = + await self.trackExpiry(treeCid, leavesCount, self.clock.now + self.defaultExpiry.seconds, manifestsCids) - var numberReceived = 0 - for beFut in iter: - let be = await beFut - inc numberReceived - await self.processBlockExpiration(be) - await sleepAsync(1.millis) # cooperative scheduling +proc trackExpiry*( + self: DatasetMaintainer, + cid: Cid, + manifestsCids: seq[Cid] = @[] +): Future[?!void] {.async.} = + await self.trackExpiry(cid, 0, self.clock.now + self.defaultExpiry.seconds, manifestsCids) - # If we received fewer blockExpirations from the iterator than we asked for, - # We're at the end of the dataset and should start from 0 next time. 
- if numberReceived < self.numberOfBlocksPerInterval: - self.offset = 0 +proc ensureExpiry*( + self: DatasetMaintainer, + treeCid: Cid, + minExpiry: SecondsSince1970): Future[?!void] {.async.} = + ## Sets the dataset expiry to a max of two values: current expiry and `minExpiry`, + ## if a dataset for given `treeCid` is not currently tracked a CatchableError is thrown + ## -proc start*(self: BlockMaintainer) = + trace "Updating a dataset expiry", treeCid, minExpiry + + without key =? createDatasetMetadataKey(treeCid), err: + return failure(err) + + await modify[DatasetMetadata](self.metaDs, key, + proc (maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} = + if currDatasetMd =? maybeCurrDatasetMd: + let datasetMd = DatasetMetadata( + expiry: max(currDatasetMd.expiry, minExpiry), + leavesCount: currDatasetMd.leavesCount, + manifestsCids: currDatasetMd.manifestsCids, + checkpoint: currDatasetMd.checkpoint + ) + return datasetMd.some + else: + raise newException(CatchableError, "DatasetMetadata for treeCid " & $treeCid & " not found") + ) + + +proc recordCheckpoint*(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[?!void] {.async.} = + # Saves progress or deletes dataset metadata if progress > leavesCount + # + + without key =? createDatasetMetadataKey(treeCid), err: + return failure(err) + + await self.metaDs.modify(key, + proc (maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} = + if currDatasetMd =? 
maybeCurrDatasetMd: + if currDatasetMd.expiry != datasetMd.expiry or currDatasetMd.manifestsCids != datasetMd.manifestsCids: + raise newException(CatchableError, "Change in expiry detected, interrupting maintenance for dataset with treeCid " & $treeCid) + + if currDatasetMd.checkpoint.progress > datasetMd.checkpoint.progress: + raise newException(CatchableError, "Progress should be increasing only, treeCid " & $treeCid) + + if currDatasetMd.leavesCount <= datasetMd.checkpoint.progress: + DatasetMetadata.none + else: + datasetMd.some + else: + raise newException(CatchableError, "Metadata for dataset with treeCid " & $treeCid & " not found") + ) + +proc deleteBatch(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[?!void] {.async.} = + var datasetMd = datasetMd + + datasetMd.checkpoint.timestamp = self.clock.now + + if err =? (await self.recordCheckpoint(treeCid, datasetMd)).errorOption: + return failure(err) + + logScope: + treeCid = treeCid + manifestsCids = datasetMd.manifestsCids + startingIndex = datasetMd.checkpoint.progress + + trace "Deleting a batch of blocks", size = self.batchSize + + var index = datasetMd.checkpoint.progress + while (index < datasetMd.checkpoint.progress + self.batchSize) and + (index < datasetMd.leavesCount): + if err =? (await self.blockStore.delBlock(treeCid, index)).errorOption: + error "Error deleting a block", msg = err.msg, index + + index.inc + + await sleepAsync(50.millis) # cooperative scheduling + + if index >= datasetMd.leavesCount: + trace "All blocks deleted from a dataset", leavesCount = datasetMd.leavesCount + + for manifestCid in datasetMd.manifestsCids: + if err =? (await self.blockStore.delBlock(manifestCid)).errorOption: + error "Error deleting manifest", cid = manifestCid + + if err =? (await self.blockStore.delBlock(treeCid)).errorOption: + error "Error deleting block", cid = treeCid + + if err =? 
(await self.recordCheckpoint(treeCid, datasetMd)).errorOption: + return failure(err) + + return success() + else: + datasetMd.checkpoint.progress = index + return await self.deleteBatch(treeCid, datasetMd) + +proc superviseDatasetDeletion(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[void] {.async.} = + logScope: + treeCid = treeCid + manifestsCids = datasetMd.manifestsCids + expiry = datasetMd.expiry + leavesCount = datasetMd.leavesCount + + try: + if datasetMd.checkpoint.progress == 0 and datasetMd.checkpoint.timestamp == 0: + info "Initiating deletion of a dataset" + else: + info "Retrying deletion of a dataset", progress = datasetMd.checkpoint.progress, timestamp = datasetMd.checkpoint.timestamp + + if err =? (await self.deleteBatch(treeCid, datasetMd)).errorOption: + error "Error occurred during deletion of a dataset", msg = err.msg + else: + info "Dataset deletion complete" + except CancelledError as err: + raise err + except CatchableError as err: + error "Unexpected error during dataset deletion", msg = err.msg, treeCid = treeCid + +proc listDatasetMetadata*( + self: DatasetMaintainer +): Future[?!AsyncIter[(Cid, DatasetMetadata)]] {.async.} = + without queryKey =? createDatasetMetadataQueryKey(), err: + return failure(err) + + without queryIter =? await query[DatasetMetadata](self.metaDs, Query.init(queryKey)), err: + error "Unable to execute block expirations query", err = err.msg + return failure(err) + + without asyncQueryIter =? await queryIter.toAsyncIter(), err: + error "Unable to convert QueryIter to AsyncIter", err = err.msg + return failure(err) + + let + filteredIter = await asyncQueryIter.filterSuccess() + + datasetMdIter = await mapFilter[KeyVal[DatasetMetadata], (Cid, DatasetMetadata)](filteredIter, + proc (kv: KeyVal[DatasetMetadata]): Future[?(Cid, DatasetMetadata)] {.async.} = + without cid =? 
Cid.init(kv.key.value).mapFailure, err: + error "Failed decoding cid", err = err.msg + return (Cid, DatasetMetadata).none + + (cid, kv.value).some + ) + + success(datasetMdIter) + + +proc checkDatasets(self: DatasetMaintainer): Future[?!void] {.async.} = + without iter =? await self.listDatasetMetadata(), err: + return failure(err) + + for fut in iter: + let (treeCid, datasetMd) = await fut + + if (datasetMd.expiry < self.clock.now) and + (datasetMd.checkpoint.timestamp + self.retryDelay.seconds < self.clock.now): + asyncSpawn self.superviseDatasetDeletion(treeCid, datasetMd).track(self) + else: + trace "Item either not expired or expired but already in maintenance", treeCid, expiry = datasetMd.expiry, timestamp = datasetMd.checkpoint.timestamp + success() + +proc start*(self: DatasetMaintainer) = proc onTimer(): Future[void] {.async.} = try: - await self.runBlockCheck() - except CancelledError as error: - raise error - except CatchableError as exc: - error "Unexpected exception in BlockMaintainer.onTimer(): ", msg=exc.msg + if err =? 
(await self.checkDatasets()).errorOption: + error "Error when checking datasets", msg = err.msg + except CancelledError as err: + raise err + except CatchableError as err: + error "Error when checking datasets", msg = err.msg - self.timer.start(onTimer, self.interval) + if self.interval.seconds > 0: + self.timer.start(onTimer, self.interval) -proc stop*(self: BlockMaintainer): Future[void] {.async.} = +proc stop*(self: DatasetMaintainer): Future[void] {.async.} = await self.timer.stop() + await self.trackedFutures.cancelTracked() diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 40758b94..96bc3c17 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -61,11 +61,13 @@ method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!Blo method putBlock*( self: NetworkStore, - blk: Block, - ttl = Duration.none): Future[?!void] {.async.} = + blk: Block): Future[?!void] {.async.} = ## Store block locally and notify the network ## - let res = await self.localStore.putBlock(blk, ttl) + + trace "Putting block into network store", cid = blk.cid + + let res = await self.localStore.putBlock(blk) if res.isErr: return res @@ -89,43 +91,6 @@ method getCidAndProof*( self.localStore.getCidAndProof(treeCid, index) -method ensureExpiry*( - self: NetworkStore, - cid: Cid, - expiry: SecondsSince1970): Future[?!void] {.async.} = - ## Ensure that block's assosicated expiry is at least given timestamp - ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact - ## - - without blockCheck =? 
await self.localStore.hasBlock(cid), err: - return failure(err) - - if blockCheck: - return await self.localStore.ensureExpiry(cid, expiry) - else: - trace "Updating expiry - block not in local store", cid - - return success() - -method ensureExpiry*( - self: NetworkStore, - treeCid: Cid, - index: Natural, - expiry: SecondsSince1970): Future[?!void] {.async.} = - ## Ensure that block's associated expiry is at least given timestamp - ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact - ## - - without blockCheck =? await self.localStore.hasBlock(treeCid, index), err: - return failure(err) - - if blockCheck: - return await self.localStore.ensureExpiry(treeCid, index, expiry) - else: - trace "Updating expiry - block not in local store", treeCid, index - - return success() - method listBlocks*( self: NetworkStore, blockType = BlockType.Manifest): Future[?!AsyncIter[?Cid]] = diff --git a/codex/stores/repostore/operations.nim b/codex/stores/repostore/operations.nim index e000bb0a..ec32b523 100644 --- a/codex/stores/repostore/operations.nim +++ b/codex/stores/repostore/operations.nim @@ -109,8 +109,7 @@ proc updateBlockMetadata*( self: RepoStore, cid: Cid, plusRefCount: Natural = 0, - minusRefCount: Natural = 0, - minExpiry: SecondsSince1970 = 0 + minusRefCount: Natural = 0 ): Future[?!void] {.async.} = if cid.isEmpty: return success() @@ -123,14 +122,13 @@ proc updateBlockMetadata*( if currBlockMd =? 
maybeCurrBlockMd: BlockMetadata( size: currBlockMd.size, - expiry: max(currBlockMd.expiry, minExpiry), refCount: currBlockMd.refCount + plusRefCount - minusRefCount ).some else: raise newException(BlockNotFoundError, "Metadata for block with cid " & $cid & " not found") ) -proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Future[?!StoreResult] {.async.} = +proc storeBlock*(self: RepoStore, blk: Block): Future[?!StoreResult] {.async.} = if blk.isEmpty: return success(StoreResult(kind: AlreadyInStore)) @@ -148,7 +146,7 @@ proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Futu if currMd =? maybeCurrMd: if currMd.size == blk.data.len.NBytes: - md = BlockMetadata(size: currMd.size, expiry: max(currMd.expiry, minExpiry), refCount: currMd.refCount) + md = BlockMetadata(size: currMd.size, refCount: currMd.refCount) res = StoreResult(kind: AlreadyInStore) # making sure that the block acutally is stored in the repoDs @@ -162,7 +160,7 @@ proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Futu else: raise newException(CatchableError, "Repo already stores a block with the same cid but with a different size, cid: " & $blk.cid) else: - md = BlockMetadata(size: blk.data.len.NBytes, expiry: minExpiry, refCount: 0) + md = BlockMetadata(size: blk.data.len.NBytes, refCount: 0) res = StoreResult(kind: Stored, used: blk.data.len.NBytes) if err =? 
(await self.repoDs.put(blkKey, blk.data)).errorOption: raise err @@ -170,7 +168,7 @@ proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Futu (md.some, res) ) -proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low): Future[?!DeleteResult] {.async.} = +proc tryDeleteBlock*(self: RepoStore, cid: Cid): Future[?!DeleteResult] {.async.} = if cid.isEmpty: return success(DeleteResult(kind: InUse)) @@ -187,7 +185,7 @@ proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.l res: DeleteResult if currMd =? maybeCurrMd: - if currMd.refCount == 0 or currMd.expiry < expiryLimit: + if currMd.refCount == 0: maybeMeta = BlockMetadata.none res = DeleteResult(kind: Deleted, released: currMd.size) diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index 7d629131..6b3f38e9 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -85,35 +85,6 @@ method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] = else: self.getBlock(address.cid) -method ensureExpiry*( - self: RepoStore, - cid: Cid, - expiry: SecondsSince1970 -): Future[?!void] {.async.} = - ## Ensure that block's associated expiry is at least given timestamp - ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact - ## - - if expiry <= 0: - return failure(newException(ValueError, "Expiry timestamp must be larger then zero")) - - await self.updateBlockMetadata(cid, minExpiry = expiry) - -method ensureExpiry*( - self: RepoStore, - treeCid: Cid, - index: Natural, - expiry: SecondsSince1970 -): Future[?!void] {.async.} = - ## Ensure that block's associated expiry is at least given timestamp - ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact - ## - - without leafMd =? 
await self.getLeafMetadata(treeCid, index), err: - return failure(err) - - await self.ensureExpiry(leafMd.blkCid, expiry) - method putCidAndProof*( self: RepoStore, treeCid: Cid, @@ -166,17 +137,14 @@ method getCid*( method putBlock*( self: RepoStore, - blk: Block, - ttl = Duration.none): Future[?!void] {.async.} = + blk: Block): Future[?!void] {.async.} = ## Put a block to the blockstore ## logScope: cid = blk.cid - let expiry = self.clock.now() + (ttl |? self.blockTtl).seconds - - without res =? await self.storeBlock(blk, expiry), err: + without res =? await self.storeBlock(blk), err: return failure(err) if res.kind == Stored: @@ -195,7 +163,7 @@ method putBlock*( return success() method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = - ## Delete a block from the blockstore when block refCount is 0 or block is expired + ## Delete a block from the blockstore when block refCount is 0 ## logScope: @@ -203,7 +171,7 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = trace "Attempting to delete a block" - without res =? await self.tryDeleteBlock(cid, self.clock.now()), err: + without res =? await self.tryDeleteBlock(cid), err: return failure(err) if res.kind == Deleted: @@ -214,7 +182,7 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption: return failure(err) elif res.kind == InUse: - trace "Block in use, refCount > 0 and not expired" + trace "Block in use, refCount > 0" else: trace "Block not found in store" @@ -296,42 +264,6 @@ method listBlocks*( iter.next = next return success iter -proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query = - let queryKey = ? 
createBlockExpirationMetadataQueryKey() - success Query.init(queryKey, offset = offset, limit = maxNumber) - -method getBlockExpirations*( - self: RepoStore, - maxNumber: int, - offset: int): Future[?!AsyncIter[BlockExpiration]] {.async, base.} = - ## Get iterator with block expirations - ## - - without beQuery =? createBlockExpirationQuery(maxNumber, offset), err: - error "Unable to format block expirations query", err = err.msg - return failure(err) - - without queryIter =? await query[BlockMetadata](self.metaDs, beQuery), err: - error "Unable to execute block expirations query", err = err.msg - return failure(err) - - without asyncQueryIter =? await queryIter.toAsyncIter(), err: - error "Unable to convert QueryIter to AsyncIter", err = err.msg - return failure(err) - - let - filteredIter = await asyncQueryIter.filterSuccess() - blockExpIter = await mapFilter[KeyVal[BlockMetadata], BlockExpiration](filteredIter, - proc (kv: KeyVal[BlockMetadata]): Future[?BlockExpiration] {.async.} = - without cid =? Cid.init(kv.key.value).mapFailure, err: - error "Failed decoding cid", err = err.msg - return BlockExpiration.none - - BlockExpiration(cid: cid, expiry: kv.value.expiry).some - ) - - success(blockExpIter) - method close*(self: RepoStore): Future[void] {.async.} = ## Close the blockstore, cleaning up resources managed by it. 
## For some implementations this may be a no-op diff --git a/codex/stores/repostore/types.nim b/codex/stores/repostore/types.nim index 4338e63a..2910e2e0 100644 --- a/codex/stores/repostore/types.nim +++ b/codex/stores/repostore/types.nim @@ -30,11 +30,9 @@ type postFixLen*: int repoDs*: Datastore metaDs*: TypedDatastore - clock*: Clock quotaMaxBytes*: NBytes quotaUsage*: QuotaUsage totalBlocks*: Natural - blockTtl*: Duration started*: bool QuotaUsage* {.serialize.} = object @@ -42,7 +40,6 @@ type reserved*: NBytes BlockMetadata* {.serialize.} = object - expiry*: SecondsSince1970 size*: NBytes refCount*: Natural @@ -50,13 +47,9 @@ type blkCid*: Cid proof*: CodexProof - BlockExpiration* {.serialize.} = object - cid*: Cid - expiry*: SecondsSince1970 - DeleteResultKind* {.serialize.} = enum Deleted = 0, # block removed from store - InUse = 1, # block not removed, refCount > 0 and not expired + InUse = 1, # block not removed, refCount > 0 NotFound = 2 # block not found in store DeleteResult* {.serialize.} = object @@ -90,18 +83,14 @@ func new*( T: type RepoStore, repoDs: Datastore, metaDs: Datastore, - clock: Clock = SystemClock.new(), postFixLen = 2, - quotaMaxBytes = DefaultQuotaBytes, - blockTtl = DefaultBlockTtl + quotaMaxBytes = DefaultQuotaBytes ): RepoStore = ## Create new instance of a RepoStore ## RepoStore( repoDs: repoDs, metaDs: TypedDatastore.init(metaDs), - clock: clock, postFixLen: postFixLen, - quotaMaxBytes: quotaMaxBytes, - blockTtl: blockTtl + quotaMaxBytes: quotaMaxBytes ) diff --git a/tests/codex/helpers.nim b/tests/codex/helpers.nim index 89aeafd1..9df0d670 100644 --- a/tests/codex/helpers.nim +++ b/tests/codex/helpers.nim @@ -10,6 +10,7 @@ import pkg/codex/merkletree import pkg/codex/blockexchange import pkg/codex/rng import pkg/codex/utils +import pkg/codex/units import ./helpers/nodeutils import ./helpers/randomchunker @@ -106,6 +107,10 @@ proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest return manifest +proc 
storeDataGetManifest*(store: BlockStore, blocksCount = 10): Future[Manifest] {.async.} = + let chunker = RandomChunker.new(rng = Rng.instance(), size = blocksCount.KiBs, chunkSize = 1.KiBs) + await storeDataGetManifest(store, chunker) + proc makeRandomBlocks*( datasetSize: int, blockSize: NBytes): Future[seq[Block]] {.async.} = diff --git a/tests/codex/helpers/mockrepostore.nim b/tests/codex/helpers/mockrepostore.nim index 86f881e0..971a72b2 100644 --- a/tests/codex/helpers/mockrepostore.nim +++ b/tests/codex/helpers/mockrepostore.nim @@ -23,7 +23,6 @@ type getBeMaxNumber*: int getBeOffset*: int - testBlockExpirations*: seq[BlockExpiration] getBlockExpirationsThrows*: bool method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} = diff --git a/tests/codex/node/helpers.nim b/tests/codex/node/helpers.nim index 498ea45b..39079ef4 100644 --- a/tests/codex/node/helpers.nim +++ b/tests/codex/node/helpers.nim @@ -75,6 +75,7 @@ template setupAndTearDown*() {.dirty.} = localStore: RepoStore localStoreRepoDs: DataStore localStoreMetaDs: DataStore + maintenance: DatasetMaintainer engine: BlockExcEngine store: NetworkStore node: CodexNodeRef @@ -99,8 +100,9 @@ template setupAndTearDown*() {.dirty.} = clock = SystemClock.new() localStoreMetaDs = metaTmp.newDb() localStoreRepoDs = repoTmp.newDb() - localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock = clock) + localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs) await localStore.start() + maintenance = DatasetMaintainer.new(localStore, localStoreMetaDs) blockDiscovery = Discovery.new( switch.peerInfo.privateKey, @@ -115,6 +117,7 @@ template setupAndTearDown*() {.dirty.} = node = CodexNodeRef.new( switch = switch, networkStore = store, + maintenance = maintenance, engine = engine, prover = Prover.none, discovery = blockDiscovery, diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index 49557f2c..8db047c1 100644 --- a/tests/codex/node/testcontracts.nim 
+++ b/tests/codex/node/testcontracts.nim @@ -101,23 +101,6 @@ asyncchecksuite "Test Node - Host contracts": test "onExpiryUpdate callback is set": check sales.onExpiryUpdate.isSome - test "onExpiryUpdate callback": - let - # The blocks have set default TTL, so in order to update it we have to have larger TTL - expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123 - expiryUpdateCallback = !sales.onExpiryUpdate - - (await expiryUpdateCallback(manifestCidStr, expectedExpiry)).tryGet() - - for index in 0..