diff --git a/codex/stores/repostore/operations.nim b/codex/stores/repostore/operations.nim index dcacbd62..125741e1 100644 --- a/codex/stores/repostore/operations.nim +++ b/codex/stores/repostore/operations.nim @@ -57,6 +57,17 @@ proc putLeafMetadata*( (md.some, res), ) +proc delLeafMetadata*( + self: RepoStore, treeCid: Cid, index: Natural +): Future[?!void] {.async.} = + without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: + return failure(err) + + if err =? (await self.metaDs.delete(key)).errorOption: + return failure(err) + + success() + proc getLeafMetadata*( self: RepoStore, treeCid: Cid, index: Natural ): Future[?!LeafMetadata] {.async.} = @@ -205,9 +216,6 @@ proc storeBlock*( proc tryDeleteBlock*( self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low ): Future[?!DeleteResult] {.async.} = - if cid.isEmpty: - return success(DeleteResult(kind: InUse)) - without metaKey =? createBlockExpirationMetadataKey(cid), err: return failure(err) diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index 2b14d6b7..d7305107 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -186,13 +186,13 @@ method putBlock*( return success() -method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = - ## Delete a block from the blockstore when block refCount is 0 or block is expired - ## - +proc delBlockInternal(self: RepoStore, cid: Cid): Future[?!DeleteResultKind] {.async.} = logScope: cid = cid + if cid.isEmpty: + return success(Deleted) + trace "Attempting to delete a block" without res =? await self.tryDeleteBlock(cid, self.clock.now()), err: @@ -205,12 +205,28 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption: return failure(err) - elif res.kind == InUse: - trace "Block in use, refCount > 0 and not expired" - else: - trace "Block not found in store" - return success() + success(res.kind) + +method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = + ## Delete a block from the blockstore when block refCount is 0 or block is expired + ## + + logScope: + cid = cid + + without outcome =? await self.delBlockInternal(cid), err: + return failure(err) + + case outcome + of InUse: + failure("Directly deleting a block that is part of a dataset is not allowed.") + of NotFound: + trace "Block not found, ignoring" + success() + of Deleted: + trace "Block already deleted" + success() method delBlock*( self: RepoStore, treeCid: Cid, index: Natural @@ -221,12 +237,19 @@ method delBlock*( else: return failure(err) + if err =? (await self.delLeafMetadata(treeCid, index)).errorOption: + error "Failed to delete leaf metadata, block will remain on disk.", err = err.msg + return failure(err) + if err =? (await self.updateBlockMetadata(leafMd.blkCid, minusRefCount = 1)).errorOption: if not (err of BlockNotFoundError): return failure(err) - await self.delBlock(leafMd.blkCid) # safe delete, only if refCount == 0 + without _ =? await self.delBlockInternal(leafMd.blkCid), err: + return failure(err) + + success() method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = ## Check if the block exists in the blockstore @@ -295,6 +318,18 @@ proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query = let queryKey = ?createBlockExpirationMetadataQueryKey() success Query.init(queryKey, offset = offset, limit = maxNumber) +proc blockRefCount*(self: RepoStore, cid: Cid): Future[?!Natural] {.async.} = + ## Returns the reference count for a block. If the count is zero; + ## this means the block is eligible for garbage collection. + ## + without key =? createBlockExpirationMetadataKey(cid), err: + return failure(err) + + without md =? await get[BlockMetadata](self.metaDs, key), err: + return failure(err) + + return success(md.refCount) + method getBlockExpirations*( self: RepoStore, maxNumber: int, offset: int ): Future[?!AsyncIter[BlockExpiration]] {.async, base.} = diff --git a/tests/codex/examples.nim b/tests/codex/examples.nim index 6f15182f..22a411c2 100644 --- a/tests/codex/examples.nim +++ b/tests/codex/examples.nim @@ -37,8 +37,8 @@ proc example*(_: type SignedState): SignedState = proc example*(_: type Pricing): Pricing = Pricing(address: EthAddress.example, price: uint32.rand.u256) -proc example*(_: type bt.Block): bt.Block = - let length = rand(4096) +proc example*(_: type bt.Block, size: int = 4096): bt.Block = + let length = rand(size) let bytes = newSeqWith(length, rand(uint8)) bt.Block.new(bytes).tryGet() diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index dda4ed82..0279b56f 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -12,9 +12,11 @@ import pkg/datastore import pkg/codex/stores/cachestore import pkg/codex/chunker import pkg/codex/stores +import pkg/codex/stores/repostore/operations import pkg/codex/blocktype as bt import pkg/codex/clock import pkg/codex/utils/asynciter +import pkg/codex/merkletree/codex import ../../asynctest import ../helpers @@ -354,6 +356,119 @@ asyncchecksuite "RepoStore": check has.isOk check has.get + test "should set the reference count for orphan blocks to 0": + let blk = Block.example(size = 200) + (await repo.putBlock(blk)).tryGet() + check (await repo.blockRefCount(blk.cid)).tryGet() == 0.Natural + + test "should not allow non-orphan blocks to be deleted directly": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = dataset[0] + (manifest, tree) = makeManifestAndTree(dataset).tryGet() + treeCid = tree.rootCid.tryGet() + proof = tree.getProof(0).tryGet() + + (await repo.putBlock(blk)).tryGet() + (await repo.putCidAndProof(treeCid, 0, blk.cid, proof)).tryGet() + + let err = (await repo.delBlock(blk.cid)).error() + check err.msg == + "Directly deleting a block that is part of a dataset is not allowed." + + test "should allow non-orphan blocks to be deleted by dataset reference": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = dataset[0] + (manifest, tree) = makeManifestAndTree(dataset).tryGet() + treeCid = tree.rootCid.tryGet() + proof = tree.getProof(0).tryGet() + + (await repo.putBlock(blk)).tryGet() + (await repo.putCidAndProof(treeCid, 0, blk.cid, proof)).tryGet() + + (await repo.delBlock(treeCid, 0.Natural)).tryGet() + check not (await blk.cid in repo) + + test "should not delete a non-orphan block until it is deleted from all parent datasets": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + blockPool = await makeRandomBlocks(datasetSize = 768, blockSize = 256'nb) + + let + dataset1 = @[blockPool[0], blockPool[1]] + dataset2 = @[blockPool[1], blockPool[2]] + + let sharedBlock = blockPool[1] + + let + (manifest1, tree1) = makeManifestAndTree(dataset1).tryGet() + treeCid1 = tree1.rootCid.tryGet() + (manifest2, tree2) = makeManifestAndTree(dataset2).tryGet() + treeCid2 = tree2.rootCid.tryGet() + + (await repo.putBlock(sharedBlock)).tryGet() + check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 0.Natural + + let + proof1 = tree1.getProof(1).tryGet() + proof2 = tree2.getProof(0).tryGet() + + (await repo.putCidAndProof(treeCid1, 1, sharedBlock.cid, proof1)).tryGet() + check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 1.Natural + + (await repo.putCidAndProof(treeCid2, 0, sharedBlock.cid, proof2)).tryGet() + check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 2.Natural + + (await repo.delBlock(treeCid1, 1.Natural)).tryGet() + check (await repo.blockRefCount(sharedBlock.cid)).tryGet() == 1.Natural + check (await sharedBlock.cid in repo) + + (await repo.delBlock(treeCid2, 0.Natural)).tryGet() + check not (await sharedBlock.cid in repo) + + test "should clear leaf metadata when block is deleted from dataset": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = dataset[0] + (manifest, tree) = makeManifestAndTree(dataset).tryGet() + treeCid = tree.rootCid.tryGet() + proof = tree.getProof(1).tryGet() + + (await repo.putBlock(blk)).tryGet() + (await repo.putCidAndProof(treeCid, 0.Natural, blk.cid, proof)).tryGet() + + discard (await repo.getLeafMetadata(treeCid, 0.Natural)).tryGet() + + (await repo.delBlock(treeCid, 0.Natural)).tryGet() + + let err = (await repo.getLeafMetadata(treeCid, 0.Natural)).error() + check err of BlockNotFoundError + + test "should not fail when reinserting and deleting a previously deleted block (bug #1108)": + let + repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = + 1000'nb) + dataset = await makeRandomBlocks(datasetSize = 512, blockSize = 256'nb) + blk = dataset[0] + (manifest, tree) = makeManifestAndTree(dataset).tryGet() + treeCid = tree.rootCid.tryGet() + proof = tree.getProof(1).tryGet() + + (await repo.putBlock(blk)).tryGet() + (await repo.putCidAndProof(treeCid, 0, blk.cid, proof)).tryGet() + + (await repo.delBlock(treeCid, 0.Natural)).tryGet() + (await repo.putBlock(blk)).tryGet() + (await repo.delBlock(treeCid, 0.Natural)).tryGet() + commonBlockStoreTests( "RepoStore Sql backend", proc(): BlockStore =