Expiry per dataset

This commit is contained in:
Tomasz Bekas 2024-06-05 19:09:04 +02:00
parent 2771ca6319
commit e9972d1e11
No known key found for this signature in database
GPG Key ID: 4854E04C98824959
22 changed files with 519 additions and 643 deletions

View File

@ -88,12 +88,9 @@ The following options are available:
--api-bindaddr The REST API bind address [=127.0.0.1].
-p, --api-port The REST Api port [=8080].
--repo-kind Backend for main repo store (fs, sqlite) [=fs].
-q, --storage-quota The size of the total storage quota dedicated to the node [=8589934592].
-t, --block-ttl Default block timeout in seconds - 0 disables the ttl [=$DefaultBlockTtl].
--block-mi Time interval in seconds - determines frequency of block maintenance cycle: how
often blocks are checked for expiration and cleanup
[=$DefaultBlockMaintenanceInterval].
--block-mn Number of blocks to check every maintenance cycle [=1000].
-q, --storage-quota The size in bytes of the total storage quota dedicated to the node [=8589934592].
-t, --default-ttl Default dataset expiry [=1d].
--maintenance-interval Determines how frequently datasets are checked for expiration and cleanup [=5m].
-c, --cache-size The size of the block cache, 0 disables the cache - might help on slow hardrives
[=0].

View File

