feat: ensure block expiry (#597)
* feat: update block expiry * chore: feedback implementation * chore: feedback implementation * chore: feedback implementation
This commit is contained in:
parent
11cd2c46ad
commit
c0bec2f899
|
@ -16,6 +16,7 @@ import pkg/libp2p
|
||||||
import pkg/questionable
|
import pkg/questionable
|
||||||
import pkg/questionable/results
|
import pkg/questionable/results
|
||||||
|
|
||||||
|
import ../clock
|
||||||
import ../blocktype
|
import ../blocktype
|
||||||
|
|
||||||
export blocktype
|
export blocktype
|
||||||
|
@ -54,6 +55,17 @@ method putBlock*(
|
||||||
|
|
||||||
raiseAssert("Not implemented!")
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
|
method ensureExpiry*(
|
||||||
|
self: BlockStore,
|
||||||
|
cid: Cid,
|
||||||
|
expiry: SecondsSince1970
|
||||||
|
): Future[?!void] {.base.} =
|
||||||
|
## Ensure that block's assosicated expiry is at least given timestamp
|
||||||
|
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
|
||||||
|
##
|
||||||
|
|
||||||
|
raiseAssert("Not implemented!")
|
||||||
|
|
||||||
method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} =
|
method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} =
|
||||||
## Delete a block from the blockstore
|
## Delete a block from the blockstore
|
||||||
##
|
##
|
||||||
|
|
|
@ -25,6 +25,7 @@ import ../units
|
||||||
import ../chunker
|
import ../chunker
|
||||||
import ../errors
|
import ../errors
|
||||||
import ../manifest
|
import ../manifest
|
||||||
|
import ../clock
|
||||||
|
|
||||||
export blockstore
|
export blockstore
|
||||||
|
|
||||||
|
@ -167,6 +168,16 @@ method putBlock*(
|
||||||
discard self.putBlockSync(blk)
|
discard self.putBlockSync(blk)
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
|
method ensureExpiry*(
|
||||||
|
self: CacheStore,
|
||||||
|
cid: Cid,
|
||||||
|
expiry: SecondsSince1970
|
||||||
|
): Future[?!void] {.async.} =
|
||||||
|
## Updates block's assosicated TTL in store - not applicable for CacheStore
|
||||||
|
##
|
||||||
|
|
||||||
|
discard # CacheStore does not have notion of TTL
|
||||||
|
|
||||||
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
|
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
|
||||||
## Delete a block from the blockstore
|
## Delete a block from the blockstore
|
||||||
##
|
##
|
||||||
|
|
|
@ -17,6 +17,7 @@ import pkg/libp2p
|
||||||
|
|
||||||
import ../blocktype as bt
|
import ../blocktype as bt
|
||||||
import ../utils/asyncheapqueue
|
import ../utils/asyncheapqueue
|
||||||
|
import ../clock
|
||||||
|
|
||||||
import ./blockstore
|
import ./blockstore
|
||||||
import ../blockexchange
|
import ../blockexchange
|
||||||
|
@ -63,6 +64,22 @@ method putBlock*(
|
||||||
await self.engine.resolveBlocks(@[blk])
|
await self.engine.resolveBlocks(@[blk])
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
|
method ensureExpiry*(
|
||||||
|
self: NetworkStore,
|
||||||
|
cid: Cid,
|
||||||
|
expiry: SecondsSince1970
|
||||||
|
): Future[?!void] {.async.} =
|
||||||
|
## Ensure that block's assosicated expiry is at least given timestamp
|
||||||
|
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
|
||||||
|
##
|
||||||
|
|
||||||
|
if (await self.localStore.hasBlock(cid)).tryGet:
|
||||||
|
return await self.localStore.ensureExpiry(cid, expiry)
|
||||||
|
else:
|
||||||
|
trace "Updating expiry - block not in local store", cid
|
||||||
|
|
||||||
|
return success()
|
||||||
|
|
||||||
method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
|
method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
|
||||||
## Delete a block from the blockstore
|
## Delete a block from the blockstore
|
||||||
##
|
##
|
||||||
|
|
|
@ -107,23 +107,63 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
|
||||||
trace "Got block for cid", cid
|
trace "Got block for cid", cid
|
||||||
return Block.new(cid, data)
|
return Block.new(cid, data)
|
||||||
|
|
||||||
proc getBlockExpirationTimestamp(self: RepoStore, ttl: ?Duration): SecondsSince1970 =
|
|
||||||
let duration = ttl |? self.blockTtl
|
|
||||||
self.clock.now() + duration.seconds
|
|
||||||
|
|
||||||
proc getBlockExpirationEntry(
|
proc getBlockExpirationEntry(
|
||||||
self: RepoStore,
|
self: RepoStore,
|
||||||
batch: var seq[BatchEntry],
|
|
||||||
cid: Cid,
|
cid: Cid,
|
||||||
ttl: ?Duration): ?!BatchEntry =
|
ttl: SecondsSince1970): ?!BatchEntry =
|
||||||
## Get an expiration entry for a batch
|
## Get an expiration entry for a batch with timestamp
|
||||||
##
|
##
|
||||||
|
|
||||||
without key =? createBlockExpirationMetadataKey(cid), err:
|
without key =? createBlockExpirationMetadataKey(cid), err:
|
||||||
return failure(err)
|
return failure(err)
|
||||||
|
|
||||||
let value = self.getBlockExpirationTimestamp(ttl).toBytes
|
return success((key, ttl.toBytes))
|
||||||
return success((key, value))
|
|
||||||
|
proc getBlockExpirationEntry(
|
||||||
|
self: RepoStore,
|
||||||
|
cid: Cid,
|
||||||
|
ttl: ?Duration): ?!BatchEntry =
|
||||||
|
## Get an expiration entry for a batch for duration since "now"
|
||||||
|
##
|
||||||
|
|
||||||
|
let duration = ttl |? self.blockTtl
|
||||||
|
self.getBlockExpirationEntry(cid, self.clock.now() + duration.seconds)
|
||||||
|
|
||||||
|
method ensureExpiry*(
|
||||||
|
self: RepoStore,
|
||||||
|
cid: Cid,
|
||||||
|
expiry: SecondsSince1970
|
||||||
|
): Future[?!void] {.async.} =
|
||||||
|
## Ensure that block's assosicated expiry is at least given timestamp
|
||||||
|
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
|
||||||
|
##
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
cid = cid
|
||||||
|
|
||||||
|
if expiry <= 0:
|
||||||
|
return failure(newException(ValueError, "Expiry timestamp must be larger then zero"))
|
||||||
|
|
||||||
|
without expiryKey =? createBlockExpirationMetadataKey(cid), err:
|
||||||
|
return failure(err)
|
||||||
|
|
||||||
|
without currentExpiry =? await self.metaDs.get(expiryKey), err:
|
||||||
|
if err of DatastoreKeyNotFound:
|
||||||
|
error "No current expiry exists for the block"
|
||||||
|
return failure(newException(BlockNotFoundError, err.msg))
|
||||||
|
else:
|
||||||
|
error "Could not read datastore key", err = err.msg
|
||||||
|
return failure(err)
|
||||||
|
|
||||||
|
if expiry <= currentExpiry.toSecondsSince1970:
|
||||||
|
trace "Current expiry is larger then the specified one, no action needed"
|
||||||
|
return success()
|
||||||
|
|
||||||
|
if err =? (await self.metaDs.put(expiryKey, expiry.toBytes)).errorOption:
|
||||||
|
trace "Error updating expiration metadata entry", err = err.msg
|
||||||
|
return failure(err)
|
||||||
|
|
||||||
|
return success()
|
||||||
|
|
||||||
proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} =
|
proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} =
|
||||||
if err =? (await self.metaDs.put(
|
if err =? (await self.metaDs.put(
|
||||||
|
@ -175,7 +215,7 @@ method putBlock*(
|
||||||
trace "Updating quota", used
|
trace "Updating quota", used
|
||||||
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
|
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
|
||||||
|
|
||||||
without blockExpEntry =? self.getBlockExpirationEntry(batch, blk.cid, ttl), err:
|
without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err:
|
||||||
trace "Unable to create block expiration metadata key", err = err.msg
|
trace "Unable to create block expiration metadata key", err = err.msg
|
||||||
return failure(err)
|
return failure(err)
|
||||||
batch.add(blockExpEntry)
|
batch.add(blockExpEntry)
|
||||||
|
@ -223,7 +263,6 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
|
||||||
|
|
||||||
trace "Deleting block"
|
trace "Deleting block"
|
||||||
|
|
||||||
|
|
||||||
if cid.isEmpty:
|
if cid.isEmpty:
|
||||||
trace "Empty block, ignoring"
|
trace "Empty block, ignoring"
|
||||||
return success()
|
return success()
|
||||||
|
|
|
@ -231,6 +231,87 @@ asyncchecksuite "RepoStore":
|
||||||
!response[0].key == expectedKey
|
!response[0].key == expectedKey
|
||||||
response[0].data == expectedExpiration.toBytes
|
response[0].data == expectedExpiration.toBytes
|
||||||
|
|
||||||
|
test "Should refuse update expiry with negative timestamp":
|
||||||
|
let
|
||||||
|
blk = createTestBlock(100)
|
||||||
|
expectedExpiration: SecondsSince1970 = now + 10
|
||||||
|
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
|
||||||
|
|
||||||
|
(await repo.putBlock(blk, some 10.seconds)).tryGet
|
||||||
|
|
||||||
|
var response = await queryMetaDs(expectedKey)
|
||||||
|
|
||||||
|
check:
|
||||||
|
response.len == 1
|
||||||
|
!response[0].key == expectedKey
|
||||||
|
response[0].data == expectedExpiration.toBytes
|
||||||
|
|
||||||
|
expect ValueError:
|
||||||
|
(await repo.ensureExpiry(blk.cid, -1)).tryGet
|
||||||
|
|
||||||
|
expect ValueError:
|
||||||
|
(await repo.ensureExpiry(blk.cid, 0)).tryGet
|
||||||
|
|
||||||
|
test "Should fail when updating expiry of non-existing block":
|
||||||
|
let
|
||||||
|
blk = createTestBlock(100)
|
||||||
|
|
||||||
|
expect BlockNotFoundError:
|
||||||
|
(await repo.ensureExpiry(blk.cid, 10)).tryGet
|
||||||
|
|
||||||
|
test "Should update block expiration timestamp when new expiration is farther":
|
||||||
|
let
|
||||||
|
duration = 10
|
||||||
|
blk = createTestBlock(100)
|
||||||
|
expectedExpiration: SecondsSince1970 = now + duration
|
||||||
|
updatedExpectedExpiration: SecondsSince1970 = expectedExpiration + 10
|
||||||
|
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
|
||||||
|
|
||||||
|
(await repo.putBlock(blk, some duration.seconds)).tryGet
|
||||||
|
|
||||||
|
var response = await queryMetaDs(expectedKey)
|
||||||
|
|
||||||
|
check:
|
||||||
|
response.len == 1
|
||||||
|
!response[0].key == expectedKey
|
||||||
|
response[0].data == expectedExpiration.toBytes
|
||||||
|
|
||||||
|
(await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet
|
||||||
|
|
||||||
|
response = await queryMetaDs(expectedKey)
|
||||||
|
|
||||||
|
check:
|
||||||
|
response.len == 1
|
||||||
|
!response[0].key == expectedKey
|
||||||
|
response[0].data == updatedExpectedExpiration.toBytes
|
||||||
|
|
||||||
|
test "Should not update block expiration timestamp when current expiration is farther then new one":
|
||||||
|
let
|
||||||
|
duration = 10
|
||||||
|
blk = createTestBlock(100)
|
||||||
|
expectedExpiration: SecondsSince1970 = now + duration
|
||||||
|
updatedExpectedExpiration: SecondsSince1970 = expectedExpiration - 10
|
||||||
|
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
|
||||||
|
|
||||||
|
|
||||||
|
(await repo.putBlock(blk, some duration.seconds)).tryGet
|
||||||
|
|
||||||
|
var response = await queryMetaDs(expectedKey)
|
||||||
|
|
||||||
|
check:
|
||||||
|
response.len == 1
|
||||||
|
!response[0].key == expectedKey
|
||||||
|
response[0].data == expectedExpiration.toBytes
|
||||||
|
|
||||||
|
(await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet
|
||||||
|
|
||||||
|
response = await queryMetaDs(expectedKey)
|
||||||
|
|
||||||
|
check:
|
||||||
|
response.len == 1
|
||||||
|
!response[0].key == expectedKey
|
||||||
|
response[0].data == expectedExpiration.toBytes
|
||||||
|
|
||||||
test "delBlock should remove expiration metadata":
|
test "delBlock should remove expiration metadata":
|
||||||
let
|
let
|
||||||
blk = createTestBlock(100)
|
blk = createTestBlock(100)
|
||||||
|
|
Loading…
Reference in New Issue