mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-14 19:33:10 +00:00
Merge branch 'master' into blockexchange-uses-merkle-tree
This commit is contained in:
commit
8e40cafdb9
@ -16,6 +16,7 @@ import pkg/libp2p
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import ../clock
|
||||
import ../blocktype
|
||||
import ../merkletree
|
||||
import ../utils
|
||||
@ -76,6 +77,17 @@ method putBlockCidAndProof*(
|
||||
|
||||
raiseAssert("putBlockCidAndProof not implemented!")
|
||||
|
||||
method ensureExpiry*(
|
||||
self: BlockStore,
|
||||
cid: Cid,
|
||||
expiry: SecondsSince1970
|
||||
): Future[?!void] {.base.} =
|
||||
## Ensure that block's assosicated expiry is at least given timestamp
|
||||
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
|
||||
##
|
||||
|
||||
raiseAssert("Not implemented!")
|
||||
|
||||
method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} =
|
||||
## Delete a block from the blockstore
|
||||
##
|
||||
|
||||
@ -27,6 +27,7 @@ import ../errors
|
||||
import ../manifest
|
||||
import ../merkletree
|
||||
import ../utils
|
||||
import ../clock
|
||||
|
||||
export blockstore
|
||||
|
||||
@ -219,6 +220,16 @@ method putBlockCidAndProof*(
|
||||
self.cidAndProofCache[(treeCid, index)] = (blockCid, proof)
|
||||
success()
|
||||
|
||||
method ensureExpiry*(
|
||||
self: CacheStore,
|
||||
cid: Cid,
|
||||
expiry: SecondsSince1970
|
||||
): Future[?!void] {.async.} =
|
||||
## Updates block's assosicated TTL in store - not applicable for CacheStore
|
||||
##
|
||||
|
||||
discard # CacheStore does not have notion of TTL
|
||||
|
||||
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
|
||||
## Delete a block from the blockstore
|
||||
##
|
||||
|
||||
@ -20,6 +20,7 @@ import pkg/libp2p
|
||||
import ../blocktype
|
||||
import ../utils/asyncheapqueue
|
||||
import ../utils/asynciter
|
||||
import ../clock
|
||||
|
||||
import ./blockstore
|
||||
import ../blockexchange
|
||||
@ -91,6 +92,22 @@ method putBlockCidAndProof*(
|
||||
): Future[?!void] =
|
||||
self.localStore.putBlockCidAndProof(treeCid, index, blockCid, proof)
|
||||
|
||||
method ensureExpiry*(
|
||||
self: NetworkStore,
|
||||
cid: Cid,
|
||||
expiry: SecondsSince1970
|
||||
): Future[?!void] {.async.} =
|
||||
## Ensure that block's assosicated expiry is at least given timestamp
|
||||
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
|
||||
##
|
||||
|
||||
if (await self.localStore.hasBlock(cid)).tryGet:
|
||||
return await self.localStore.ensureExpiry(cid, expiry)
|
||||
else:
|
||||
trace "Updating expiry - block not in local store", cid
|
||||
|
||||
return success()
|
||||
|
||||
method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] =
|
||||
## Delete a block from the blockstore
|
||||
##
|
||||
|
||||
@ -185,23 +185,63 @@ method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
|
||||
else:
|
||||
self.getBlock(address.cid)
|
||||
|
||||
proc getBlockExpirationTimestamp(self: RepoStore, ttl: ?Duration): SecondsSince1970 =
|
||||
let duration = ttl |? self.blockTtl
|
||||
self.clock.now() + duration.seconds
|
||||
|
||||
proc getBlockExpirationEntry(
|
||||
self: RepoStore,
|
||||
batch: var seq[BatchEntry],
|
||||
cid: Cid,
|
||||
ttl: ?Duration): ?!BatchEntry =
|
||||
## Get an expiration entry for a batch
|
||||
ttl: SecondsSince1970): ?!BatchEntry =
|
||||
## Get an expiration entry for a batch with timestamp
|
||||
##
|
||||
|
||||
without key =? createBlockExpirationMetadataKey(cid), err:
|
||||
return failure(err)
|
||||
|
||||
let value = self.getBlockExpirationTimestamp(ttl).toBytes
|
||||
return success((key, value))
|
||||
return success((key, ttl.toBytes))
|
||||
|
||||
proc getBlockExpirationEntry(
|
||||
self: RepoStore,
|
||||
cid: Cid,
|
||||
ttl: ?Duration): ?!BatchEntry =
|
||||
## Get an expiration entry for a batch for duration since "now"
|
||||
##
|
||||
|
||||
let duration = ttl |? self.blockTtl
|
||||
self.getBlockExpirationEntry(cid, self.clock.now() + duration.seconds)
|
||||
|
||||
method ensureExpiry*(
|
||||
self: RepoStore,
|
||||
cid: Cid,
|
||||
expiry: SecondsSince1970
|
||||
): Future[?!void] {.async.} =
|
||||
## Ensure that block's assosicated expiry is at least given timestamp
|
||||
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
|
||||
##
|
||||
|
||||
logScope:
|
||||
cid = cid
|
||||
|
||||
if expiry <= 0:
|
||||
return failure(newException(ValueError, "Expiry timestamp must be larger then zero"))
|
||||
|
||||
without expiryKey =? createBlockExpirationMetadataKey(cid), err:
|
||||
return failure(err)
|
||||
|
||||
without currentExpiry =? await self.metaDs.get(expiryKey), err:
|
||||
if err of DatastoreKeyNotFound:
|
||||
error "No current expiry exists for the block"
|
||||
return failure(newException(BlockNotFoundError, err.msg))
|
||||
else:
|
||||
error "Could not read datastore key", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
if expiry <= currentExpiry.toSecondsSince1970:
|
||||
trace "Current expiry is larger then the specified one, no action needed"
|
||||
return success()
|
||||
|
||||
if err =? (await self.metaDs.put(expiryKey, expiry.toBytes)).errorOption:
|
||||
trace "Error updating expiration metadata entry", err = err.msg
|
||||
return failure(err)
|
||||
|
||||
return success()
|
||||
|
||||
proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} =
|
||||
if err =? (await self.metaDs.put(
|
||||
@ -253,7 +293,7 @@ method putBlock*(
|
||||
trace "Updating quota", used
|
||||
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
|
||||
|
||||
without blockExpEntry =? self.getBlockExpirationEntry(batch, blk.cid, ttl), err:
|
||||
without blockExpEntry =? self.getBlockExpirationEntry(blk.cid, ttl), err:
|
||||
trace "Unable to create block expiration metadata key", err = err.msg
|
||||
return failure(err)
|
||||
batch.add(blockExpEntry)
|
||||
@ -301,7 +341,6 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
|
||||
|
||||
trace "Deleting block"
|
||||
|
||||
|
||||
if cid.isEmpty:
|
||||
trace "Empty block, ignoring"
|
||||
return success()
|
||||
|
||||
@ -232,6 +232,87 @@ asyncchecksuite "RepoStore":
|
||||
!response[0].key == expectedKey
|
||||
response[0].data == expectedExpiration.toBytes
|
||||
|
||||
test "Should refuse update expiry with negative timestamp":
|
||||
let
|
||||
blk = createTestBlock(100)
|
||||
expectedExpiration: SecondsSince1970 = now + 10
|
||||
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
|
||||
|
||||
(await repo.putBlock(blk, some 10.seconds)).tryGet
|
||||
|
||||
var response = await queryMetaDs(expectedKey)
|
||||
|
||||
check:
|
||||
response.len == 1
|
||||
!response[0].key == expectedKey
|
||||
response[0].data == expectedExpiration.toBytes
|
||||
|
||||
expect ValueError:
|
||||
(await repo.ensureExpiry(blk.cid, -1)).tryGet
|
||||
|
||||
expect ValueError:
|
||||
(await repo.ensureExpiry(blk.cid, 0)).tryGet
|
||||
|
||||
test "Should fail when updating expiry of non-existing block":
|
||||
let
|
||||
blk = createTestBlock(100)
|
||||
|
||||
expect BlockNotFoundError:
|
||||
(await repo.ensureExpiry(blk.cid, 10)).tryGet
|
||||
|
||||
test "Should update block expiration timestamp when new expiration is farther":
|
||||
let
|
||||
duration = 10
|
||||
blk = createTestBlock(100)
|
||||
expectedExpiration: SecondsSince1970 = now + duration
|
||||
updatedExpectedExpiration: SecondsSince1970 = expectedExpiration + 10
|
||||
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
|
||||
|
||||
(await repo.putBlock(blk, some duration.seconds)).tryGet
|
||||
|
||||
var response = await queryMetaDs(expectedKey)
|
||||
|
||||
check:
|
||||
response.len == 1
|
||||
!response[0].key == expectedKey
|
||||
response[0].data == expectedExpiration.toBytes
|
||||
|
||||
(await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet
|
||||
|
||||
response = await queryMetaDs(expectedKey)
|
||||
|
||||
check:
|
||||
response.len == 1
|
||||
!response[0].key == expectedKey
|
||||
response[0].data == updatedExpectedExpiration.toBytes
|
||||
|
||||
test "Should not update block expiration timestamp when current expiration is farther then new one":
|
||||
let
|
||||
duration = 10
|
||||
blk = createTestBlock(100)
|
||||
expectedExpiration: SecondsSince1970 = now + duration
|
||||
updatedExpectedExpiration: SecondsSince1970 = expectedExpiration - 10
|
||||
expectedKey = Key.init((BlocksTtlKey / $blk.cid).tryGet).tryGet
|
||||
|
||||
|
||||
(await repo.putBlock(blk, some duration.seconds)).tryGet
|
||||
|
||||
var response = await queryMetaDs(expectedKey)
|
||||
|
||||
check:
|
||||
response.len == 1
|
||||
!response[0].key == expectedKey
|
||||
response[0].data == expectedExpiration.toBytes
|
||||
|
||||
(await repo.ensureExpiry(blk.cid, updatedExpectedExpiration)).tryGet
|
||||
|
||||
response = await queryMetaDs(expectedKey)
|
||||
|
||||
check:
|
||||
response.len == 1
|
||||
!response[0].key == expectedKey
|
||||
response[0].data == expectedExpiration.toBytes
|
||||
|
||||
test "delBlock should remove expiration metadata":
|
||||
let
|
||||
blk = createTestBlock(100)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user