Expiry per dataset

Tomasz Bekas 2024-06-05 19:09:04 +02:00
parent 8138ef5afd
commit 0ab4b1cf9e
22 changed files with 473 additions and 648 deletions


@ -88,12 +88,9 @@ The following options are available:
--api-bindaddr The REST API bind address [=127.0.0.1]. --api-bindaddr The REST API bind address [=127.0.0.1].
-p, --api-port The REST Api port [=8080]. -p, --api-port The REST Api port [=8080].
--repo-kind Backend for main repo store (fs, sqlite) [=fs]. --repo-kind Backend for main repo store (fs, sqlite) [=fs].
-q, --storage-quota The size of the total storage quota dedicated to the node [=8589934592]. -q, --storage-quota The size in bytes of the total storage quota dedicated to the node [=8589934592].
-t, --block-ttl Default block timeout in seconds - 0 disables the ttl [=$DefaultBlockTtl]. -t, --default-ttl Default dataset expiry [=1d].
--block-mi Time interval in seconds - determines frequency of block maintenance cycle: how --maintenance-interval Determines how frequently datasets are checked for expiration and cleanup [=5m].
often blocks are checked for expiration and cleanup
[=$DefaultBlockMaintenanceInterval].
--block-mn Number of blocks to check every maintenance cycle [=1000].
-c, --cache-size The size of the block cache, 0 disables the cache - might help on slow hard drives -c, --cache-size The size of the block cache, 0 disables the cache - might help on slow hard drives
[=0]. [=0].
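
For orientation, the defaults shown above map onto the chronos durations introduced later in this commit (DefaultDefaultExpiry and DefaultMaintenanceInterval in codex/stores/maintenance.nim). A minimal sketch, assuming only chronos:

import pkg/chronos

let
  defaultExpiry = 24.hours         # "--default-ttl", rendered above as [=1d]
  maintenanceInterval = 5.minutes  # "--maintenance-interval", rendered above as [=5m]

doAssert defaultExpiry == 1.days
doAssert maintenanceInterval == 300.seconds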


@ -54,7 +54,7 @@ type
restServer: RestServerRef restServer: RestServerRef
codexNode: CodexNodeRef codexNode: CodexNodeRef
repoStore: RepoStore repoStore: RepoStore
maintenance: BlockMaintainer maintenance: DatasetMaintainer
taskpool: Taskpool taskpool: Taskpool
CodexPrivateKey* = libp2p.PrivateKey # alias CodexPrivateKey* = libp2p.PrivateKey # alias
@ -246,6 +246,9 @@ proc new*(
wallet = WalletRef.new(EthPrivateKey.random()) wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch) network = BlockExcNetwork.new(switch)
metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create metadata store!")
repoData = case config.repoKind repoData = case config.repoKind
of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5) of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5)
.expect("Should create repo file data store!")) .expect("Should create repo file data store!"))
@ -256,21 +259,21 @@ proc new*(
repoStore = RepoStore.new( repoStore = RepoStore.new(
repoDs = repoData, repoDs = repoData,
metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace) metaDs = metaDs,
.expect("Should create metadata store!"), quotaMaxBytes = config.storageQuota)
quotaMaxBytes = config.storageQuota,
blockTtl = config.blockTtl)
maintenance = BlockMaintainer.new( maintenance = DatasetMaintainer.new(
repoStore, repoStore,
interval = config.blockMaintenanceInterval, metaDs,
numberOfBlocksPerInterval = config.blockMaintenanceNumberOfBlocks) defaultExpiry = config.defaultExpiry,
interval = config.maintenanceInterval)
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks) blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks)
engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, repoStore) store = NetworkStore.new(engine, repoStore)
prover = if config.prover: prover = if config.prover:
if not fileAccessible($config.circomR1cs, {AccessFlags.Read}) and if not fileAccessible($config.circomR1cs, {AccessFlags.Read}) and
endsWith($config.circomR1cs, ".r1cs"): endsWith($config.circomR1cs, ".r1cs"):
@ -306,6 +309,7 @@ proc new*(
codexNode = CodexNodeRef.new( codexNode = CodexNodeRef.new(
switch = switch, switch = switch,
networkStore = store, networkStore = store,
maintenance = maintenance,
engine = engine, engine = engine,
prover = prover, prover = prover,
discovery = discovery, discovery = discovery,


@ -42,9 +42,8 @@ export units, net, codextypes, logutils
export export
DefaultQuotaBytes, DefaultQuotaBytes,
DefaultBlockTtl, DefaultDefaultExpiry,
DefaultBlockMaintenanceInterval, DefaultMaintenanceInterval
DefaultNumberOfBlocksToMaintainPerInterval
proc defaultDataDir*(): string = proc defaultDataDir*(): string =
let dataDir = when defined(windows): let dataDir = when defined(windows):
@ -209,24 +208,18 @@ type
name: "storage-quota" name: "storage-quota"
abbr: "q" }: NBytes abbr: "q" }: NBytes
blockTtl* {. defaultExpiry* {.
desc: "Default block timeout in seconds - 0 disables the ttl" desc: "Default dataset expiry in seconds"
defaultValue: DefaultBlockTtl defaultValue: DefaultDefaultExpiry
defaultValueDesc: $DefaultBlockTtl defaultValueDesc: $DefaultDefaultExpiry
name: "block-ttl" name: "default-ttl"
abbr: "t" }: Duration abbr: "t" }: Duration
blockMaintenanceInterval* {. maintenanceInterval* {.
desc: "Time interval in seconds - determines frequency of block maintenance cycle: how often blocks are checked for expiration and cleanup" desc: "Maintenance interval in seconds - determines frequency of maintenance cycle: how often datasets are checked for expiration and cleanup"
defaultValue: DefaultBlockMaintenanceInterval defaultValue: DefaultMaintenanceInterval
defaultValueDesc: $DefaultBlockMaintenanceInterval defaultValueDesc: $DefaultMaintenanceInterval
name: "block-mi" }: Duration name: "maintenance-interval" }: Duration
blockMaintenanceNumberOfBlocks* {.
desc: "Number of blocks to check every maintenance cycle"
defaultValue: DefaultNumberOfBlocksToMaintainPerInterval
defaultValueDesc: $DefaultNumberOfBlocksToMaintainPerInterval
name: "block-mn" }: int
cacheSize* {. cacheSize* {.
desc: "The size of the block cache, 0 disables the cache - might help on slow hardrives" desc: "The size of the block cache, 0 disables the cache - might help on slow hardrives"


@ -18,6 +18,8 @@ const
CodexMetaNamespace & "/ttl" CodexMetaNamespace & "/ttl"
CodexBlockProofNamespace* = # Cid and Proof CodexBlockProofNamespace* = # Cid and Proof
CodexMetaNamespace & "/proof" CodexMetaNamespace & "/proof"
CodexDatasetMetadataNamespace* = # Dataset
CodexMetaNamespace & "/dataset"
CodexDhtNamespace* = "dht" # Dht namespace CodexDhtNamespace* = "dht" # Dht namespace
CodexDhtProvidersNamespace* = # Dht providers namespace CodexDhtProvidersNamespace* = # Dht providers namespace
CodexDhtNamespace & "/providers" CodexDhtNamespace & "/providers"


@ -65,6 +65,7 @@ type
switch: Switch switch: Switch
networkId: PeerId networkId: PeerId
networkStore: NetworkStore networkStore: NetworkStore
maintenance: DatasetMaintainer
engine: BlockExcEngine engine: BlockExcEngine
prover: ?Prover prover: ?Prover
discovery: Discovery discovery: Discovery
@ -155,17 +156,8 @@ proc updateExpiry*(
trace "Unable to fetch manifest for cid", manifestCid trace "Unable to fetch manifest for cid", manifestCid
return failure(error) return failure(error)
try: await self.maintenance.ensureExpiry(manifest.treeCid, expiry)
let
ensuringFutures = Iter[int].new(0..<manifest.blocksCount)
.mapIt(self.networkStore.localStore.ensureExpiry( manifest.treeCid, it, expiry ))
await allFuturesThrowing(ensuringFutures)
except CancelledError as exc:
raise exc
except CatchableError as exc:
return failure(exc.msg)
return success()
proc fetchBatched*( proc fetchBatched*(
self: CodexNodeRef, self: CodexNodeRef,
@ -240,7 +232,7 @@ proc streamEntireDataset(
self: CodexNodeRef, self: CodexNodeRef,
manifest: Manifest, manifest: Manifest,
manifestCid: Cid, manifestCid: Cid,
): ?!LPStream = ): Future[?!LPStream] {.async.} =
## Streams the contents of the entire dataset described by the manifest. ## Streams the contents of the entire dataset described by the manifest.
## ##
trace "Retrieving blocks from manifest", manifestCid trace "Retrieving blocks from manifest", manifestCid
@ -265,6 +257,13 @@ proc streamEntireDataset(
# Retrieve all blocks of the dataset sequentially from the local store or network # Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", manifestCid trace "Creating store stream for manifest", manifestCid
if err =? (await self.maintenance.trackExpiry(
manifest.treeCid,
manifest.blocksCount,
manifestsCids = @[manifestCid])).errorOption:
return failure(err)
LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success
proc retrieve*( proc retrieve*(
@ -283,7 +282,7 @@ proc retrieve*(
return await self.streamSingleBlock(cid) return await self.streamSingleBlock(cid)
self.streamEntireDataset(manifest, cid) await self.streamEntireDataset(manifest, cid)
proc store*( proc store*(
self: CodexNodeRef, self: CodexNodeRef,
@ -352,6 +351,12 @@ proc store*(
error "Unable to store manifest" error "Unable to store manifest"
return failure(err) return failure(err)
if err =? (await self.maintenance.trackExpiry(
treeCid,
manifest.blocksCount,
manifestsCids = @[manifestBlk.cid])).errorOption:
return failure(err)
info "Stored data", manifestCid = manifestBlk.cid, info "Stored data", manifestCid = manifestBlk.cid,
treeCid = treeCid, treeCid = treeCid,
blocks = manifest.blocksCount, blocks = manifest.blocksCount,
@ -548,19 +553,18 @@ proc onStore(
trace "Slot index not in manifest", slotIdx trace "Slot index not in manifest", slotIdx
return failure(newException(CodexError, "Slot index not in manifest")) return failure(newException(CodexError, "Slot index not in manifest"))
proc updateExpiry(blocks: seq[bt.Block]): Future[?!void] {.async.} = proc onBatch(blocks: seq[bt.Block]): Future[?!void] {.async.} =
trace "Updating expiry for blocks", blocks = blocks.len if not blocksCb.isNil:
await blocksCb(blocks)
else:
success()
let ensureExpiryFutures = blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry)) if err =? (await self.maintenance.trackExpiry(
if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption: manifest.treeCid,
return failure(updateExpiryErr) manifest.blocksCount,
manifestsCids = @[cid])).errorOption:
if not blocksCb.isNil and err =? (await blocksCb(blocks)).errorOption:
trace "Unable to process blocks", err = err.msg
return failure(err) return failure(err)
return success()
without indexer =? manifest.verifiableStrategy.init( without indexer =? manifest.verifiableStrategy.init(
0, manifest.blocksCount - 1, manifest.numSlots).catch, err: 0, manifest.blocksCount - 1, manifest.numSlots).catch, err:
trace "Unable to create indexing strategy from protected manifest", err = err.msg trace "Unable to create indexing strategy from protected manifest", err = err.msg
@ -573,7 +577,7 @@ proc onStore(
if err =? (await self.fetchBatched( if err =? (await self.fetchBatched(
manifest.treeCid, manifest.treeCid,
blksIter, blksIter,
onBatch = updateExpiry)).errorOption: onBatch = onBatch)).errorOption:
trace "Unable to fetch blocks", err = err.msg trace "Unable to fetch blocks", err = err.msg
return failure(err) return failure(err)
@ -589,6 +593,11 @@ proc onStore(
trace "Slot successfully retrieved and reconstructed" trace "Slot successfully retrieved and reconstructed"
if err =? (await self.maintenance.ensureExpiry(
manifest.treeCid,
expiry)).errorOption:
return failure(err)
return success() return success()
proc onProve( proc onProve(
@ -753,6 +762,7 @@ proc new*(
T: type CodexNodeRef, T: type CodexNodeRef,
switch: Switch, switch: Switch,
networkStore: NetworkStore, networkStore: NetworkStore,
maintenance: DatasetMaintainer,
engine: BlockExcEngine, engine: BlockExcEngine,
discovery: Discovery, discovery: Discovery,
prover = Prover.none, prover = Prover.none,
@ -764,6 +774,7 @@ proc new*(
CodexNodeRef( CodexNodeRef(
switch: switch, switch: switch,
networkStore: networkStore, networkStore: networkStore,
maintenance: maintenance,
engine: engine, engine: engine,
prover: prover, prover: prover,
discovery: discovery, discovery: discovery,


@ -62,8 +62,8 @@ method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future
method putBlock*( method putBlock*(
self: BlockStore, self: BlockStore,
blk: Block, blk: Block
ttl = Duration.none): Future[?!void] {.base.} = ): Future[?!void] {.base.} =
## Put a block to the blockstore ## Put a block to the blockstore
## ##
@ -89,27 +89,6 @@ method getCidAndProof*(
raiseAssert("getCidAndProof not implemented!") raiseAssert("getCidAndProof not implemented!")
method ensureExpiry*(
self: BlockStore,
cid: Cid,
expiry: SecondsSince1970): Future[?!void] {.base.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
raiseAssert("Not implemented!")
method ensureExpiry*(
self: BlockStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970): Future[?!void] {.base.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
raiseAssert("Not implemented!")
method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} = method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} =
## Delete a block from the blockstore ## Delete a block from the blockstore
## ##


@ -186,8 +186,7 @@ func putBlockSync(self: CacheStore, blk: Block): bool =
method putBlock*( method putBlock*(
self: CacheStore, self: CacheStore,
blk: Block, blk: Block): Future[?!void] {.async.} =
ttl = Duration.none): Future[?!void] {.async.} =
## Put a block to the blockstore ## Put a block to the blockstore
## ##
@ -209,27 +208,6 @@ method putCidAndProof*(
self.cidAndProofCache[(treeCid, index)] = (blockCid, proof) self.cidAndProofCache[(treeCid, index)] = (blockCid, proof)
success() success()
method ensureExpiry*(
self: CacheStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Updates block's associated TTL in store - not applicable for CacheStore
##
discard # CacheStore does not have notion of TTL
method ensureExpiry*(
self: CacheStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Updates block's associated TTL in store - not applicable for CacheStore
##
discard # CacheStore does not have notion of TTL
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} = method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore ## Delete a block from the blockstore
## ##


@ -25,6 +25,7 @@ const
CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet
BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet
BlockProofKey* = Key.init(CodexBlockProofNamespace).tryGet BlockProofKey* = Key.init(CodexBlockProofNamespace).tryGet
DatasetMetadataKey* = Key.init(CodexDatasetMetadataNamespace).tryGet
QuotaKey* = Key.init(CodexQuotaNamespace).tryGet QuotaKey* = Key.init(CodexQuotaNamespace).tryGet
QuotaUsedKey* = (QuotaKey / "used").tryGet QuotaUsedKey* = (QuotaKey / "used").tryGet
QuotaReservedKey* = (QuotaKey / "reserved").tryGet QuotaReservedKey* = (QuotaKey / "reserved").tryGet
@ -47,3 +48,10 @@ proc createBlockExpirationMetadataQueryKey*(): ?!Key =
proc createBlockCidAndProofMetadataKey*(treeCid: Cid, index: Natural): ?!Key = proc createBlockCidAndProofMetadataKey*(treeCid: Cid, index: Natural): ?!Key =
(BlockProofKey / $treeCid).flatMap((k: Key) => k / $index) (BlockProofKey / $treeCid).flatMap((k: Key) => k / $index)
proc createDatasetMetadataKey*(treeCid: Cid): ?!Key =
DatasetMetadataKey / $treeCid
proc createDatasetMetadataQueryKey*(): ?!Key =
let queryString = ? (DatasetMetadataKey / "*")
Key.init(queryString)
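
The new keys extend the existing metadata layout with a per-dataset namespace. A minimal sketch of the resulting key shapes, assuming CodexMetaNamespace resolves to a path-style string such as "/codex/meta" (its real value is defined outside this diff) and reusing a CID string that appears in the node tests below:

import pkg/datastore
import pkg/questionable/results

let
  metaNamespace = "/codex/meta"    # assumed stand-in for CodexMetaNamespace
  datasetMetadataKey = Key.init(metaNamespace & "/dataset").tryGet
  treeCidStr = "zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ"
  mdKey = (datasetMetadataKey / treeCidStr).tryGet   # as built by createDatasetMetadataKey
  queryKey = (datasetMetadataKey / "*").tryGet       # wildcard used by createDatasetMetadataQueryKey

# mdKey    -> /codex/meta/dataset/zDvZ...  (under the assumed namespace)
# queryKey -> /codex/meta/dataset/*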


@ -10,92 +10,323 @@
## Store maintenance module ## Store maintenance module
## Looks for and removes expired blocks from blockstores. ## Looks for and removes expired blocks from blockstores.
import std/sequtils
import pkg/chronos import pkg/chronos
import pkg/chronicles
import pkg/libp2p/cid
import pkg/serde/json
import pkg/datastore
import pkg/datastore/typedds
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import ./repostore import ./blockstore
import ./keyutils
import ./queryiterhelper
import ../utils/timer import ../utils/timer
import ../utils/asynciter import ../utils/asynciter
import ../utils/json
import ../utils/trackedfutures
import ../clock import ../clock
import ../logutils import ../logutils
import ../systemclock import ../systemclock
logScope:
topics = "codex maintenance"
const const
DefaultBlockMaintenanceInterval* = 10.minutes DefaultDefaultExpiry* = 24.hours
DefaultNumberOfBlocksToMaintainPerInterval* = 1000 DefaultMaintenanceInterval* = 5.minutes
DefaultBatchSize* = 1000
# if no progress was observed for this amount of time
# we're going to retry deletion of the dataset
DefaultRetryDelay* = 15.minutes
type type
BlockMaintainer* = ref object of RootObj DatasetMaintainer* = ref object of RootObj
repoStore: RepoStore blockStore: BlockStore
metaDs: TypedDatastore
interval: Duration interval: Duration
defaultExpiry: Duration
batchSize: int
retryDelay: Duration
timer: Timer timer: Timer
clock: Clock clock: Clock
numberOfBlocksPerInterval: int trackedFutures: TrackedFutures
offset: int
Checkpoint* {.serialize.} = object
timestamp*: SecondsSince1970
progress*: Natural
DatasetMetadata* {.serialize.} = object
expiry*: SecondsSince1970
leavesCount*: Natural
manifestsCids*: seq[Cid]
checkpoint*: Checkpoint
MissingKey* = object of CodexError
proc new*( proc new*(
T: type BlockMaintainer, T: type DatasetMaintainer,
repoStore: RepoStore, blockStore: BlockStore,
interval: Duration, metaDs: Datastore,
numberOfBlocksPerInterval = 100, defaultExpiry = DefaultDefaultExpiry,
interval = DefaultMaintenanceInterval,
batchSize = DefaultBatchSize,
retryDelay = DefaultRetryDelay,
timer = Timer.new(), timer = Timer.new(),
clock: Clock = SystemClock.new() clock: Clock = SystemClock.new(),
): BlockMaintainer = trackedFutures = TrackedFutures.new()
## Create new BlockMaintainer instance ): DatasetMaintainer =
## Create new DatasetMaintainer instance
## ##
## Call `start` to begin looking for expired blocks ## Call `start` to begin looking for expired blocks
## ##
BlockMaintainer( DatasetMaintainer(
repoStore: repoStore, blockStore: blockStore,
metaDs: TypedDatastore.init(metaDs),
defaultExpiry: defaultExpiry,
interval: interval, interval: interval,
numberOfBlocksPerInterval: numberOfBlocksPerInterval, batchSize: batchSize,
retryDelay: retryDelay,
timer: timer, timer: timer,
clock: clock, clock: clock,
offset: 0) trackedFutures: trackedFutures)
proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.} = proc encode(t: Checkpoint): seq[byte] = serializer.toJson(t).toBytes()
if isErr (await self.repoStore.delBlock(cid)): proc decode(T: type Checkpoint, bytes: seq[byte]): ?!T = T.fromJson(bytes)
trace "Unable to delete block from repoStore"
proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} = proc encode(t: DatasetMetadata): seq[byte] = serializer.toJson(t).toBytes()
if be.expiry < self.clock.now: proc decode(T: type DatasetMetadata, bytes: seq[byte]): ?!T = T.fromJson(bytes)
await self.deleteExpiredBlock(be.cid)
proc trackExpiry*(
self: DatasetMaintainer,
treeCid: Cid,
leavesCount: Natural,
expiry: SecondsSince1970,
manifestsCids: seq[Cid] = @[]
): Future[?!void] {.async.} =
# Starts tracking expiry of a given dataset
#
trace "Tracking an expiry of a dataset", treeCid, leavesCount, expiry
without key =? createDatasetMetadataKey(treeCid), err:
return failure(err)
await modify[DatasetMetadata](self.metaDs, key,
proc (maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
var md: DatasetMetadata
if currDatasetMd =? maybeCurrDatasetMd:
md.expiry = max(currDatasetMd.expiry, expiry)
if currDatasetMd.leavesCount != leavesCount:
raise newException(CatchableError, "DatasetMetadata for treeCid " & $treeCid & " is already stored with leavesCount " &
$currDatasetMd.leavesCount & ", cannot override it with leavesCount " & $leavesCount)
md.leavesCount = leavesCount
md.manifestsCids = (currDatasetMd.manifestsCids & manifestsCids).deduplicate
md.checkpoint = Checkpoint(progress: 0, timestamp: 0)
else: else:
inc self.offset md.expiry = expiry
md.leavesCount = leavesCount
md.manifestsCids = manifestsCids
md.checkpoint = Checkpoint(progress: 0, timestamp: 0)
proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} = md.some
let expirations = await self.repoStore.getBlockExpirations(
maxNumber = self.numberOfBlocksPerInterval,
offset = self.offset
) )
without iter =? expirations, err: proc trackExpiry*(
trace "Unable to obtain blockExpirations iterator from repoStore" self: DatasetMaintainer,
return treeCid: Cid,
leavesCount: Natural,
manifestsCids: seq[Cid] = @[]
): Future[?!void] {.async.} =
await self.trackExpiry(treeCid, leavesCount, self.clock.now + self.defaultExpiry.seconds, manifestsCids)
var numberReceived = 0 proc trackExpiry*(
for beFut in iter: self: DatasetMaintainer,
let be = await beFut cid: Cid,
inc numberReceived manifestsCids: seq[Cid] = @[]
await self.processBlockExpiration(be) ): Future[?!void] {.async.} =
await sleepAsync(1.millis) # cooperative scheduling await self.trackExpiry(cid, 0, self.clock.now + self.defaultExpiry.seconds, manifestsCids)
# If we received fewer blockExpirations from the iterator than we asked for, proc ensureExpiry*(
# We're at the end of the dataset and should start from 0 next time. self: DatasetMaintainer,
if numberReceived < self.numberOfBlocksPerInterval: treeCid: Cid,
self.offset = 0 minExpiry: SecondsSince1970): Future[?!void] {.async.} =
## Sets the dataset expiry to the maximum of the current expiry and `minExpiry`;
## if a dataset for the given `treeCid` is not currently tracked, a CatchableError is raised
##
proc start*(self: BlockMaintainer) = trace "Updating a dataset expiry", treeCid, minExpiry
without key =? createDatasetMetadataKey(treeCid), err:
return failure(err)
await modify[DatasetMetadata](self.metaDs, key,
proc (maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
if currDatasetMd =? maybeCurrDatasetMd:
let datasetMd = DatasetMetadata(
expiry: max(currDatasetMd.expiry, minExpiry),
leavesCount: currDatasetMd.leavesCount,
manifestsCids: currDatasetMd.manifestsCids,
checkpoint: currDatasetMd.checkpoint
)
return datasetMd.some
else:
raise newException(CatchableError, "DatasetMetadata for treeCid " & $treeCid & " not found")
)
proc recordCheckpoint*(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[?!void] {.async.} =
# Saves progress, or deletes the dataset metadata once progress reaches leavesCount
#
without key =? createDatasetMetadataKey(treeCid), err:
return failure(err)
await self.metaDs.modify(key,
proc (maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
if currDatasetMd =? maybeCurrDatasetMd:
if currDatasetMd.expiry != datasetMd.expiry or currDatasetMd.manifestsCids != datasetMd.manifestsCids:
raise newException(CatchableError, "Change in expiry detected, interrupting maintenance for dataset with treeCid " & $treeCid)
if currDatasetMd.checkpoint.progress > datasetMd.checkpoint.progress:
raise newException(CatchableError, "Progress should be increasing only, treeCid " & $treeCid)
if currDatasetMd.leavesCount <= datasetMd.checkpoint.progress:
DatasetMetadata.none
else:
datasetMd.some
else:
raise newException(CatchableError, "Metadata for dataset with treeCid " & $treeCid & " not found")
)
proc deleteBatch(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[?!void] {.async.} =
var datasetMd = datasetMd
datasetMd.checkpoint.timestamp = self.clock.now
if err =? (await self.recordCheckpoint(treeCid, datasetMd)).errorOption:
return failure(err)
logScope:
treeCid = treeCid
manifestsCids = datasetMd.manifestsCids
startingIndex = datasetMd.checkpoint.progress
trace "Deleting a batch of blocks", size = self.batchSize
var index = datasetMd.checkpoint.progress
while (index < datasetMd.checkpoint.progress + self.batchSize) and
(index < datasetMd.leavesCount):
if err =? (await self.blockStore.delBlock(treeCid, index)).errorOption:
error "Error deleting a block", msg = err.msg, index
index.inc
await sleepAsync(50.millis) # cooperative scheduling
if index >= datasetMd.leavesCount:
trace "All blocks deleted from a dataset", leavesCount = datasetMd.leavesCount
for manifestCid in datasetMd.manifestsCids:
if err =? (await self.blockStore.delBlock(manifestCid)).errorOption:
error "Error deleting manifest", cid = manifestCid
if err =? (await self.blockStore.delBlock(treeCid)).errorOption:
error "Error deleting block", cid = treeCid
if err =? (await self.recordCheckpoint(treeCid, datasetMd)).errorOption:
return failure(err)
return success()
else:
datasetMd.checkpoint.progress = index
return await self.deleteBatch(treeCid, datasetMd)
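
The recursion above walks the dataset's leaf indices in windows of batchSize, persisting a checkpoint with the current progress before each window so an interrupted deletion can be resumed later. A standalone sketch of that walk, with made-up numbers (2500 leaves against the default batch size of 1000), not taken from the commit:

let
  leavesCount = 2500
  batchSize = 1000

var progress = 0
while progress < leavesCount:
  let batchEnd = min(progress + batchSize, leavesCount)
  echo "checkpoint progress=", progress, "; delete leaf blocks ", progress, " ..< ", batchEnd
  progress = batchEnd
echo "all ", leavesCount, " leaves gone; manifests and tree block removed, metadata cleared"
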
proc superviseDatasetDeletion(self: DatasetMaintainer, treeCid: Cid, datasetMd: DatasetMetadata): Future[void] {.async.} =
logScope:
treeCid = treeCid
manifestsCids = datasetMd.manifestsCids
expiry = datasetMd.expiry
leavesCount = datasetMd.leavesCount
try:
if datasetMd.checkpoint.progress == 0 and datasetMd.checkpoint.timestamp == 0:
info "Initiating deletion of a dataset"
else:
info "Retrying deletion of a dataset", progress = datasetMd.checkpoint.progress, timestamp = datasetMd.checkpoint.timestamp
if err =? (await self.deleteBatch(treeCid, datasetMd)).errorOption:
error "Error occurred during deletion of a dataset", msg = err.msg
else:
info "Dataset deletion complete"
except CancelledError as err:
raise err
except CatchableError as err:
error "Unexpected error during dataset deletion", msg = err.msg, treeCid = treeCid
proc listDatasetMetadata*(
self: DatasetMaintainer
): Future[?!AsyncIter[(Cid, DatasetMetadata)]] {.async.} =
without queryKey =? createDatasetMetadataQueryKey(), err:
return failure(err)
without queryIter =? await query[DatasetMetadata](self.metaDs, Query.init(queryKey)), err:
error "Unable to execute block expirations query", err = err.msg
return failure(err)
without asyncQueryIter =? await queryIter.toAsyncIter(), err:
error "Unable to convert QueryIter to AsyncIter", err = err.msg
return failure(err)
let
filteredIter = await asyncQueryIter.filterSuccess()
datasetMdIter = await mapFilter[KeyVal[DatasetMetadata], (Cid, DatasetMetadata)](filteredIter,
proc (kv: KeyVal[DatasetMetadata]): Future[?(Cid, DatasetMetadata)] {.async.} =
without cid =? Cid.init(kv.key.value).mapFailure, err:
error "Failed decoding cid", err = err.msg
return (Cid, DatasetMetadata).none
(cid, kv.value).some
)
success(datasetMdIter)
proc checkDatasets(self: DatasetMaintainer): Future[?!void] {.async.} =
without iter =? await self.listDatasetMetadata(), err:
return failure(err)
for fut in iter:
let (treeCid, datasetMd) = await fut
if (datasetMd.expiry < self.clock.now) and
(datasetMd.checkpoint.timestamp + self.retryDelay.seconds < self.clock.now):
asyncSpawn self.superviseDatasetDeletion(treeCid, datasetMd).track(self)
else:
trace "Item either not expired or expired but already in maintenance", treeCid, expiry = datasetMd.expiry, timestamp = datasetMd.checkpoint.timestamp
success()
proc start*(self: DatasetMaintainer) =
proc onTimer(): Future[void] {.async.} = proc onTimer(): Future[void] {.async.} =
try: try:
await self.runBlockCheck() if err =? (await self.checkDatasets()).errorOption:
except CancelledError as error: error "Error when checking datasets", msg = err.msg
raise error except CancelledError as err:
except CatchableError as exc: raise err
error "Unexpected exception in BlockMaintainer.onTimer(): ", msg=exc.msg except CatchableError as err:
error "Error when checking datasets", msg = err.msg
if self.interval.seconds > 0:
self.timer.start(onTimer, self.interval) self.timer.start(onTimer, self.interval)
proc stop*(self: BlockMaintainer): Future[void] {.async.} = proc stop*(self: DatasetMaintainer): Future[void] {.async.} =
await self.timer.stop() await self.timer.stop()
await self.trackedFutures.cancelTracked()


@ -61,11 +61,13 @@ method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!Blo
method putBlock*( method putBlock*(
self: NetworkStore, self: NetworkStore,
blk: Block, blk: Block): Future[?!void] {.async.} =
ttl = Duration.none): Future[?!void] {.async.} =
## Store block locally and notify the network ## Store block locally and notify the network
## ##
let res = await self.localStore.putBlock(blk, ttl)
trace "Putting block into network store", cid = blk.cid
let res = await self.localStore.putBlock(blk)
if res.isErr: if res.isErr:
return res return res
@ -89,43 +91,6 @@ method getCidAndProof*(
self.localStore.getCidAndProof(treeCid, index) self.localStore.getCidAndProof(treeCid, index)
method ensureExpiry*(
self: NetworkStore,
cid: Cid,
expiry: SecondsSince1970): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without blockCheck =? await self.localStore.hasBlock(cid), err:
return failure(err)
if blockCheck:
return await self.localStore.ensureExpiry(cid, expiry)
else:
trace "Updating expiry - block not in local store", cid
return success()
method ensureExpiry*(
self: NetworkStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without blockCheck =? await self.localStore.hasBlock(treeCid, index), err:
return failure(err)
if blockCheck:
return await self.localStore.ensureExpiry(treeCid, index, expiry)
else:
trace "Updating expiry - block not in local store", treeCid, index
return success()
method listBlocks*( method listBlocks*(
self: NetworkStore, self: NetworkStore,
blockType = BlockType.Manifest): Future[?!AsyncIter[?Cid]] = blockType = BlockType.Manifest): Future[?!AsyncIter[?Cid]] =


@ -109,8 +109,7 @@ proc updateBlockMetadata*(
self: RepoStore, self: RepoStore,
cid: Cid, cid: Cid,
plusRefCount: Natural = 0, plusRefCount: Natural = 0,
minusRefCount: Natural = 0, minusRefCount: Natural = 0
minExpiry: SecondsSince1970 = 0
): Future[?!void] {.async.} = ): Future[?!void] {.async.} =
if cid.isEmpty: if cid.isEmpty:
return success() return success()
@ -123,14 +122,13 @@ proc updateBlockMetadata*(
if currBlockMd =? maybeCurrBlockMd: if currBlockMd =? maybeCurrBlockMd:
BlockMetadata( BlockMetadata(
size: currBlockMd.size, size: currBlockMd.size,
expiry: max(currBlockMd.expiry, minExpiry),
refCount: currBlockMd.refCount + plusRefCount - minusRefCount refCount: currBlockMd.refCount + plusRefCount - minusRefCount
).some ).some
else: else:
raise newException(BlockNotFoundError, "Metadata for block with cid " & $cid & " not found") raise newException(BlockNotFoundError, "Metadata for block with cid " & $cid & " not found")
) )
proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Future[?!StoreResult] {.async.} = proc storeBlock*(self: RepoStore, blk: Block): Future[?!StoreResult] {.async.} =
if blk.isEmpty: if blk.isEmpty:
return success(StoreResult(kind: AlreadyInStore)) return success(StoreResult(kind: AlreadyInStore))
@ -148,7 +146,7 @@ proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Futu
if currMd =? maybeCurrMd: if currMd =? maybeCurrMd:
if currMd.size == blk.data.len.NBytes: if currMd.size == blk.data.len.NBytes:
md = BlockMetadata(size: currMd.size, expiry: max(currMd.expiry, minExpiry), refCount: currMd.refCount) md = BlockMetadata(size: currMd.size, refCount: currMd.refCount)
res = StoreResult(kind: AlreadyInStore) res = StoreResult(kind: AlreadyInStore)
# making sure that the block actually is stored in the repoDs # making sure that the block actually is stored in the repoDs
@ -162,7 +160,7 @@ proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Futu
else: else:
raise newException(CatchableError, "Repo already stores a block with the same cid but with a different size, cid: " & $blk.cid) raise newException(CatchableError, "Repo already stores a block with the same cid but with a different size, cid: " & $blk.cid)
else: else:
md = BlockMetadata(size: blk.data.len.NBytes, expiry: minExpiry, refCount: 0) md = BlockMetadata(size: blk.data.len.NBytes, refCount: 0)
res = StoreResult(kind: Stored, used: blk.data.len.NBytes) res = StoreResult(kind: Stored, used: blk.data.len.NBytes)
if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption: if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption:
raise err raise err
@ -170,7 +168,7 @@ proc storeBlock*(self: RepoStore, blk: Block, minExpiry: SecondsSince1970): Futu
(md.some, res) (md.some, res)
) )
proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low): Future[?!DeleteResult] {.async.} = proc tryDeleteBlock*(self: RepoStore, cid: Cid): Future[?!DeleteResult] {.async.} =
if cid.isEmpty: if cid.isEmpty:
return success(DeleteResult(kind: InUse)) return success(DeleteResult(kind: InUse))
@ -187,7 +185,7 @@ proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.l
res: DeleteResult res: DeleteResult
if currMd =? maybeCurrMd: if currMd =? maybeCurrMd:
if currMd.refCount == 0 or currMd.expiry < expiryLimit: if currMd.refCount == 0:
maybeMeta = BlockMetadata.none maybeMeta = BlockMetadata.none
res = DeleteResult(kind: Deleted, released: currMd.size) res = DeleteResult(kind: Deleted, released: currMd.size)
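
With expiry gone from BlockMetadata, block-level deletion is now purely reference-counted; expiry decisions move up to the dataset maintainer. A toy sketch of the rule the updated tryDeleteBlock applies (illustration-only types, not the repo's):

type
  ToyBlockMd = object
    refCount: int

proc canDelete(md: ToyBlockMd): bool =
  ## A block is reclaimed only when nothing references it any more;
  ## the old `expiry < expiryLimit` escape hatch no longer exists.
  md.refCount == 0

doAssert canDelete(ToyBlockMd(refCount: 0))
doAssert not canDelete(ToyBlockMd(refCount: 2))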


@ -85,35 +85,6 @@ method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] =
else: else:
self.getBlock(address.cid) self.getBlock(address.cid)
method ensureExpiry*(
self: RepoStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
if expiry <= 0:
return failure(newException(ValueError, "Expiry timestamp must be larger than zero"))
await self.updateBlockMetadata(cid, minExpiry = expiry)
method ensureExpiry*(
self: RepoStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##
without leafMd =? await self.getLeafMetadata(treeCid, index), err:
return failure(err)
await self.ensureExpiry(leafMd.blkCid, expiry)
method putCidAndProof*( method putCidAndProof*(
self: RepoStore, self: RepoStore,
treeCid: Cid, treeCid: Cid,
@ -166,17 +137,14 @@ method getCid*(
method putBlock*( method putBlock*(
self: RepoStore, self: RepoStore,
blk: Block, blk: Block): Future[?!void] {.async.} =
ttl = Duration.none): Future[?!void] {.async.} =
## Put a block to the blockstore ## Put a block to the blockstore
## ##
logScope: logScope:
cid = blk.cid cid = blk.cid
let expiry = self.clock.now() + (ttl |? self.blockTtl).seconds without res =? await self.storeBlock(blk), err:
without res =? await self.storeBlock(blk, expiry), err:
return failure(err) return failure(err)
if res.kind == Stored: if res.kind == Stored:
@ -195,7 +163,7 @@ method putBlock*(
return success() return success()
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore when block refCount is 0 or block is expired ## Delete a block from the blockstore when block refCount is 0
## ##
logScope: logScope:
@ -203,7 +171,7 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
trace "Attempting to delete a block" trace "Attempting to delete a block"
without res =? await self.tryDeleteBlock(cid, self.clock.now()), err: without res =? await self.tryDeleteBlock(cid), err:
return failure(err) return failure(err)
if res.kind == Deleted: if res.kind == Deleted:
@ -214,7 +182,7 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption: if err =? (await self.updateQuotaUsage(minusUsed = res.released)).errorOption:
return failure(err) return failure(err)
elif res.kind == InUse: elif res.kind == InUse:
trace "Block in use, refCount > 0 and not expired" trace "Block in use, refCount > 0"
else: else:
trace "Block not found in store" trace "Block not found in store"
@ -296,42 +264,6 @@ method listBlocks*(
iter.next = next iter.next = next
return success iter return success iter
proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
let queryKey = ? createBlockExpirationMetadataQueryKey()
success Query.init(queryKey, offset = offset, limit = maxNumber)
method getBlockExpirations*(
self: RepoStore,
maxNumber: int,
offset: int): Future[?!AsyncIter[BlockExpiration]] {.async, base.} =
## Get iterator with block expirations
##
without beQuery =? createBlockExpirationQuery(maxNumber, offset), err:
error "Unable to format block expirations query", err = err.msg
return failure(err)
without queryIter =? await query[BlockMetadata](self.metaDs, beQuery), err:
error "Unable to execute block expirations query", err = err.msg
return failure(err)
without asyncQueryIter =? await queryIter.toAsyncIter(), err:
error "Unable to convert QueryIter to AsyncIter", err = err.msg
return failure(err)
let
filteredIter = await asyncQueryIter.filterSuccess()
blockExpIter = await mapFilter[KeyVal[BlockMetadata], BlockExpiration](filteredIter,
proc (kv: KeyVal[BlockMetadata]): Future[?BlockExpiration] {.async.} =
without cid =? Cid.init(kv.key.value).mapFailure, err:
error "Failed decoding cid", err = err.msg
return BlockExpiration.none
BlockExpiration(cid: cid, expiry: kv.value.expiry).some
)
success(blockExpIter)
method close*(self: RepoStore): Future[void] {.async.} = method close*(self: RepoStore): Future[void] {.async.} =
## Close the blockstore, cleaning up resources managed by it. ## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op ## For some implementations this may be a no-op


@ -30,11 +30,9 @@ type
postFixLen*: int postFixLen*: int
repoDs*: Datastore repoDs*: Datastore
metaDs*: TypedDatastore metaDs*: TypedDatastore
clock*: Clock
quotaMaxBytes*: NBytes quotaMaxBytes*: NBytes
quotaUsage*: QuotaUsage quotaUsage*: QuotaUsage
totalBlocks*: Natural totalBlocks*: Natural
blockTtl*: Duration
started*: bool started*: bool
QuotaUsage* {.serialize.} = object QuotaUsage* {.serialize.} = object
@ -42,7 +40,6 @@ type
reserved*: NBytes reserved*: NBytes
BlockMetadata* {.serialize.} = object BlockMetadata* {.serialize.} = object
expiry*: SecondsSince1970
size*: NBytes size*: NBytes
refCount*: Natural refCount*: Natural
@ -50,13 +47,9 @@ type
blkCid*: Cid blkCid*: Cid
proof*: CodexProof proof*: CodexProof
BlockExpiration* {.serialize.} = object
cid*: Cid
expiry*: SecondsSince1970
DeleteResultKind* {.serialize.} = enum DeleteResultKind* {.serialize.} = enum
Deleted = 0, # block removed from store Deleted = 0, # block removed from store
InUse = 1, # block not removed, refCount > 0 and not expired InUse = 1, # block not removed, refCount > 0
NotFound = 2 # block not found in store NotFound = 2 # block not found in store
DeleteResult* {.serialize.} = object DeleteResult* {.serialize.} = object
@ -90,18 +83,14 @@ func new*(
T: type RepoStore, T: type RepoStore,
repoDs: Datastore, repoDs: Datastore,
metaDs: Datastore, metaDs: Datastore,
clock: Clock = SystemClock.new(),
postFixLen = 2, postFixLen = 2,
quotaMaxBytes = DefaultQuotaBytes, quotaMaxBytes = DefaultQuotaBytes
blockTtl = DefaultBlockTtl
): RepoStore = ): RepoStore =
## Create new instance of a RepoStore ## Create new instance of a RepoStore
## ##
RepoStore( RepoStore(
repoDs: repoDs, repoDs: repoDs,
metaDs: TypedDatastore.init(metaDs), metaDs: TypedDatastore.init(metaDs),
clock: clock,
postFixLen: postFixLen, postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes, quotaMaxBytes: quotaMaxBytes
blockTtl: blockTtl
) )


@ -10,6 +10,7 @@ import pkg/codex/merkletree
import pkg/codex/blockexchange import pkg/codex/blockexchange
import pkg/codex/rng import pkg/codex/rng
import pkg/codex/utils import pkg/codex/utils
import pkg/codex/units
import ./helpers/nodeutils import ./helpers/nodeutils
import ./helpers/randomchunker import ./helpers/randomchunker
@ -106,6 +107,10 @@ proc storeDataGetManifest*(store: BlockStore, chunker: Chunker): Future[Manifest
return manifest return manifest
proc storeDataGetManifest*(store: BlockStore, blocksCount = 10): Future[Manifest] {.async.} =
let chunker = RandomChunker.new(rng = Rng.instance(), size = blocksCount.KiBs, chunkSize = 1.KiBs)
await storeDataGetManifest(store, chunker)
proc makeRandomBlocks*( proc makeRandomBlocks*(
datasetSize: int, blockSize: NBytes): Future[seq[Block]] {.async.} = datasetSize: int, blockSize: NBytes): Future[seq[Block]] {.async.} =


@ -23,7 +23,6 @@ type
getBeMaxNumber*: int getBeMaxNumber*: int
getBeOffset*: int getBeOffset*: int
testBlockExpirations*: seq[BlockExpiration]
getBlockExpirationsThrows*: bool getBlockExpirationsThrows*: bool
method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} = method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} =


@ -75,6 +75,7 @@ template setupAndTearDown*() {.dirty.} =
localStore: RepoStore localStore: RepoStore
localStoreRepoDs: DataStore localStoreRepoDs: DataStore
localStoreMetaDs: DataStore localStoreMetaDs: DataStore
maintenance: DatasetMaintainer
engine: BlockExcEngine engine: BlockExcEngine
store: NetworkStore store: NetworkStore
node: CodexNodeRef node: CodexNodeRef
@ -99,8 +100,9 @@ template setupAndTearDown*() {.dirty.} =
clock = SystemClock.new() clock = SystemClock.new()
localStoreMetaDs = metaTmp.newDb() localStoreMetaDs = metaTmp.newDb()
localStoreRepoDs = repoTmp.newDb() localStoreRepoDs = repoTmp.newDb()
localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock = clock) localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs)
await localStore.start() await localStore.start()
maintenance = DatasetMaintainer.new(localStore, localStoreMetaDs)
blockDiscovery = Discovery.new( blockDiscovery = Discovery.new(
switch.peerInfo.privateKey, switch.peerInfo.privateKey,
@ -115,6 +117,7 @@ template setupAndTearDown*() {.dirty.} =
node = CodexNodeRef.new( node = CodexNodeRef.new(
switch = switch, switch = switch,
networkStore = store, networkStore = store,
maintenance = maintenance,
engine = engine, engine = engine,
prover = Prover.none, prover = Prover.none,
discovery = blockDiscovery, discovery = blockDiscovery,


@ -101,23 +101,6 @@ asyncchecksuite "Test Node - Host contracts":
test "onExpiryUpdate callback is set": test "onExpiryUpdate callback is set":
check sales.onExpiryUpdate.isSome check sales.onExpiryUpdate.isSome
test "onExpiryUpdate callback":
let
# The blocks have set default TTL, so in order to update it we have to have larger TTL
expectedExpiry: SecondsSince1970 = clock.now + DefaultBlockTtl.seconds + 11123
expiryUpdateCallback = !sales.onExpiryUpdate
(await expiryUpdateCallback(manifestCidStr, expectedExpiry)).tryGet()
for index in 0..<manifest.blocksCount:
let
blk = (await localStore.getBlock(manifest.treeCid, index)).tryGet
key = (createBlockExpirationMetadataKey(blk.cid)).tryGet
bytes = (await localStoreMetaDs.get(key)).tryGet
blkMd = BlockMetadata.decode(bytes).tryGet
check blkMd.expiry == expectedExpiry
test "onStore callback is set": test "onStore callback is set":
check sales.onStore.isSome check sales.onStore.isSome
@ -125,7 +108,7 @@ asyncchecksuite "Test Node - Host contracts":
let onStore = !sales.onStore let onStore = !sales.onStore
var request = StorageRequest.example var request = StorageRequest.example
request.content.cid = $verifiableBlock.cid request.content.cid = $verifiableBlock.cid
request.expiry = (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.u256 request.expiry = (getTime() + 1.hours).toUnix.u256
var fetchedBytes: uint = 0 var fetchedBytes: uint = 0
let onBlocks = proc(blocks: seq[bt.Block]): Future[?!void] {.async.} = let onBlocks = proc(blocks: seq[bt.Block]): Future[?!void] {.async.} =
@ -134,16 +117,5 @@ asyncchecksuite "Test Node - Host contracts":
return success() return success()
(await onStore(request, 1.u256, onBlocks)).tryGet() (await onStore(request, 1.u256, onBlocks)).tryGet()
check fetchedBytes == 12 * DefaultBlockSize.uint check fetchedBytes == 12 * DefaultBlockSize.uint
let indexer = verifiable.protectedStrategy.init(
0, verifiable.numSlotBlocks() - 1, verifiable.numSlots)
for index in indexer.getIndicies(1):
let
blk = (await localStore.getBlock(verifiable.treeCid, index)).tryGet
key = (createBlockExpirationMetadataKey(blk.cid)).tryGet
bytes = (await localStoreMetaDs.get(key)).tryGet
blkMd = BlockMetadata.decode(bytes).tryGet
check blkMd.expiry == request.expiry.toSecondsSince1970


@ -69,7 +69,7 @@ asyncchecksuite "Test Node - Basic":
# https://github.com/codex-storage/nim-codex/issues/699 # https://github.com/codex-storage/nim-codex/issues/699
let let
cstore = CountingStore.new(engine, localStore) cstore = CountingStore.new(engine, localStore)
node = CodexNodeRef.new(switch, cstore, engine, blockDiscovery) node = CodexNodeRef.new(switch, cstore, maintenance, engine, blockDiscovery)
missingCid = Cid.init( missingCid = Cid.init(
"zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get() "zDvZRwzmCvtiyubW9AecnxgLnXK8GrBvpQJBDzToxmzDN6Nrc2CZ").get()


@ -28,7 +28,6 @@ checksuite "Test coders":
proc rand(T: type BlockMetadata): T = proc rand(T: type BlockMetadata): T =
BlockMetadata( BlockMetadata(
expiry: rand(SecondsSince1970),
size: rand(NBytes), size: rand(NBytes),
refCount: rand(Natural) refCount: rand(Natural)
) )


@ -9,180 +9,106 @@
import pkg/chronos import pkg/chronos
import pkg/questionable/results import pkg/questionable/results
import pkg/codex/blocktype as bt import pkg/codex/blocktype
import pkg/codex/stores/repostore import pkg/codex/stores
import pkg/codex/clock import pkg/codex/clock
import pkg/codex/rng
import pkg/datastore
import ../../asynctest import ../../asynctest
import ../helpers import ../helpers
import ../helpers/mocktimer import ../helpers/mocktimer
import ../helpers/mockrepostore
import ../helpers/mockclock import ../helpers/mockclock
import ../examples import ../examples
import codex/stores/maintenance import codex/stores/maintenance
checksuite "BlockMaintainer": asyncchecksuite "DatasetMaintainer":
var mockRepoStore: MockRepoStore
var interval: Duration
var mockTimer: MockTimer
var mockClock: MockClock
var blockMaintainer: BlockMaintainer var
clock: MockClock
var testBe1: BlockExpiration timer: MockTimer
var testBe2: BlockExpiration metaDs: Datastore
var testBe3: BlockExpiration blockStore: BlockStore
maintenance: DatasetMaintainer
proc createTestExpiration(expiry: SecondsSince1970): BlockExpiration =
BlockExpiration(
cid: bt.Block.example.cid,
expiry: expiry
)
setup: setup:
mockClock = MockClock.new() clock = MockClock.new()
mockClock.set(100) timer = MockTimer.new()
metaDs = SQLiteDatastore.new(Memory).tryGet()
blockStore = RepoStore.new(metaDs, metaDs)
maintenance = DatasetMaintainer.new(
blockStore = blockStore,
metaDs = metaDs,
timer = timer,
clock = clock,
defaultExpiry = 100.seconds,
interval = 10.seconds,
retryDelay = 10.seconds
)
testBe1 = createTestExpiration(200) maintenance.start()
testBe2 = createTestExpiration(300) clock.set(0)
testBe3 = createTestExpiration(400)
mockRepoStore = MockRepoStore.new() teardown:
mockRepoStore.testBlockExpirations.add(testBe1) await maintenance.stop()
mockRepoStore.testBlockExpirations.add(testBe2)
mockRepoStore.testBlockExpirations.add(testBe3)
interval = 1.days proc listStoredBlocks(manifest: Manifest): Future[seq[int]] {.async.} =
mockTimer = MockTimer.new() var indicies = newSeq[int]()
blockMaintainer = BlockMaintainer.new( for i in 0..<manifest.blocksCount:
mockRepoStore, let address = BlockAddress.init(manifest.treeCid, i)
interval, if (await address in blockStore):
numberOfBlocksPerInterval = 2, indicies.add(i)
mockTimer,
mockClock)
test "Start should start timer at provided interval": indicies
blockMaintainer.start()
check mockTimer.startCalled == 1
check mockTimer.mockInterval == interval
test "Stop should stop timer": test "Should not delete dataset":
await blockMaintainer.stop() let manifest = await storeDataGetManifest(blockStore, blocksCount = 5)
check mockTimer.stopCalled == 1 (await maintenance.trackExpiry(manifest.treeCid, manifest.blocksCount, 100.SecondsSince1970)).tryGet()
test "Timer callback should call getBlockExpirations on RepoStore": clock.advance(50)
blockMaintainer.start()
await mockTimer.invokeCallback() await timer.invokeCallback()
await sleepAsync(1.seconds)
check: check:
mockRepoStore.getBeMaxNumber == 2 @[0, 1, 2, 3, 4] == await listStoredBlocks(manifest)
mockRepoStore.getBeOffset == 0
test "Timer callback should handle Catachable errors": test "Should delete expired dataset":
mockRepoStore.getBlockExpirationsThrows = true let manifest = await storeDataGetManifest(blockStore, blocksCount = 5)
blockMaintainer.start() (await maintenance.trackExpiry(manifest.treeCid, manifest.blocksCount, 100.SecondsSince1970)).tryGet()
await mockTimer.invokeCallback()
test "Subsequent timer callback should call getBlockExpirations on RepoStore with offset": clock.advance(150)
blockMaintainer.start()
await mockTimer.invokeCallback() await timer.invokeCallback()
await mockTimer.invokeCallback() await sleepAsync(1.seconds)
check: check:
mockRepoStore.getBeMaxNumber == 2 newSeq[int]() == await listStoredBlocks(manifest)
mockRepoStore.getBeOffset == 2
test "Timer callback should delete no blocks if none are expired": test "Should not delete dataset with prolonged expiry":
blockMaintainer.start() let manifest = await storeDataGetManifest(blockStore, blocksCount = 5)
await mockTimer.invokeCallback() (await maintenance.trackExpiry(manifest.treeCid, manifest.blocksCount, 100.SecondsSince1970)).tryGet()
(await maintenance.ensureExpiry(manifest.treeCid, 200.SecondsSince1970)).tryGet()
clock.advance(150)
await timer.invokeCallback()
await sleepAsync(1.seconds)
check: check:
mockRepoStore.delBlockCids.len == 0 @[0, 1, 2, 3, 4] == await listStoredBlocks(manifest)
test "Timer callback should delete one block if it is expired": test "Should delete dataset without prolonged expiry":
mockClock.set(250) let manifest = await storeDataGetManifest(blockStore, blocksCount = 5)
blockMaintainer.start() (await maintenance.trackExpiry(manifest.treeCid, manifest.blocksCount, 100.SecondsSince1970)).tryGet()
await mockTimer.invokeCallback() (await maintenance.ensureExpiry(manifest.treeCid, 100.SecondsSince1970)).tryGet()
clock.advance(150)
await timer.invokeCallback()
await sleepAsync(1.seconds)
check: check:
mockRepoStore.delBlockCids == [testBe1.cid] newSeq[int]() == await listStoredBlocks(manifest)
test "Timer callback should delete multiple blocks if they are expired":
mockClock.set(500)
blockMaintainer.start()
await mockTimer.invokeCallback()
check:
mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid]
test "After deleting a block, subsequent timer callback should decrease offset by the number of deleted blocks":
mockClock.set(250)
blockMaintainer.start()
await mockTimer.invokeCallback()
check mockRepoStore.delBlockCids == [testBe1.cid]
# Because one block was deleted, the offset used in the next call should be 2 minus 1.
await mockTimer.invokeCallback()
check:
mockRepoStore.getBeMaxNumber == 2
mockRepoStore.getBeOffset == 1
test "Should delete all blocks if expired, in two timer callbacks":
mockClock.set(500)
blockMaintainer.start()
await mockTimer.invokeCallback()
await mockTimer.invokeCallback()
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid]
test "Iteration offset should loop":
blockMaintainer.start()
await mockTimer.invokeCallback()
check mockRepoStore.getBeOffset == 0
await mockTimer.invokeCallback()
check mockRepoStore.getBeOffset == 2
await mockTimer.invokeCallback()
check mockRepoStore.getBeOffset == 0
test "Should handle new blocks":
proc invokeTimerManyTimes(): Future[void] {.async.} =
for i in countup(0, 10):
await mockTimer.invokeCallback()
blockMaintainer.start()
await invokeTimerManyTimes()
# no blocks have expired
check mockRepoStore.delBlockCids == []
mockClock.set(250)
await invokeTimerManyTimes()
# one block has expired
check mockRepoStore.delBlockCids == [testBe1.cid]
# new blocks are added
let testBe4 = createTestExpiration(600)
let testBe5 = createTestExpiration(700)
mockRepoStore.testBlockExpirations.add(testBe4)
mockRepoStore.testBlockExpirations.add(testBe5)
mockClock.set(500)
await invokeTimerManyTimes()
# All blocks have expired
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid]
mockClock.set(650)
await invokeTimerManyTimes()
# First new block has expired
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid]
mockClock.set(750)
await invokeTimerManyTimes()
# Second new block has expired
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid, testBe5.cid]


@ -14,12 +14,10 @@ import pkg/codex/stores/cachestore
import pkg/codex/chunker import pkg/codex/chunker
import pkg/codex/stores import pkg/codex/stores
import pkg/codex/blocktype as bt import pkg/codex/blocktype as bt
import pkg/codex/clock
import pkg/codex/utils/asynciter import pkg/codex/utils/asynciter
import ../../asynctest import ../../asynctest
import ../helpers import ../helpers
import ../helpers/mockclock
import ../examples import ../examples
import ./commonstoretests import ./commonstoretests
@ -62,20 +60,13 @@ asyncchecksuite "RepoStore":
var var
repoDs: Datastore repoDs: Datastore
metaDs: Datastore metaDs: Datastore
mockClock: MockClock
repo: RepoStore repo: RepoStore
let
now: SecondsSince1970 = 123
setup: setup:
repoDs = SQLiteDatastore.new(Memory).tryGet() repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet() metaDs = SQLiteDatastore.new(Memory).tryGet()
mockClock = MockClock.new()
mockClock.set(now)
repo = RepoStore.new(repoDs, metaDs, clock = mockClock, quotaMaxBytes = 200'nb) repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200'nb)
teardown: teardown:
(await repoDs.close()).tryGet (await repoDs.close()).tryGet
@ -179,165 +170,6 @@ asyncchecksuite "RepoStore":
repo.quotaUsedBytes == 0'nb repo.quotaUsedBytes == 0'nb
repo.quotaReservedBytes == 100'nb repo.quotaReservedBytes == 100'nb
proc getExpirations(): Future[seq[BlockExpiration]] {.async.} =
let iter = (await repo.getBlockExpirations(100, 0)).tryGet()
var res = newSeq[BlockExpiration]()
for fut in iter:
let be = await fut
res.add(be)
res
test "Should store block expiration timestamp":
let
duration = 10.seconds
blk = createTestBlock(100)
let
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
(await repo.putBlock(blk, duration.some)).tryGet
let expirations = await getExpirations()
check:
expectedExpiration in expirations
test "Should store block with default expiration timestamp when not provided":
let
blk = createTestBlock(100)
let
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + DefaultBlockTtl.seconds)
(await repo.putBlock(blk)).tryGet
let expirations = await getExpirations()
check:
expectedExpiration in expirations
test "Should refuse update expiry with negative timestamp":
let
blk = createTestBlock(100)
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
(await repo.putBlock(blk, some 10.seconds)).tryGet
let expirations = await getExpirations()
check:
expectedExpiration in expirations
expect ValueError:
(await repo.ensureExpiry(blk.cid, -1)).tryGet
expect ValueError:
(await repo.ensureExpiry(blk.cid, 0)).tryGet
test "Should fail when updating expiry of non-existing block":
let
blk = createTestBlock(100)
expect BlockNotFoundError:
(await repo.ensureExpiry(blk.cid, 10)).tryGet
test "Should update block expiration timestamp when new expiration is farther":
let
blk = createTestBlock(100)
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 20)
(await repo.putBlock(blk, some 10.seconds)).tryGet
let expirations = await getExpirations()
check:
expectedExpiration in expirations
(await repo.ensureExpiry(blk.cid, now + 20)).tryGet
let updatedExpirations = await getExpirations()
check:
expectedExpiration notin updatedExpirations
updatedExpectedExpiration in updatedExpirations
test "Should not update block expiration timestamp when current expiration is farther then new one":
let
blk = createTestBlock(100)
expectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 10)
updatedExpectedExpiration = BlockExpiration(cid: blk.cid, expiry: now + 5)
(await repo.putBlock(blk, some 10.seconds)).tryGet
let expirations = await getExpirations()
check:
expectedExpiration in expirations
(await repo.ensureExpiry(blk.cid, now + 5)).tryGet
let updatedExpirations = await getExpirations()
check:
expectedExpiration in updatedExpirations
updatedExpectedExpiration notin updatedExpirations
test "delBlock should remove expiration metadata":
let
blk = createTestBlock(100)
expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
(await repo.putBlock(blk, 10.seconds.some)).tryGet
(await repo.delBlock(blk.cid)).tryGet
let expirations = await getExpirations()
check:
expirations.len == 0
test "Should retrieve block expiration information":
proc unpack(beIter: Future[?!AsyncIter[BlockExpiration]]): Future[seq[BlockExpiration]] {.async.} =
var expirations = newSeq[BlockExpiration](0)
without iter =? (await beIter), err:
return expirations
for beFut in toSeq(iter):
let value = await beFut
expirations.add(value)
return expirations
let
duration = 10.seconds
blk1 = createTestBlock(10)
blk2 = createTestBlock(11)
blk3 = createTestBlock(12)
let
expectedExpiration: SecondsSince1970 = now + 10
proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) =
check:
be.cid == expectedBlock.cid
be.expiry == expectedExpiration
(await repo.putBlock(blk1, duration.some)).tryGet
(await repo.putBlock(blk2, duration.some)).tryGet
(await repo.putBlock(blk3, duration.some)).tryGet
let
blockExpirations1 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=0))
blockExpirations2 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=2))
check blockExpirations1.len == 2
assertExpiration(blockExpirations1[0], blk2)
assertExpiration(blockExpirations1[1], blk1)
check blockExpirations2.len == 1
assertExpiration(blockExpirations2[0], blk3)
test "should put empty blocks": test "should put empty blocks":
let blk = Cid.example.emptyBlock.tryGet() let blk = Cid.example.emptyBlock.tryGet()
check (await repo.putBlock(blk)).isOk check (await repo.putBlock(blk)).isOk
@ -365,8 +197,7 @@ commonBlockStoreTests(
BlockStore( BlockStore(
RepoStore.new( RepoStore.new(
SQLiteDatastore.new(Memory).tryGet(), SQLiteDatastore.new(Memory).tryGet(),
SQLiteDatastore.new(Memory).tryGet(), SQLiteDatastore.new(Memory).tryGet())))
clock = MockClock.new())))
const const
path = currentSourcePath().parentDir / "test" path = currentSourcePath().parentDir / "test"
@ -385,7 +216,6 @@ commonBlockStoreTests(
BlockStore( BlockStore(
RepoStore.new( RepoStore.new(
FSDatastore.new(path, depth).tryGet(), FSDatastore.new(path, depth).tryGet(),
SQLiteDatastore.new(Memory).tryGet(), SQLiteDatastore.new(Memory).tryGet())),
clock = MockClock.new())),
before = before, before = before,
after = after) after = after)


@ -22,7 +22,7 @@ ethersuite "Node block expiration tests":
dataDir.removeDir() dataDir.removeDir()
proc startTestNode(blockTtlSeconds: int) = proc startTestNode(ttlSeconds: int) =
node = startNode([ node = startNode([
"--api-port=8080", "--api-port=8080",
"--data-dir=" & dataDir, "--data-dir=" & dataDir,
@ -30,9 +30,8 @@ ethersuite "Node block expiration tests":
"--listen-addrs=/ip4/127.0.0.1/tcp/0", "--listen-addrs=/ip4/127.0.0.1/tcp/0",
"--disc-ip=127.0.0.1", "--disc-ip=127.0.0.1",
"--disc-port=8090", "--disc-port=8090",
"--block-ttl=" & $blockTtlSeconds, "--default-ttl=" & $ttlSeconds,
"--block-mi=1", "--maintenance-interval=1"
"--block-mn=10"
], debug = false) ], debug = false)
node.waitUntilStarted() node.waitUntilStarted()
@ -61,7 +60,7 @@ ethersuite "Node block expiration tests":
content.code == Http200 content.code == Http200
test "node retains not-expired file": test "node retains not-expired file":
startTestNode(blockTtlSeconds = 10) startTestNode(ttlSeconds = 10)
let contentId = uploadTestFile() let contentId = uploadTestFile()
@ -74,10 +73,15 @@ ethersuite "Node block expiration tests":
response.body == content response.body == content
test "node deletes expired file": test "node deletes expired file":
startTestNode(blockTtlSeconds = 1) startTestNode(ttlSeconds = 2)
let contentId = uploadTestFile() let contentId = uploadTestFile()
await sleepAsync(1.seconds)
# check:
# hasFile(contentId)
await sleepAsync(3.seconds) await sleepAsync(3.seconds)
check: check: