From c0bec2f899f184c3af6a1180d742bb4ba704fc1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Mon, 6 Nov 2023 09:10:30 +0100 Subject: [PATCH] feat: ensure block expiry (#597) * feat: update block expiry * chore: feedback implementation * chore: feedback implementation * chore: feedback implementation --- codex/stores/blockstore.nim | 12 +++++ codex/stores/cachestore.nim | 11 ++++ codex/stores/networkstore.nim | 21 +++++++- codex/stores/repostore.nim | 61 +++++++++++++++++---- tests/codex/stores/testrepostore.nim | 81 ++++++++++++++++++++++++++++ 5 files changed, 173 insertions(+), 13 deletions(-) diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index bba95eac..49f97cb2 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -16,6 +16,7 @@ import pkg/libp2p import pkg/questionable import pkg/questionable/results +import ../clock import ../blocktype export blocktype @@ -54,6 +55,17 @@ method putBlock*( raiseAssert("Not implemented!") +method ensureExpiry*( + self: BlockStore, + cid: Cid, + expiry: SecondsSince1970 +): Future[?!void] {.base.} = + ## Ensure that block's assosicated expiry is at least given timestamp + ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact + ## + + raiseAssert("Not implemented!") + method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} = ## Delete a block from the blockstore ## diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index a2fd7368..0a5acbc8 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -25,6 +25,7 @@ import ../units import ../chunker import ../errors import ../manifest +import ../clock export blockstore @@ -167,6 +168,16 @@ method putBlock*( discard self.putBlockSync(blk) return success() +method ensureExpiry*( + self: CacheStore, + cid: Cid, + expiry: SecondsSince1970 +): Future[?!void] {.async.} = + ## Updates block's assosicated TTL in store - not applicable for CacheStore + ## + + discard # CacheStore does not have notion of TTL + method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} = ## Delete a block from the blockstore ## diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 9e815caa..10eb6933 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -17,6 +17,7 @@ import pkg/libp2p import ../blocktype as bt import ../utils/asyncheapqueue +import ../clock import ./blockstore import ../blockexchange @@ -63,6 +64,22 @@ method putBlock*( await self.engine.resolveBlocks(@[blk]) return success() +method ensureExpiry*( + self: NetworkStore, + cid: Cid, + expiry: SecondsSince1970 +): Future[?!void] {.async.} = + ## Ensure that block's assosicated expiry is at least given timestamp + ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact + ## + + if (await self.localStore.hasBlock(cid)).tryGet: + return await self.localStore.ensureExpiry(cid, expiry) + else: + trace "Updating expiry - block not in local store", cid + + return success() + method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] = ## Delete a block from the blockstore ## @@ -91,8 +108,8 @@ proc new*( engine: BlockExcEngine, localStore: BlockStore ): NetworkStore = - ## Create new instance of a NetworkStore - ## + ## Create new instance of a NetworkStore + ## NetworkStore( localStore: localStore, engine: engine) diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index 68789c0e..922e955c 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -107,23 +107,63 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = trace "Got block for cid", cid return Block.new(cid, data) -proc getBlockExpirationTimestamp(self: RepoStore, ttl: ?Duration): SecondsSince1970 = - let duration = ttl |? self.blockTtl - self.clock.now() + duration.seconds - proc getBlockExpirationEntry( self: RepoStore, - batch: var seq[BatchEntry], cid: Cid, - ttl: ?Duration): ?!BatchEntry = - ## Get an expiration entry for a batch + ttl: SecondsSince1970): ?!BatchEntry = + ## Get an expiration entry for a batch with timestamp ## without key =? createBlockExpirationMetadataKey(cid), err: return failure(err) - let value = self.getBlockExpirationTimestamp(ttl).toBytes - return success((key, value)) + return success((key, ttl.toBytes)) + +proc getBlockExpirationEntry( + self: RepoStore, + cid: Cid, + ttl: ?Duration): ?!BatchEntry = + ## Get an expiration entry for a batch for duration since "now" + ## + + let duration = ttl |? self.blockTtl + self.getBlockExpirationEntry(cid, self.clock.now() + duration.seconds) + +method ensureExpiry*( + self: RepoStore, + cid: Cid, + expiry: SecondsSince1970 +): Future[?!void] {.async.} = + ## Ensure that block's assosicated expiry is at least given timestamp + ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact + ## + + logScope: + cid = cid + + if expiry <= 0: + return failure(newException(ValueError, "Expiry timestamp must be larger then zero")) + + without expiryKey =? createBlockExpirationMetadataKey(cid), err: + return failure(err) + + without currentExpiry =? await self.metaDs.get(expiryKey), err: + if err of DatastoreKeyNotFound: + error "No current expiry exists for the block" + return failure(newException(BlockNotFoundError, err.msg)) + else: + error "Could not read datastore key", err = err.msg + return failure(err) + + if expiry <= currentExpiry.toSecondsSince1970: + trace "Current expiry is larger then the specified one, no action needed" + return success() + + if err =? (await self.metaDs.put(expiryKey, expiry.toBytes)).errorOption: + trace "Error updating expiration metadata entry", err = err.msg + return failure(err) + + return success() proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} = if err =? (await self.metaDs.put( @@ -175,7 +215,7 @@ method putBlock*( trace "Updating quota", used batch.add((QuotaUsedKey, @(used.uint64.toBytesBE))) - without blockExpEntry =? self.getBlockExpirationEntry(batch, blk.cid, ttl), err: + without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err: trace "Unable to create block expiration metadata key", err = err.msg return failure(err) batch.add(blockExpEntry) @@ -223,7 +263,6 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = trace "Deleting block" - if cid.isEmpty: trace "Empty block, ignoring" return success() diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index 3124c650..2ad34243 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -231,6 +231,87 @@ asyncchecksuite "RepoStore": !response[0].key == expectedKey response[0].data == expectedExpiration.toBytes + test "Should refuse update expiry with negative timestamp": + let + blk = createTestBlock(100) + expectedExpiration: SecondsSince1970 = now + 10 + expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet + + (await repo.putBlock(blk, some 10.seconds)).tryGet + + var response = await queryMetaDs(expectedKey) + + check: + response.len == 1 + !response[0].key == expectedKey + response[0].data == expectedExpiration.toBytes + + expect ValueError: + (await repo.ensureExpiry(blk.cid, -1)).tryGet + + expect ValueError: + (await repo.ensureExpiry(blk.cid, 0)).tryGet + + test "Should fail when updating expiry of non-existing block": + let + blk = createTestBlock(100) + + expect BlockNotFoundError: + (await repo.ensureExpiry(blk.cid, 10)).tryGet + + test "Should update block expiration timestamp when new expiration is farther": + let + duration = 10 + blk = createTestBlock(100) + expectedExpiration: SecondsSince1970 = now + duration + updatedExpectedExpiration: SecondsSince1970 = expectedExpiration + 10 + expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet + + (await repo.putBlock(blk, some duration.seconds)).tryGet + + var response = await queryMetaDs(expectedKey) + + check: + response.len == 1 + !response[0].key == expectedKey + response[0].data == expectedExpiration.toBytes + + (await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet + + response = await queryMetaDs(expectedKey) + + check: + response.len == 1 + !response[0].key == expectedKey + response[0].data == updatedExpectedExpiration.toBytes + + test "Should not update block expiration timestamp when current expiration is farther then new one": + let + duration = 10 + blk = createTestBlock(100) + expectedExpiration: SecondsSince1970 = now + duration + updatedExpectedExpiration: SecondsSince1970 = expectedExpiration - 10 + expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet + + + (await repo.putBlock(blk, some duration.seconds)).tryGet + + var response = await queryMetaDs(expectedKey) + + check: + response.len == 1 + !response[0].key == expectedKey + response[0].data == expectedExpiration.toBytes + + (await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet + + response = await queryMetaDs(expectedKey) + + check: + response.len == 1 + !response[0].key == expectedKey + response[0].data == expectedExpiration.toBytes + test "delBlock should remove expiration metadata": let blk = createTestBlock(100)