@ -54,7 +54,7 @@ type
restServer: RestServerRef
codexNode: CodexNodeRef
repoStore: RepoStore
maintenance: BlockMaintainer
maintenance: DatasetMaintainer
taskpool: Taskpool
CodexPrivateKey* = libp2p.PrivateKey # alias
@ -246,6 +246,9 @@ proc new*(
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)
metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create metadata store!")
repoData = case config.repoKind
of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5)
.expect("Should create repo file data store!"))
@ -256,21 +259,21 @@ proc new*(
repoStore = RepoStore.new(
repoDs = repoData,
metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create metadata store!"),
quotaMaxBytes = config.storageQuota,
blockTtl = config.blockTtl)
metaDs = metaDs,
quotaMaxBytes = config.storageQuota)
maintenance = BlockMaintainer.new(
maintenance = DatasetMaintainer.new(
repoStore,
interval = config.blockMaintenanceInterval,
numberOfBlocksPerInterval = config.blockMaintenanceNumberOfBlocks)
metaDs,
defaultExpiry = config.defaultExpiry,
interval = config.maintenanceInterval)
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks)
engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, repoStore)
prover = if config.prover:
if not fileAccessible($config.circomR1cs, {AccessFlags.Read}) and
endsWith($config.circomR1cs, ".r1cs"):
@ -306,6 +309,7 @@ proc new*(
codexNode = CodexNodeRef.new(
switch = switch,
networkStore = store,
maintenance = maintenance,
engine = engine,
prover = prover,
discovery = discovery,

View File

@ -42,9 +42,8 @@ export units, net, codextypes, logutils
export
DefaultQuotaBytes,
DefaultBlockTtl,
DefaultBlockMaintenanceInterval,
DefaultNumberOfBlocksToMaintainPerInterval
DefaultDefaultExpiry,
DefaultMaintenanceInterval
proc defaultDataDir*(): string =
let dataDir = when defined(windows):
@ -209,24 +208,18 @@ type
name: "storage-quota"
abbr: "q" }: NBytes
blockTtl* {.
desc: "Default block timeout in seconds - 0 disables the ttl"
defaultValue: DefaultBlockTtl
defaultValueDesc: $DefaultBlockTtl
name: "block-ttl"
defaultExpiry* {.
desc: "Default dataset expiry in seconds"
defaultValue: DefaultDefaultExpiry
defaultValueDesc: $DefaultDefaultExpiry
name: "default-ttl"
abbr: "t" }: Duration
blockMaintenanceInterval* {.
desc: "Time interval in seconds - determines frequency of block maintenance cycle: how often blocks are checked for expiration and cleanup"
defaultValue: DefaultBlockMaintenanceInterval
defaultValueDesc: $DefaultBlockMaintenanceInterval
name: "block-mi" }: Duration
blockMaintenanceNumberOfBlocks* {.
desc: "Number of blocks to check every maintenance cycle"
defaultValue: DefaultNumberOfBlocksToMaintainPerInterval
defaultValueDesc: $DefaultNumberOfBlocksToMaintainPerInterval
name: "block-mn" }: int
maintenanceInterval* {.
desc: "Determines how frequently datasets are checked for expiration and cleanup"
defaultValue: DefaultMaintenanceInterval
defaultValueDesc: $DefaultMaintenanceInterval
name: "maintenance-interval" }: Duration
cacheSize* {.
desc: "The size of the block cache, 0 disables the cache - might help on slow hardrives"

View File

@ -18,6 +18,8 @@ const
CodexMetaNamespace & "/ttl"
CodexBlockProofNamespace* = # Cid and Proof
CodexMetaNamespace & "/proof"
CodexDatasetMetadataNamespace* = # Dataset
CodexMetaNamespace & "/dataset"
CodexDhtNamespace* = "dht" # Dht namespace
CodexDhtProvidersNamespace* = # Dht providers namespace
CodexDhtNamespace & "/providers"

View File

@ -65,6 +65,7 @@ type
switch: Switch
networkId: PeerId
networkStore: NetworkStore
maintenance: DatasetMaintainer
engine: BlockExcEngine
prover: ?Prover
discovery: Discovery
@ -155,17 +156,8 @@ proc updateExpiry*(
trace "Unable to fetch manifest for cid", manifestCid
return failure(error)
try:
let
ensuringFutures = Iter[int].new(0..<manifest.blocksCount)
.mapIt(self.networkStore.localStore.ensureExpiry( manifest.treeCid, it, expiry ))
await allFuturesThrowing(ensuringFutures)
except CancelledError as exc:
raise exc
except CatchableError as exc:
return failure(exc.msg)
await self.maintenance.ensureExpiry(manifest.treeCid, expiry)
return success()
proc fetchBatched*(
self: CodexNodeRef,
@ -274,6 +266,13 @@ proc streamEntireDataset(
# Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", manifestCid
if err =? (await self.maintenance.trackExpiry(
manifest.treeCid,
manifest.blocksCount,
manifestsCids = @[manifestCid])).errorOption:
return failure(err)
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success
proc retrieve*(
@ -361,6 +360,12 @@ proc store*(
error "Unable to store manifest"
return failure(err)
if err =? (await self.maintenance.trackExpiry(
treeCid,
manifest.blocksCount,
manifestsCids = @[manifestBlk.cid])).errorOption:
return failure(err)
info "Stored data", manifestCid = manifestBlk.cid,
treeCid = treeCid,
blocks = manifest.blocksCount,
@ -566,18 +571,17 @@ proc onStore(
trace "Slot index not in manifest", slotIdx
return failure(newException(CodexError, "Slot index not in manifest"))
proc updateExpiry(blocks: seq[bt.Block]): Future[?!void] {.async.} =
trace "Updating expiry for blocks", blocks = blocks.len
proc onBatch(blocks: seq[bt.Block]): Future[?!void] {.async.} =
if not blocksCb.isNil:
await blocksCb(blocks)
else:
success()
let ensureExpiryFutures = blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry))
if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption:
return failure(updateExpiryErr)
if not blocksCb.isNil and err =? (await blocksCb(blocks)).errorOption:
trace "Unable to process blocks", err = err.msg
return failure(err)
return success()
if err =? (await self.maintenance.trackExpiry(
manifest.treeCid,
manifest.blocksCount,
manifestsCids = @[cid])).errorOption:
return failure(err)
without indexer =? manifest.verifiableStrategy.init(
0, manifest.blocksCount - 1, manifest.numSlots).catch, err:
@ -591,7 +595,7 @@ proc onStore(
if err =? (await self.fetchBatched(
manifest.treeCid,
blksIter,
onBatch = updateExpiry)).errorOption:
onBatch = onBatch)).errorOption:
trace "Unable to fetch blocks", err = err.msg
return failure(err)
@ -607,6 +611,11 @@ proc onStore(
trace "Slot successfully retrieved and reconstructed"
if err =? (await self.maintenance.ensureExpiry(
manifest.treeCid,
expiry)).errorOption:
return failure(err)
return success()
proc onProve(
@ -771,6 +780,7 @@ proc new*(
T: type CodexNodeRef,
switch: Switch,
networkStore: NetworkStore,
maintenance: DatasetMaintainer,
engine: BlockExcEngine,
discovery: Discovery,
prover = Prover.none,
@ -782,6 +792,7 @@ proc new*(
CodexNodeRef(
switch: switch,
networkStore: networkStore,
maintenance: maintenance,
engine: engine,
prover: prover,
discovery: discovery,

View File

@ -61,9 +61,9 @@ method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future
raiseAssert("getBlockAndProof not implemented!")
method putBlock*(
self: BlockStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.base.} =
self: BlockStore,
blk: Block
): Future[?!void] {.base.} =
## Put a block to the blockstore
##
@ -89,27 +89,6 @@ method getCidAndProof*(
raiseAssert("getCidAndProof not implemented!")
method ensureExpiry*(
self: BlockStore,
cid: Cid,
expiry: SecondsSince1970): Future[?!void] {.base.} =
## Ensure that block's assosicated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
raiseAssert("Not implemented!")
method ensureExpiry*(
self: BlockStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970): Future[?!void] {.base.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
raiseAssert("Not implemented!")
method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} =
## Delete a block from the blockstore
##
@ -134,6 +113,15 @@ method hasBlock*(self: BlockStore, tree: Cid, index: Natural): Future[?!bool] {.
raiseAssert("hasBlock not implemented!")
method hasCidAndProof*(
self: BlockStore,
treeCid: Cid,
index: Natural): Future[?!bool] {.base.} =
## Check if block cid and proof exists in the blockstore
##
raiseAssert("hasBlock not implemented!")
method listBlocks*(
self: BlockStore,
blockType = BlockType.Manifest): Future[?!AsyncIter[?Cid]] {.base.} =

View File

@ -186,8 +186,7 @@ func putBlockSync(self: CacheStore, blk: Block): bool =
method putBlock*(
self: CacheStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
blk: Block): Future[?!void] {.async.} =
## Put a block to the blockstore
##
@ -209,27 +208,6 @@ method putCidAndProof*(
self.cidAndProofCache[(treeCid, index)] = (blockCid, proof)
success()
method ensureExpiry*(
self: CacheStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Updates block's assosicated TTL in store - not applicable for CacheStore
##
discard # CacheStore does not have notion of TTL
method ensureExpiry*(
self: CacheStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Updates block's associated TTL in store - not applicable for CacheStore
##
discard # CacheStore does not have notion of TTL
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##

View File

@ -25,6 +25,7 @@ const
CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet
BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet
BlockProofKey* = Key.init(CodexBlockProofNamespace).tryGet
DatasetMetadataKey* = Key.init(CodexDatasetMetadataNamespace).tryGet
QuotaKey* = Key.init(CodexQuotaNamespace).tryGet
QuotaUsedKey* = (QuotaKey / "used").tryGet
QuotaReservedKey* = (QuotaKey / "reserved").tryGet
@ -47,3 +48,10 @@ proc createBlockExpirationMetadataQueryKey*(): ?!Key =
proc createBlockCidAndProofMetadataKey*(treeCid: Cid, index: Natural): ?!Key =
(BlockProofKey / $treeCid).flatMap((k: Key) => k / $index)
proc createDatasetMetadataKey*(treeCid: Cid): ?!Key =
DatasetMetadataKey / $treeCid
proc createDatasetMetadataQueryKey*(): ?!Key =
let queryString = ? (DatasetMetadataKey / "*")
Key.init(queryString)

View File

@ -10,92 +10,338 @@
## Store maintenance module
## Looks for and removes expired blocks from blockstores.
import std/sequtils
import pkg/chronos
import pkg/chronicles
import pkg/libp2p/cid
import pkg/serde/json
import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable
import pkg/questionable/results
import ./repostore
import ./blockstore
import ./keyutils
import ./queryiterhelper
import ../utils/timer
import ../utils/asynciter
import ../utils/json
import ../utils/trackedfutures
import ../clock
import ../logutils
import ../systemclock
logScope:
topics = "codex maintenance"
const
DefaultBlockMaintenanceInterval* = 10.minutes
DefaultNumberOfBlocksToMaintainPerInterval* = 1000
DefaultDefaultExpiry* = 24.hours
DefaultMaintenanceInterval* = 5.minutes
TimestampUpdateCycle* = 1000
## Update timestamp after deleting a block that's index
## is a multiple of this number. The lower the number
## the update is more frequent.
##
DefaultRestartDelay* = 15.minutes
## If no progress was observed for this amount of time
## we're going to restart deletion of the dataset
##
type
BlockMaintainer* = ref object of RootObj
repoStore: RepoStore
DatasetMaintainer* = ref object of RootObj
blockStore: BlockStore
metaDs: TypedDatastore
interval: Duration
defaultExpiry: Duration
restartDelay: Duration
timer: Timer
clock: Clock
numberOfBlocksPerInterval: int
offset: int
trackedFutures: TrackedFutures
DatasetMetadata* {.serialize.} = object
## Represents metadata for a tracked dataset. Field `maintenanceTimestamp`
## reflect last update from the maintance routine
##
expiry*: SecondsSince1970
manifestsCids*: seq[Cid]
maintenanceTimestamp*: SecondsSince1970
MissingKey* = object of CodexError
proc new*(
T: type BlockMaintainer,
repoStore: RepoStore,
interval: Duration,
numberOfBlocksPerInterval = 100,
T: type DatasetMaintainer,
blockStore: BlockStore,
metaDs: Datastore,
defaultExpiry = DefaultDefaultExpiry,
interval = DefaultMaintenanceInterval,
restartDelay = DefaultRestartDelay,
timer = Timer.new(),
clock: Clock = SystemClock.new()
): BlockMaintainer =
## Create new BlockMaintainer instance
clock: Clock = SystemClock.new(),
trackedFutures = TrackedFutures.new()
): DatasetMaintainer =
## Create new DatasetMaintainer instance
##
## Call `start` to begin looking for for expired blocks
##
BlockMaintainer(
repoStore: repoStore,
DatasetMaintainer(
blockStore: blockStore,
metaDs: TypedDatastore.init(metaDs),
defaultExpiry: defaultExpiry,
interval: interval,
numberOfBlocksPerInterval: numberOfBlocksPerInterval,
restartDelay: restartDelay,
timer: timer,
clock: clock,
offset: 0)
trackedFutures: trackedFutures)
proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.} =
if isErr (await self.repoStore.delBlock(cid)):
trace "Unable to delete block from repoStore"
proc encode(t: DatasetMetadata): seq[byte] = serializer.toJson(t).toBytes()
proc decode(T: type DatasetMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes)
proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} =
if be.expiry < self.clock.now:
await self.deleteExpiredBlock(be.cid)
else:
inc self.offset
proc trackExpiry*(
self: DatasetMaintainer,
treeCid: Cid,
expiry: SecondsSince1970,
manifestsCids: seq[Cid]
): Future[?!void] {.async.} =
# Starts tracking expiry of a given dataset
#
proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} =
let expirations = await self.repoStore.getBlockExpirations(
maxNumber = self.numberOfBlocksPerInterval,
offset = self.offset
)
trace "Tracking an expiry of a dataset", treeCid, expiry
without iter =? expirations, err:
trace "Unable to obtain blockExpirations iterator from repoStore"
return
without key =? createDatasetMetadataKey(treeCid), err:
return failure(err)
var numberReceived = 0
for beFut in iter:
let be = await beFut
inc numberReceived
await self.processBlockExpiration(be)
proc modifyFn(maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
var md: DatasetMetadata
if currDatasetMd =? maybeCurrDatasetMd:
md.expiry = max(currDatasetMd.expiry, expiry)
md.manifestsCids = (currDatasetMd.manifestsCids & manifestsCids).deduplicate
md.maintenanceTimestamp = currDatasetMd.maintenanceTimestamp
else:
md.expiry = expiry
md.manifestsCids = manifestsCids
md.maintenanceTimestamp = 0
md.some
await modify[DatasetMetadata](self.metaDs, key, modifyFn)
proc trackExpiry*(
self: DatasetMaintainer,
treeCid: Cid,
manifestsCids: seq[Cid]
): Future[?!void] {.async.} =
await self.trackExpiry(treeCid, self.clock.now + self.defaultExpiry.seconds, manifestsCids)
proc trackExpiry*(
self: DatasetMaintainer,
cid: Cid,
manifestsCids: seq[Cid]
): Future[?!void] {.async.} =
await self.trackExpiry(cid, self.clock.now + self.defaultExpiry.seconds, manifestsCids)
proc findLeavesCount*(
self: DatasetMaintainer,
treeCid: Cid
): Future[?!Natural] {.async.} =
## Find out how many leaves are stored for a tree (visible for tests)
##
proc bisect(startIdx: Natural, endIdx: Natural): Future[?!Natural] {.async.} =
if startIdx < endIdx:
let midIdx = startIdx + (endIdx - startIdx) div 2.Natural
without leafPresent =? (await self.blockStore.hasCidAndProof(treeCid, midIdx)), err:
return failure(err)
if leafPresent:
return await bisect(midIdx + 1.Natural, endIdx)
else:
return await bisect(startIdx, midIdx)
else:
return success(startIdx)
return await bisect(Natural.low, Natural.high)
proc ensureExpiry*(
self: DatasetMaintainer,
treeCid: Cid,
minExpiry: SecondsSince1970): Future[?!void] {.async.} =
## Sets the dataset expiry to a max of two values: current expiry and `minExpiry`,
## if a dataset for given `treeCid` is not currently tracked a CatchableError is thrown
##
trace "Updating a dataset expiry", treeCid, minExpiry
without key =? createDatasetMetadataKey(treeCid), err:
return failure(err)
proc modifyFn(maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
if currDatasetMd =? maybeCurrDatasetMd:
let datasetMd = DatasetMetadata(
expiry: max(currDatasetMd.expiry, minExpiry),
manifestsCids: currDatasetMd.manifestsCids,
maintenanceTimestamp: currDatasetMd.maintenanceTimestamp
)
return datasetMd.some
else:
raise newException(CatchableError, "DatasetMetadata for treeCid " & $treeCid & " not found")
await modify[DatasetMetadata](self.metaDs, key, modifyFn)
proc updateTimestamp*(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[?!void] {.async.} =
without key =? createDatasetMetadataKey(treeCid), err:
return failure(err)
var datasetMd = datasetMd
datasetMd.maintenanceTimestamp = self.clock.now
proc modifyFn(maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
if currDatasetMd =? maybeCurrDatasetMd:
if currDatasetMd.expiry != datasetMd.expiry or currDatasetMd.manifestsCids != datasetMd.manifestsCids:
raise newException(CatchableError, "Change in expiry detected, interrupting maintenance for dataset with treeCid " & $treeCid)
datasetMd.some
else:
raise newException(CatchableError, "Metadata for dataset with treeCid " & $treeCid & " not found")
await self.metaDs.modify(key, modifyFn)
proc deleteDatasetMetadata(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[?!void] {.async.} =
without key =? createDatasetMetadataKey(treeCid), err:
return failure(err)
proc modifyFn(maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
if currDatasetMd =? maybeCurrDatasetMd:
if currDatasetMd.expiry != datasetMd.expiry or currDatasetMd.manifestsCids != datasetMd.manifestsCids:
raise newException(CatchableError, "Change in expiry detected, interrupting maintenance for dataset with treeCid " & $treeCid)
DatasetMetadata.none
else:
raise newException(CatchableError, "Metadata for dataset with treeCid " & $treeCid & " not found")
await self.metaDs.modify(key, modifyFn)
proc deleteDataset(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[?!void] {.async.} =
logScope:
treeCid = treeCid
manifestsCids = datasetMd.manifestsCids
if err =? (await self.updateTimestamp(treeCid, datasetMd)).errorOption:
return failure(err)
without leavesCount =? (await self.findLeavesCount(treeCid)), err:
return failure(err)
if leavesCount == 0:
trace "No leaves/blocks found to delete"
return success()
trace "Starting to delete leaves/blocks", leavesCount
var index = leavesCount
while index > 0:
index.dec
if err =? (await self.blockStore.delBlock(treeCid, index)).errorOption:
error "Error deleting a block", msg = err.msg, index
await sleepAsync(1.millis) # cooperative scheduling
# If we received fewer blockExpirations from the iterator than we asked for,
# We're at the end of the dataset and should start from 0 next time.
if numberReceived < self.numberOfBlocksPerInterval:
self.offset = 0
if (index mod TimestampUpdateCycle) == 0:
if err =? (await self.updateTimestamp(treeCid, datasetMd)).errorOption:
return failure(err)
trace "Finished deleting leaves/blocks", leavesCount
proc start*(self: BlockMaintainer) =
for manifestCid in datasetMd.manifestsCids:
if err =? (await self.blockStore.delBlock(manifestCid)).errorOption:
error "Error deleting manifest", cid = manifestCid
if err =? (await self.deleteDatasetMetadata(treeCid, datasetMd)).errorOption:
return failure(err)
else:
return success()
proc superviseDatasetDeletion(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[void] {.async.} =
logScope:
treeCid = treeCid
manifestsCids = datasetMd.manifestsCids
expiry = datasetMd.expiry
try:
if err =? (await self.deleteDataset(treeCid, datasetMd)).errorOption:
error "Error occurred during deletion of a dataset", msg = err.msg
else:
trace "Dataset deletion complete"
except CancelledError as err:
raise err
except CatchableError as err:
error "Unexpected error during dataset deletion", msg = err.msg, treeCid = treeCid
proc listDatasetMetadata*(
self: DatasetMaintainer
): Future[?!AsyncIter[(Cid, DatasetMetadata)]] {.async.} =
without queryKey =? createDatasetMetadataQueryKey(), err:
return failure(err)
let mdQuery = Query.init(queryKey)
without queryIter =? await query[DatasetMetadata](self.metaDs, mdQuery), err:
error "Unable to execute block expirations query", err = err.msg
return failure(err)
without asyncQueryIter =? await queryIter.toAsyncIter(), err:
error "Unable to convert QueryIter to AsyncIter", err = err.msg
return failure(err)
let
filteredIter = await asyncQueryIter.filterSuccess()
datasetMdIter = await mapFilter[KeyVal[DatasetMetadata], (Cid, DatasetMetadata)](filteredIter,
proc (kv: KeyVal[DatasetMetadata]): Future[?(Cid, DatasetMetadata)] {.async.} =
without cid =? Cid.init(kv.key.value).mapFailure, err:
error "Failed decoding cid", err = err.msg
return (Cid, DatasetMetadata).none
(cid, kv.value).some
)
success(datasetMdIter)
proc checkDatasets(self: DatasetMaintainer): Future[?!void] {.async.} =
without iter =? await self.listDatasetMetadata(), err:
return failure(err)
for fut in iter:
let (treeCid, datasetMd) = await fut
if (datasetMd.expiry < self.clock.now) and
(datasetMd.maintenanceTimestamp + self.restartDelay.seconds < self.clock.now):
asyncSpawn self.superviseDatasetDeletion(treeCid, datasetMd).track(self)
else:
trace "Item either not expired or expired but already in maintenance", treeCid, expiry = datasetMd.expiry, timestamp = datasetMd.maintenanceTimestamp
success()
proc start*(self: DatasetMaintainer) =
proc onTimer(): Future[void] {.async.} =
try:
await self.runBlockCheck()
except CancelledError as error:
raise error
except CatchableError as exc:
error "Unexpected exception in BlockMaintainer.onTimer(): ", msg=exc.msg
if err =? (await self.checkDatasets()).errorOption:
error "Error when checking datasets", msg = err.msg
except CancelledError as err:
raise err
except CatchableError as err:
error "Error when checking datasets", msg = err.msg
self.timer.start(onTimer, self.interval)
if self.interval.seconds > 0:
self.timer.start(onTimer, self.interval)
proc stop*(self: BlockMaintainer): Future[void] {.async.} =
proc stop*(self: DatasetMaintainer): Future[void] {.async.} =
await self.timer.stop()
await self.trackedFutures.cancelTracked()

View File

@ -61,11 +61,13 @@ method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!Blo
method putBlock*(
self: NetworkStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
blk: Block): Future[?!void] {.async.} =
## Store block locally and notify the network
##
let res = await self.localStore.putBlock(blk, ttl)
trace "Putting block into network store", cid = blk.cid
let res = await self.localStore.putBlock(blk)
if res.isErr:
return res
@ -89,43 +91,6 @@ method getCidAndProof*(
self.localStore.getCidAndProof(treeCid, index)
method ensureExpiry*(
self: NetworkStore,
cid: Cid,
expiry: SecondsSince1970): Future[?!void] {.async.} =
## Ensure that block's assosicated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without blockCheck =? await self.localStore.hasBlock(cid), err:
return failure(err)
if blockCheck:
return await self.localStore.ensureExpiry(cid, expiry)
else:
trace "Updating expiry - block not in local store", cid
return success()
method ensureExpiry*(
self: NetworkStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without blockCheck =? await self.localStore.hasBlock(treeCid, index), err:
return failure(err)
if blockCheck:
return await self.localStore.ensureExpiry(treeCid, index, expiry)
else:
trace "Updating expiry - block not in local store", treeCid, index
return success()
method listBlocks*(
self: NetworkStore,
blockType = BlockType.Manifest): Future[?!AsyncIter[?Cid]] =

View File

@ -64,6 +64,12 @@ proc getLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!L
success(leafMd)
proc hasLeafMetadata*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool] {.async.} =
without key =? createBlockCidAndProofMetadataKey(treeCid, index), err:
return failure(err)
return await self.metaDs.has(key)
proc updateTotalBlocksCount*(self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0): Future[?!void] {.async.} =
await self.metaDs.modify(CodexTotalBlocksKey,
proc (maybeCurrCount: ?Natural): Future[?Natural] {.async.} =
@ -109,8 +115,7 @@ proc updateBlockMetadata*(
self: RepoStore,
cid: Cid,
plusRefCount: Natural = 0,
minusRefCount: Natural = 0,
minExpiry: SecondsSince1970 = 0
minusRefCount: Natural = 0
): Future[?!void] {.async.} =
if cid.isEmpty:
return success()
@ -123,14 +128,13 @@ proc updateBlockMetadata*(
if currBlockMd =? maybeCurrBlockMd:
BlockMetadata(
size: currBlockMd.size,
expiry: max(currBlockMd.expiry, minExpiry),
refCount: currBlockMd.refCount + plusRefCount - minusRefCount
).some
else:
raise newException(BlockNotFoundError, "Metadata for block with cid " & $cid & " not found")
)
proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Future[?!StoreResult] {.async.} =
proc storeBlock*(self: RepoStore, blk: Block): Future[?!StoreResult] {.async.} =
if blk.isEmpty:
return success(StoreResult(kind: AlreadyInStore))
@ -148,7 +152,7 @@ proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Futu
if currMd =? maybeCurrMd:
if currMd.size == blk.data.len.NBytes:
md = BlockMetadata(size: currMd.size, expiry: max(currMd.expiry, minExpiry), refCount: currMd.refCount)
md = BlockMetadata(size: currMd.size, refCount: currMd.refCount)
res = StoreResult(kind: AlreadyInStore)
# making sure that the block acutally is stored in the repoDs
@ -162,7 +166,7 @@ proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Futu
else:
raise newException(CatchableError, "Repo already stores a block with the same cid but with a different size, cid: " & $blk.cid)
else:
md = BlockMetadata(size: blk.data.len.NBytes, expiry: minExpiry, refCount: 0)
md = BlockMetadata(size: blk.data.len.NBytes, refCount: 0)
res = StoreResult(kind: Stored, used: blk.data.len.NBytes)
if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption:
raise err
@ -170,7 +174,7 @@ proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Futu
(md.some, res)
)
proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low): Future[?!DeleteResult] {.async.} =
proc tryDeleteBlock*(self: RepoStore, cid: Cid): Future[?!DeleteResult] {.async.} =
if cid.isEmpty:
return success(DeleteResult(kind: InUse))
@ -187,7 +191,7 @@ proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.l
res: DeleteResult
if currMd =? maybeCurrMd:
if currMd.refCount == 0 or currMd.expiry < expiryLimit:
if currMd.refCount == 0:
maybeMeta = BlockMetadata.none
res = DeleteResult(kind: Deleted, released: currMd.size)

View File

@ -85,35 +85,6 @@ method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
else:
self.getBlock(address.cid)
method ensureExpiry*(
self: RepoStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
if expiry <= 0:
return failure(newException(ValueError, "Expiry timestamp must be larger then zero"))
await self.updateBlockMetadata(cid, minExpiry = expiry)
method ensureExpiry*(
self: RepoStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
await self.ensureExpiry(leafMd.blkCid, expiry)
method putCidAndProof*(
self: RepoStore,
treeCid: Cid,
@ -166,17 +137,14 @@ method getCid*(
method putBlock*(
self: RepoStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
blk: Block): Future[?!void] {.async.} =
## Put a block to the blockstore
##
logScope:
cid = blk.cid
let expiry = self.clock.now() + (ttl |? self.blockTtl).seconds
without res =? await self.storeBlock(blk, expiry), err:
without res =? await self.storeBlock(blk), err:
return failure(err)
if res.kind == Stored:
@ -195,7 +163,7 @@ method putBlock*(
return success()
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore when block refCount is 0 or block is expired
## Delete a block from the blockstore when block refCount is 0
##
logScope:
@ -203,7 +171,7 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
trace "Attempting to delete a block"
without res =? await self.tryDeleteBlock(cid, self.clock.now()), err:
without res =? await self.tryDeleteBlock(cid), err:
return failure(err)
if res.kind == Deleted:
@ -214,7 +182,7 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption:
return failure(err)
elif res.kind == InUse:
trace "Block in use, refCount > 0 and not expired"
trace "Block in use, refCount > 0"
else:
trace "Block not found in store"
@ -259,6 +227,15 @@ method hasBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!bool]
await self.hasBlock(leafMd.blkCid)
method hasCidAndProof*(
self: RepoStore,
treeCid: Cid,
index: Natural): Future[?!bool] {.async.} =
## Check if block cid and proof exists in the blockstore
##
await self.hasLeafMetadata(treeCid, index)
method listBlocks*(
self: RepoStore,
blockType = BlockType.Manifest
@ -296,42 +273,6 @@ method listBlocks*(
iter.next = next
return success iter
proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
let queryKey = ? createBlockExpirationMetadataQueryKey()
success Query.init(queryKey, offset = offset, limit = maxNumber)
method getBlockExpirations*(
self: RepoStore,
maxNumber: int,
offset: int): Future[?!AsyncIter[BlockExpiration]] {.async, base.} =
## Get iterator with block expirations
##
without beQuery =? createBlockExpirationQuery(maxNumber, offset), err:
error "Unable to format block expirations query", err = err.msg
return failure(err)
without queryIter =? await query[BlockMetadata](self.metaDs, beQuery), err:
error "Unable to execute block expirations query", err = err.msg
return failure(err)
without asyncQueryIter =? await queryIter.toAsyncIter(), err:
error "Unable to convert QueryIter to AsyncIter", err = err.msg
return failure(err)
let
filteredIter = await asyncQueryIter.filterSuccess()
blockExpIter = await mapFilter[KeyVal[BlockMetadata], BlockExpiration](filteredIter,
proc (kv: KeyVal[BlockMetadata]): Future[?BlockExpiration] {.async.} =
without cid =? Cid.init(kv.key.value).mapFailure, err:
error "Failed decoding cid", err = err.msg
return BlockExpiration.none
BlockExpiration(cid: cid, expiry: kv.value.expiry).some
)
success(blockExpIter)
method close*(self: RepoStore): Future[void] {.async.} =
## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op

View File

@ -30,11 +30,9 @@ type
postFixLen*: int
repoDs*: Datastore
metaDs*: TypedDatastore
clock*: Clock
quotaMaxBytes*: NBytes
quotaUsage*: QuotaUsage
totalBlocks*: Natural
blockTtl*: Duration
started*: bool
QuotaUsage* {.serialize.} = object
@ -42,7 +40,6 @@ type
reserved*: NBytes
BlockMetadata* {.serialize.} = object
expiry*: SecondsSince1970
size*: NBytes
refCount*: Natural
@ -50,13 +47,9 @@ type
blkCid*: Cid
proof*: CodexProof
BlockExpiration* {.serialize.} = object
cid*: Cid
expiry*: SecondsSince1970
DeleteResultKind* {.serialize.} = enum
Deleted = 0, # block removed from store
InUse = 1, # block not removed, refCount > 0 and not expired
InUse = 1, # block not removed, refCount > 0
NotFound = 2 # block not found in store
DeleteResult* {.serialize.} = object
@ -90,18 +83,14 @@ func new*(
T: type RepoStore,
repoDs: Datastore,
metaDs: Datastore,
clock: Clock = SystemClock.new(),
postFixLen = 2,
quotaMaxBytes = DefaultQuotaBytes,
blockTtl = DefaultBlockTtl
quotaMaxBytes = DefaultQuotaBytes
): RepoStore =
## Create new instance of a RepoStore
##
RepoStore(
repoDs: repoDs,
metaDs: TypedDatastore.init(metaDs),
clock: clock,
postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl
quotaMaxBytes: quotaMaxBytes
)

View File

@ -10,6 +10,7 @@ import pkg/codex/merkletree
import pkg/codex/blockexchange
import pkg/codex/rng
import pkg/codex/utils
import pkg/codex/units
import ./helpers/nodeutils
import ./helpers/randomchunker
@ -106,6 +107,10 @@ proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest
return manifest
proc storeDataGetManifest*(store: BlockStore, blocksCount = 10): Future[Manifest] {.async.} =
let chunker = RandomChunker.new(rng = Rng.instance(), size = blocksCount.KiBs, chunkSize = 1.KiBs)
await storeDataGetManifest(store, chunker)
proc makeRandomBlocks*(
datasetSize: int, blockSize: NBytes): Future[seq[Block]] {.async.} =

View File

@ -23,7 +23,6 @@ type
getBeMaxNumber*: int
getBeOffset*: int
testBlockExpirations*: seq[BlockExpiration]
getBlockExpirationsThrows*: bool
method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} =

View File

@ -75,6 +75,7 @@ template setupAndTearDown*() {.dirty.} =
localStore: RepoStore
localStoreRepoDs: DataStore
localStoreMetaDs: DataStore
maintenance: DatasetMaintainer
engine: BlockExcEngine
store: NetworkStore
node: CodexNodeRef
@ -99,8 +100,9 @@ template setupAndTearDown*() {.dirty.} =
clock = SystemClock.new()
localStoreMetaDs = metaTmp.newDb()
localStoreRepoDs = repoTmp.newDb()
localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock = clock)
localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs)
await localStore.start()
maintenance = DatasetMaintainer.new(localStore, localStoreMetaDs)
blockDiscovery = Discovery.new(
switch.peerInfo.privateKey,
@ -115,6 +117,7 @@ template setupAndTearDown*() {.dirty.} =
node = CodexNodeRef.new(
switch = switch,
networkStore = store,
maintenance = maintenance,
engine = engine,
prover = Prover.none,
discovery = blockDiscovery,

View File

@ -101,23 +101,6 @@ asyncchecksuite "Test Node - Host contracts":
test "onExpiryUpdate callback is set":
check sales.onExpiryUpdate.isSome
test "onExpiryUpdate callback":
let
# The blocks have set default TTL, so in order to update it we have to have larger TTL
expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123
expiryUpdateCallback = !sales.onExpiryUpdate
(await expiryUpdateCallback(manifestCidStr, expectedExpiry)).tryGet()
for index in 0..<manifest.blocksCount:
let
blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet
key = (createBlockExpirationMetadataKey(blk.cid)).tryGet
bytes = (await localStoreMetaDs.get(key)).tryGet
blkMd = BlockMetadata.decode(bytes).tryGet
check blkMd.expiry == expectedExpiry
test "onStore callback is set":
check sales.onStore.isSome
@ -125,7 +108,7 @@ asyncchecksuite "Test Node - Host contracts":
let onStore = !sales.onStore
var request = StorageRequest.example
request.content.cid = $verifiableBlock.cid
request.expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.u256
request.expiry = (getTime() + 1.hours).toUnix.u256
var fetchedBytes: uint = 0
let onBlocks = proc(blocks: seq[bt.Block]): Future[?!void] {.async.} =
@ -134,16 +117,5 @@ asyncchecksuite "Test Node - Host contracts":
return success()
(await onStore(request, 1.u256, onBlocks)).tryGet()
check fetchedBytes == 12 * DefaultBlockSize.uint
let indexer = verifiable.protectedStrategy.init(
0, verifiable.numSlotBlocks() - 1, verifiable.numSlots)
for index in indexer.getIndicies(1):
let
blk = (await localStore.getBlock(verifiable.treeCid, index)).tryGet
key = (createBlockExpirationMetadataKey(blk.cid)).tryGet
bytes = (await localStoreMetaDs.get(key)).tryGet
blkMd = BlockMetadata.decode(bytes).tryGet
check blkMd.expiry == request.expiry.toSecondsSince1970

View File

@ -69,7 +69,7 @@ asyncchecksuite "Test Node - Basic":
# https://github.com/codex-storage/nim-codex/issues/699
let
cstore = CountingStore.new(engine, localStore)
node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery)
node = CodexNodeRef.new(switch, cstore, maintenance, engine, blockDiscovery)
missingCid = Cid.init(
"zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get()

View File

@ -28,7 +28,6 @@ checksuite "Test coders":
proc rand(T: type BlockMetadata): T =
BlockMetadata(
expiry: rand(SecondsSince1970),
size: rand(NBytes),
refCount: rand(Natural)
)

View File

@ -7,182 +7,120 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/random
import pkg/chronos
import pkg/questionable/results
import pkg/codex/blocktype as bt
import pkg/codex/stores/repostore
import pkg/codex/blocktype
import pkg/codex/stores
import pkg/codex/clock
import pkg/codex/rng
import pkg/datastore
import ../../asynctest
import ../helpers
import ../helpers/mocktimer
import ../helpers/mockrepostore
import ../helpers/mockclock
import ../examples
import codex/stores/maintenance
checksuite "BlockMaintainer":
var mockRepoStore: MockRepoStore
var interval: Duration
var mockTimer: MockTimer
var mockClock: MockClock
asyncchecksuite "DatasetMaintainer":
var blockMaintainer: BlockMaintainer
var testBe1: BlockExpiration
var testBe2: BlockExpiration
var testBe3: BlockExpiration
proc createTestExpiration(expiry: SecondsSince1970): BlockExpiration =
BlockExpiration(
cid: bt.Block.example.cid,
expiry: expiry
)
var
clock: MockClock
timer: MockTimer
metaDs: Datastore
blockStore: BlockStore
maintenance: DatasetMaintainer
setup:
mockClock = MockClock.new()
mockClock.set(100)
clock = MockClock.new()
timer = MockTimer.new()
metaDs = SQLiteDatastore.new(Memory).tryGet()
blockStore = RepoStore.new(metaDs, metaDs)
maintenance = DatasetMaintainer.new(
blockStore = blockStore,
metaDs = metaDs,
timer = timer,
clock = clock,
defaultExpiry = 100.seconds,
interval = 10.seconds,
restartDelay = 10.seconds
)
testBe1 = createTestExpiration(200)
testBe2 = createTestExpiration(300)
testBe3 = createTestExpiration(400)
maintenance.start()
clock.set(0)
mockRepoStore = MockRepoStore.new()
mockRepoStore.testBlockExpirations.add(testBe1)
mockRepoStore.testBlockExpirations.add(testBe2)
mockRepoStore.testBlockExpirations.add(testBe3)
teardown:
await maintenance.stop()
interval = 1.days
mockTimer = MockTimer.new()
proc listStoredBlocks(manifest: Manifest): Future[seq[int]] {.async.} =
var indicies = newSeq[int]()
blockMaintainer = BlockMaintainer.new(
mockRepoStore,
interval,
numberOfBlocksPerInterval = 2,
mockTimer,
mockClock)
for i in 0..<manifest.blocksCount:
let address = BlockAddress.init(manifest.treeCid, i)
if (await address in blockStore):
indicies.add(i)
test "Start should start timer at provided interval":
blockMaintainer.start()
check mockTimer.startCalled == 1
check mockTimer.mockInterval == interval
indicies
test "Stop should stop timer":
await blockMaintainer.stop()
check mockTimer.stopCalled == 1
test "Should not delete dataset":
let manifest = await storeDataGetManifest(blockStore, blocksCount = 5)
(await maintenance.trackExpiry(manifest.treeCid, 100.SecondsSince1970, @[Cid.example])).tryGet()
test "Timer callback should call getBlockExpirations on RepoStore":
blockMaintainer.start()
await mockTimer.invokeCallback()
clock.advance(50)
await timer.invokeCallback()
await sleepAsync(1.seconds)
check:
mockRepoStore.getBeMaxNumber == 2
mockRepoStore.getBeOffset == 0
@[0, 1, 2, 3, 4] == await listStoredBlocks(manifest)
test "Timer callback should handle Catachable errors":
mockRepoStore.getBlockExpirationsThrows = true
blockMaintainer.start()
await mockTimer.invokeCallback()
test "Should delete expired dataset":
let manifest = await storeDataGetManifest(blockStore, blocksCount = 5)
(await maintenance.trackExpiry(manifest.treeCid, 100.SecondsSince1970, @[Cid.example])).tryGet()
test "Subsequent timer callback should call getBlockExpirations on RepoStore with offset":
blockMaintainer.start()
await mockTimer.invokeCallback()
await mockTimer.invokeCallback()
clock.advance(150)
await timer.invokeCallback()
await sleepAsync(1.seconds)
check:
mockRepoStore.getBeMaxNumber == 2
mockRepoStore.getBeOffset == 2
newSeq[int]() == await listStoredBlocks(manifest)
test "Timer callback should delete no blocks if none are expired":
blockMaintainer.start()
await mockTimer.invokeCallback()
test "Should not delete dataset with prolonged expiry":
let manifest = await storeDataGetManifest(blockStore, blocksCount = 5)
(await maintenance.trackExpiry(manifest.treeCid, 100.SecondsSince1970, @[Cid.example])).tryGet()
(await maintenance.ensureExpiry(manifest.treeCid, 200.SecondsSince1970)).tryGet()
clock.advance(150)
await timer.invokeCallback()
await sleepAsync(1.seconds)
check:
mockRepoStore.delBlockCids.len == 0
@[0, 1, 2, 3, 4] == await listStoredBlocks(manifest)
test "Timer callback should delete one block if it is expired":
mockClock.set(250)
blockMaintainer.start()
await mockTimer.invokeCallback()
test "Should delete dataset without prolonged expiry":
let manifest = await storeDataGetManifest(blockStore, blocksCount = 5)
(await maintenance.trackExpiry(manifest.treeCid, 100.SecondsSince1970, @[Cid.example])).tryGet()
(await maintenance.ensureExpiry(manifest.treeCid, 100.SecondsSince1970)).tryGet()
clock.advance(150)
await timer.invokeCallback()
await sleepAsync(1.seconds)
check:
mockRepoStore.delBlockCids == [testBe1.cid]
newSeq[int]() == await listStoredBlocks(manifest)
test "Timer callback should delete multiple blocks if they are expired":
mockClock.set(500)
blockMaintainer.start()
await mockTimer.invokeCallback()
test "Should find correct number of leaves/blocks":
let
storedLeavesCount = rand(10..1000)
manifest = await storeDataGetManifest(blockStore, blocksCount = storedLeavesCount)
let leavesCount = (await maintenance.findLeavesCount(manifest.treeCid)).tryGet()
check:
mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid]
test "After deleting a block, subsequent timer callback should decrease offset by the number of deleted blocks":
mockClock.set(250)
blockMaintainer.start()
await mockTimer.invokeCallback()
check mockRepoStore.delBlockCids == [testBe1.cid]
# Because one block was deleted, the offset used in the next call should be 2 minus 1.
await mockTimer.invokeCallback()
check:
mockRepoStore.getBeMaxNumber == 2
mockRepoStore.getBeOffset == 1
test "Should delete all blocks if expired, in two timer callbacks":
mockClock.set(500)
blockMaintainer.start()
await mockTimer.invokeCallback()
await mockTimer.invokeCallback()
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid]
test "Iteration offset should loop":
blockMaintainer.start()
await mockTimer.invokeCallback()
check mockRepoStore.getBeOffset == 0
await mockTimer.invokeCallback()
check mockRepoStore.getBeOffset == 2
await mockTimer.invokeCallback()
check mockRepoStore.getBeOffset == 0
test "Should handle new blocks":
proc invokeTimerManyTimes(): Future[void] {.async.} =
for i in countup(0, 10):
await mockTimer.invokeCallback()
blockMaintainer.start()
await invokeTimerManyTimes()
# no blocks have expired
check mockRepoStore.delBlockCids == []
mockClock.set(250)
await invokeTimerManyTimes()
# one block has expired
check mockRepoStore.delBlockCids == [testBe1.cid]
# new blocks are added
let testBe4 = createTestExpiration(600)
let testBe5 = createTestExpiration(700)
mockRepoStore.testBlockExpirations.add(testBe4)
mockRepoStore.testBlockExpirations.add(testBe5)
mockClock.set(500)
await invokeTimerManyTimes()
# All blocks have expired
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid]
mockClock.set(650)
await invokeTimerManyTimes()
# First new block has expired
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid]
mockClock.set(750)
await invokeTimerManyTimes()
# Second new block has expired
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid, testBe5.cid]
leavesCount == storedLeavesCount

View File

@ -14,12 +14,10 @@ import pkg/codex/stores/cachestore
import pkg/codex/chunker
import pkg/codex/stores
import pkg/codex/blocktype as bt
import pkg/codex/clock
import pkg/codex/utils/asynciter
import ../../asynctest
import ../helpers
import ../helpers/mockclock
import ../examples
import ./commonstoretests
@ -62,20 +60,13 @@ asyncchecksuite "RepoStore":
var
repoDs: Datastore
metaDs: Datastore
mockClock: MockClock
repo: RepoStore
let
now: SecondsSince1970 = 123
setup:
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
mockClock = MockClock.new()
mockClock.set(now)
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200'nb)
repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
teardown:
(await repoDs.close()).tryGet
@ -179,165 +170,6 @@ asyncchecksuite "RepoStore":
repo.quotaUsedBytes == 0'nb
repo.quotaReservedBytes == 100'nb
proc getExpirations(): Future[seq[BlockExpiration]] {.async.} =
let iter = (await repo.getBlockExpirations(100, 0)).tryGet()
var res = newSeq[BlockExpiration]()
for fut in iter:
let be = await fut
res.add(be)
res
test "Should store block expiration timestamp":
let
duration = 10.seconds
blk = createTestBlock(100)
let
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
(await repo.putBlock(blk, duration.some)).tryGet
let expirations = await getExpirations()
check:
expectedExpiration in expirations
test "Should store block with default expiration timestamp when not provided":
let
blk = createTestBlock(100)
let
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + DefaultBlockTtl.seconds)
(await repo.putBlock(blk)).tryGet
let expirations = await getExpirations()
check:
expectedExpiration in expirations
test "Should refuse update expiry with negative timestamp":
let
blk = createTestBlock(100)
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
(await repo.putBlock(blk, some 10.seconds)).tryGet
let expirations = await getExpirations()
check:
expectedExpiration in expirations
expect ValueError:
(await repo.ensureExpiry(blk.cid, -1)).tryGet
expect ValueError:
(await repo.ensureExpiry(blk.cid, 0)).tryGet
test "Should fail when updating expiry of non-existing block":
let
blk = createTestBlock(100)
expect BlockNotFoundError:
(await repo.ensureExpiry(blk.cid, 10)).tryGet
test "Should update block expiration timestamp when new expiration is farther":
let
blk = createTestBlock(100)
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 20)
(await repo.putBlock(blk, some 10.seconds)).tryGet
let expirations = await getExpirations()
check:
expectedExpiration in expirations
(await repo.ensureExpiry(blk.cid, now + 20)).tryGet
let updatedExpirations = await getExpirations()
check:
expectedExpiration notin updatedExpirations
updatedExpectedExpiration in updatedExpirations
test "Should not update block expiration timestamp when current expiration is farther then new one":
let
blk = createTestBlock(100)
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 5)
(await repo.putBlock(blk, some 10.seconds)).tryGet
let expirations = await getExpirations()
check:
expectedExpiration in expirations
(await repo.ensureExpiry(blk.cid, now + 5)).tryGet
let updatedExpirations = await getExpirations()
check:
expectedExpiration in updatedExpirations
updatedExpectedExpiration notin updatedExpirations
test "delBlock should remove expiration metadata":
let
blk = createTestBlock(100)
expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
(await repo.putBlock(blk, 10.seconds.some)).tryGet
(await repo.delBlock(blk.cid)).tryGet
let expirations = await getExpirations()
check:
expirations.len == 0
test "Should retrieve block expiration information":
proc unpack(beIter: Future[?!AsyncIter[BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} =
var expirations = newSeq[BlockExpiration](0)
without iter =? (await beIter), err:
return expirations
for beFut in toSeq(iter):
let value = await beFut
expirations.add(value)
return expirations
let
duration = 10.seconds
blk1 = createTestBlock(10)
blk2 = createTestBlock(11)
blk3 = createTestBlock(12)
let
expectedExpiration: SecondsSince1970 = now + 10
proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) =
check:
be.cid == expectedBlock.cid
be.expiry == expectedExpiration
(await repo.putBlock(blk1, duration.some)).tryGet
(await repo.putBlock(blk2, duration.some)).tryGet
(await repo.putBlock(blk3, duration.some)).tryGet
let
blockExpirations1 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=0))
blockExpirations2 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=2))
check blockExpirations1.len == 2
assertExpiration(blockExpirations1[0], blk2)
assertExpiration(blockExpirations1[1], blk1)
check blockExpirations2.len == 1
assertExpiration(blockExpirations2[0], blk3)
test "should put empty blocks":
let blk = Cid.example.emptyBlock.tryGet()
check (await repo.putBlock(blk)).isOk
@ -365,8 +197,7 @@ commonBlockStoreTests(
BlockStore(
RepoStore.new(
SQLiteDatastore.new(Memory).tryGet(),
SQLiteDatastore.new(Memory).tryGet(),
clock = MockClock.new())))
SQLiteDatastore.new(Memory).tryGet())))
const
path = currentSourcePath().parentDir / "test"
@ -385,7 +216,6 @@ commonBlockStoreTests(
BlockStore(
RepoStore.new(
FSDatastore.new(path, depth).tryGet(),
SQLiteDatastore.new(Memory).tryGet(),
clock = MockClock.new())),
SQLiteDatastore.new(Memory).tryGet())),
before = before,
after = after)

View File

@ -22,7 +22,7 @@ ethersuite "Node block expiration tests":
dataDir.removeDir()
proc startTestNode(blockTtlSeconds: int) =
proc startTestNode(ttlSeconds: int) =
node = startNode([
"--api-port=8080",
"--data-dir=" & dataDir,
@ -30,9 +30,8 @@ ethersuite "Node block expiration tests":
"--listen-addrs=/ip4/127.0.0.1/tcp/0",
"--disc-ip=127.0.0.1",
"--disc-port=8090",
"--block-ttl=" & $blockTtlSeconds,
"--block-mi=1",
"--block-mn=10"
"--default-ttl=" & $ttlSeconds,
"--maintenance-interval=1"
], debug = false)
node.waitUntilStarted()
@ -61,7 +60,7 @@ ethersuite "Node block expiration tests":
content.code == Http200
test "node retains not-expired file":
startTestNode(blockTtlSeconds = 10)
startTestNode(ttlSeconds = 10)
let contentId = uploadTestFile()
@ -74,10 +73,15 @@ ethersuite "Node block expiration tests":
response.body == content
test "node deletes expired file":
startTestNode(blockTtlSeconds = 1)
startTestNode(ttlSeconds = 2)
let contentId = uploadTestFile()
await sleepAsync(1.seconds)
# check:
# hasFile(contentId)
await sleepAsync(3.seconds)
check: