Blockstore maintenance (#347)

* setting up

* Implements timer utility

* applies async and cancellation to timer loop

* Sets up mocktimer and mockblockstore to set up first maintenance module test

* wip: first test that calls blockChecker

* wip: adding user type to timer callback

* Chronicles doesn't like type-arguments? Disables logging in timer module for now

* Implementing block check test for blockMaintainer module

* Sets up additional tests for blockmaintainer

* Removes generic from timer module. Implements numberOfBlocks per interval in blockMaintainer.

* Implements blockMaintainer

* Sets up tests for blockChecker

* Some comments by Mark

* Cleanup repostore tests

* Setting up the use of std/times for block TTL tracking

* repostore adds expiration timestamp

* Defaults the repostore clock to the system clock

* Applies updates to repostore interface.

* Implements retrieving of block expiration information from repostore

* Sets up tests for maintenance module behavior

* Implements block maintenance module

* Wires maintenance module into codex. Sets up integration tests for block expiration

* Sets up test for missing behavior: removing timestamp metadata on block delete

* Implements removing of expiration metadata in repoStore

* Fixes integration tests for block expiration

* Adds block expiration tests to integration test run

* Handled some comments by Dmitriy

* Review comment by Dmitriy: Removes seq[cid] from runBlockCheck

* Review comment by Dmitriy: Moves key formatting methods to keyutils.

* Review comment by Dmitriy: Encodes durations using chronos

* Fixes conversion of TTL type in conf.

* Review comments by Dmitriy

* Adds unit tests for keyUtils.

* Adds test coverage for exception in maintenance module
This commit is contained in:
Ben Bierens 2023-03-08 16:04:54 +01:00 committed by GitHub
parent 25f68c1e4c
commit 9c8a59d150
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1042 additions and 89 deletions

View File

@ -1,11 +1,13 @@
import pkg/chronos
import pkg/stew/endians2
import pkg/upraises
type
Clock* = ref object of RootObj
SecondsSince1970* = int64
Timeout* = object of CatchableError
method now*(clock: Clock): SecondsSince1970 {.base.} =
method now*(clock: Clock): SecondsSince1970 {.base, upraises: [].} =
raiseAssert "not implemented"
proc waitUntil*(clock: Clock, time: SecondsSince1970) {.async.} =
@ -23,3 +25,11 @@ proc withTimeout*(future: Future[void],
if not future.completed:
await future.cancelAndWait()
raise newException(Timeout, "Timed out")
proc toBytes*(i: SecondsSince1970): seq[byte] =
let asUint = cast[uint64](i)
@(asUint.toBytes)
proc toSecondsSince1970*(bytes: seq[byte]): SecondsSince1970 =
let asUint = uint64.fromBytes(bytes)
cast[int64](asUint)

View File

@ -46,6 +46,7 @@ type
restServer: RestServerRef
codexNode: CodexNodeRef
repoStore: RepoStore
maintenance: BlockMaintainer
CodexPrivateKey* = libp2p.PrivateKey # alias
@ -55,6 +56,7 @@ proc start*(s: CodexServer) {.async.} =
await s.repoStore.start()
s.restServer.start()
await s.codexNode.start()
s.maintenance.start()
let
# TODO: Can't define these as constants, pity
@ -90,7 +92,8 @@ proc stop*(s: CodexServer) {.async.} =
await allFuturesThrowing(
s.restServer.stop(),
s.codexNode.stop(),
s.repoStore.start())
s.repoStore.stop(),
s.maintenance.stop())
s.runHandle.complete()
@ -130,6 +133,7 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey):
if config.cacheSize > 0:
cache = CacheStore.new(cacheSize = config.cacheSize * MiB)
## Is unused?
let
discoveryDir = config.dataDir / CodexDhtNamespace
@ -161,7 +165,12 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey):
metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create meta data store!"),
quotaMaxBytes = config.storageQuota.uint,
blockTtl = config.blockTtl.seconds)
blockTtl = config.blockTtlSeconds.seconds)
maintenance = BlockMaintainer.new(
repoStore,
interval = config.blockMaintenanceIntervalSeconds.seconds,
numberOfBlocksPerInterval = config.blockMaintenanceNumberOfBlocks)
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
@ -183,4 +192,5 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey):
config: config,
codexNode: codexNode,
restServer: restServer,
repoStore: repoStore)
repoStore: repoStore,
maintenance: maintenance)

View File

@ -32,7 +32,7 @@ import pkg/ethers
import ./discovery
import ./stores
export DefaultCacheSizeMiB, DefaultQuotaBytes, DefaultBlockTtl, net
export DefaultCacheSizeMiB, net, DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval, DefaultNumberOfBlocksToMaintainPerInterval
type
StartUpCommand* {.pure.} = enum
@ -158,12 +158,24 @@ type
name: "storage-quota"
abbr: "q" }: Natural
blockTtl* {.
blockTtlSeconds* {.
desc: "Default block timeout in seconds - 0 disables the ttl"
defaultValue: DefaultBlockTtl.secs
defaultValueDesc: "86400" # 24h in secs
defaultValue: DefaultBlockTtl.seconds
defaultValueDesc: $DefaultBlockTtl
name: "block-ttl"
abbr: "t" }: Natural
abbr: "t" }: int
blockMaintenanceIntervalSeconds* {.
desc: "Time interval in seconds - determines frequency of block maintenance cycle: how often blocks are checked for expiration and cleanup."
defaultValue: DefaultBlockMaintenanceInterval.seconds
defaultValueDesc: $DefaultBlockMaintenanceInterval
name: "block-mi" }: int
blockMaintenanceNumberOfBlocks* {.
desc: "Number of blocks to check every maintenance cycle."
defaultValue: DefaultNumberOfBlocksToMaintainPerInterval
defaultValueDesc: $DefaultNumberOfBlocksToMaintainPerInterval
name: "block-mn" }: int
cacheSize* {.
desc: "The size in MiB of the block cache, 0 disables the cache - might help on slow hardrives"

View File

@ -2,5 +2,7 @@ import ./stores/cachestore
import ./stores/blockstore
import ./stores/networkstore
import ./stores/repostore
import ./stores/maintenance
import ./stores/keyutils
export cachestore, blockstore, networkstore, repostore
export cachestore, blockstore, networkstore, repostore, maintenance, keyutils

44
codex/stores/keyutils.nim Normal file
View File

@ -0,0 +1,44 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/upraises
push: {.upraises: [].}
import pkg/chronicles
import pkg/questionable/results
import pkg/datastore
import pkg/libp2p
import ../namespaces
import ../manifest
const
CodexMetaKey* = Key.init(CodexMetaNamespace).tryGet
CodexRepoKey* = Key.init(CodexRepoNamespace).tryGet
CodexBlocksKey* = Key.init(CodexBlocksNamespace).tryGet
CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet
BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet
QuotaKey* = Key.init(CodexQuotaNamespace).tryGet
QuotaUsedKey* = (QuotaKey / "used").tryGet
QuotaReservedKey* = (QuotaKey / "reserved").tryGet
func makePrefixKey*(postFixLen: int, cid: Cid): ?!Key =
let
cidKey = ? Key.init(($cid)[^postFixLen..^1] & "/" & $cid)
if ? cid.isManifest:
success CodexManifestKey / cidKey
else:
success CodexBlocksKey / cidKey
proc createBlockExpirationMetadataKey*(cid: Cid): ?!Key =
BlocksTtlKey / $cid
proc createBlockExpirationMetadataQueryKey*(): ?!Key =
let queryString = ? (BlocksTtlKey / "*")
Key.init(queryString)

View File

@ -0,0 +1,92 @@
## Nim-Codex
## Copyright (c) 2023 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
## Store maintenance module
## Looks for and removes expired blocks from blockstores.
import pkg/chronos
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import ./repostore
import ../utils/timer
import ../clock
import ../systemclock
const
DefaultBlockMaintenanceInterval* = 10.minutes
DefaultNumberOfBlocksToMaintainPerInterval* = 1000
type
BlockMaintainer* = ref object of RootObj
repoStore: RepoStore
interval: Duration
timer: Timer
clock: Clock
numberOfBlocksPerInterval: int
offset: int
proc new*(T: type BlockMaintainer,
repoStore: RepoStore,
interval: Duration,
numberOfBlocksPerInterval = 100,
timer = Timer.new(),
clock: Clock = SystemClock.new()
): T =
T(
repoStore: repoStore,
interval: interval,
numberOfBlocksPerInterval: numberOfBlocksPerInterval,
timer: timer,
clock: clock,
offset: 0)
proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.} =
if isErr (await self.repoStore.delBlock(cid)):
trace "Unable to delete block from repoStore"
proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} =
if be.expiration < self.clock.now:
await self.deleteExpiredBlock(be.cid)
else:
inc self.offset
proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} =
let expirations = await self.repoStore.getBlockExpirations(
maxNumber = self.numberOfBlocksPerInterval,
offset = self.offset
)
without iter =? expirations, err:
trace "Unable to obtain blockExpirations iterator from repoStore"
return
var numberReceived = 0
for maybeBeFuture in iter:
if be =? await maybeBeFuture:
inc numberReceived
await self.processBlockExpiration(be)
# If we received fewer blockExpirations from the iterator than we asked for,
# We're at the end of the dataset and should start from 0 next time.
if numberReceived < self.numberOfBlocksPerInterval:
self.offset = 0
proc start*(self: BlockMaintainer) =
proc onTimer(): Future[void] {.async.} =
try:
await self.runBlockCheck()
except CatchableError as exc:
error "Unexpected exception in BlockMaintainer.onTimer(): ", msg=exc.msg
self.timer.start(onTimer, self.interval)
proc stop*(self: BlockMaintainer): Future[void] {.async.} =
await self.timer.stop()

View File

@ -7,6 +7,7 @@
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/upraises
push: {.upraises: [].}
@ -20,9 +21,11 @@ import pkg/datastore
import pkg/stew/endians2
import ./blockstore
import ./keyutils
import ../blocktype
import ../namespaces
import ../manifest
import ../clock
import ../systemclock
export blocktype, libp2p
@ -30,22 +33,9 @@ logScope:
topics = "codex repostore"
const
CodexMetaKey* = Key.init(CodexMetaNamespace).tryGet
CodexRepoKey* = Key.init(CodexRepoNamespace).tryGet
CodexBlocksKey* = Key.init(CodexBlocksNamespace).tryGet
CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet
QuotaKey* = Key.init(CodexQuotaNamespace).tryGet
QuotaUsedKey* = (QuotaKey / "used").tryGet
QuotaReservedKey* = (QuotaKey / "reserved").tryGet
BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet
DefaultBlockTtl* = 24.hours
DefaultQuotaBytes* = 1'u shl 33'u # ~8GB
ZeroMoment = Moment.init(0, Nanosecond) # used for converting between Duration and Moment
type
QuotaUsedError* = object of CodexError
QuotaNotEnoughError* = object of CodexError
@ -54,23 +44,24 @@ type
postFixLen*: int
repoDs*: Datastore
metaDs*: Datastore
clock: Clock
quotaMaxBytes*: uint
quotaUsedBytes*: uint
quotaReservedBytes*: uint
blockTtl*: Duration
started*: bool
func makePrefixKey*(self: RepoStore, cid: Cid): ?!Key =
let
cidKey = ? Key.init(($cid)[^self.postFixLen..^1] & "/" & $cid)
BlockExpiration* = object
cid*: Cid
expiration*: SecondsSince1970
GetNext = proc(): Future[?BlockExpiration] {.upraises: [], gcsafe, closure.}
BlockExpirationIter* = ref object
finished*: bool
next*: GetNext
if ? cid.isManifest:
success CodexManifestKey / cidKey
else:
success CodexBlocksKey / cidKey
func makeExpiresKey(expires: Duration, cid: Cid): ?!Key =
BlocksTtlKey / $cid / $expires.seconds
iterator items*(q: BlockExpirationIter): Future[?BlockExpiration] =
while not q.finished:
yield q.next()
func totalUsed*(self: RepoStore): uint =
(self.quotaUsedBytes + self.quotaReservedBytes)
@ -79,7 +70,7 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the blockstore
##
without key =? self.makePrefixKey(cid), err:
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
@ -93,6 +84,17 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
trace "Got block for cid", cid
return Block.new(cid, data)
proc getBlockExpirationTimestamp(self: RepoStore, ttl: ?Duration): SecondsSince1970 =
let duration = ttl |? self.blockTtl
self.clock.now() + duration.seconds
proc getBlockExpirationEntry(self: RepoStore, batch: var seq[BatchEntry], cid: Cid, ttl: ?Duration): ?!BatchEntry =
without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
let value = self.getBlockExpirationTimestamp(ttl).toBytes
return success((key, value))
method putBlock*(
self: RepoStore,
blk: Block,
@ -100,7 +102,7 @@ method putBlock*(
## Put a block to the blockstore
##
without key =? self.makePrefixKey(blk.cid), err:
without key =? makePrefixKey(self.postFixLen, blk.cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
@ -115,9 +117,6 @@ method putBlock*(
trace "Storing block with key", key
without var expires =? ttl:
expires = Moment.fromNow(self.blockTtl) - ZeroMoment
var
batch: seq[BatchEntry]
@ -131,14 +130,10 @@ method putBlock*(
trace "Updating quota", used
batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
without expiresKey =? makeExpiresKey(expires, blk.cid), err:
trace "Unable make block ttl key",
err = err.msg, cid = blk.cid, expires, expiresKey
without blockExpEntry =? self.getBlockExpirationEntry(batch, blk.cid, ttl), err:
trace "Unable to create block expiration metadata key", err = err.msg
return failure(err)
trace "Adding expires key", expiresKey, expires
batch.add((expiresKey, @[]))
batch.add(blockExpEntry)
if err =? (await self.metaDs.put(batch)).errorOption:
trace "Error updating quota bytes", err = err.msg
@ -152,6 +147,21 @@ method putBlock*(
self.quotaUsedBytes = used
return success()
proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} =
let used = self.quotaUsedBytes - blk.data.len.uint
if err =? (await self.metaDs.put(
QuotaUsedKey,
@(used.uint64.toBytesBE))).errorOption:
trace "Error updating quota key!", err = err.msg
return failure(err)
self.quotaUsedBytes = used
return success()
proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
without key =? createBlockExpirationMetadataKey(cid), err:
return failure(err)
return await self.metaDs.delete(key)
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##
@ -159,21 +169,18 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
trace "Deleting block", cid
if blk =? (await self.getBlock(cid)):
if key =? self.makePrefixKey(cid) and
if key =? makePrefixKey(self.postFixLen, cid) and
err =? (await self.repoDs.delete(key)).errorOption:
trace "Error deleting block!", err = err.msg
return failure(err)
let
used = self.quotaUsedBytes - blk.data.len.uint
if isErr (await self.updateQuotaBytesUsed(blk)):
trace "Unable to update quote-bytes-used in metadata store"
return failure("Unable to update quote-bytes-used in metadata store")
if err =? (await self.metaDs.put(
QuotaUsedKey,
@(used.uint64.toBytesBE))).errorOption:
trace "Error updating quota key!", err = err.msg
return failure(err)
self.quotaUsedBytes = used
if isErr (await self.removeBlockExpirationEntry(blk.cid)):
trace "Unable to remove block expiration entry from metadata store"
return failure("Unable to remove block expiration entry from metadata store")
trace "Deleted block", cid, totalUsed = self.totalUsed
@ -183,7 +190,7 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
without key =? self.makePrefixKey(cid), err:
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err)
@ -222,6 +229,40 @@ method listBlocks*(
iter.next = next
return success iter
proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
let queryKey = ? createBlockExpirationMetadataQueryKey()
success Query.init(queryKey, offset = offset, limit = maxNumber)
method getBlockExpirations*(self: RepoStore, maxNumber: int, offset: int): Future[?!BlockExpirationIter] {.async, base.} =
without query =? createBlockExpirationQuery(maxNumber, offset), err:
trace "Unable to format block expirations query"
return failure(err)
without queryIter =? (await self.metaDs.query(query)), err:
trace "Unable to execute block expirations query"
return failure(err)
var iter = BlockExpirationIter()
proc next(): Future[?BlockExpiration] {.async.} =
if not queryIter.finished:
if pair =? (await queryIter.next()) and blockKey =? pair.key:
let expirationTimestamp = pair.data
let cidResult = Cid.init(blockKey.value)
if not cidResult.isOk:
raiseAssert("Unable to parse CID from blockKey.value: " & blockKey.value & $cidResult.error)
return BlockExpiration(
cid: cidResult.get,
expiration: expirationTimestamp.toSecondsSince1970
).some
else:
discard await queryIter.dispose()
iter.finished = true
return BlockExpiration.none
iter.next = next
return success iter
method close*(self: RepoStore): Future[void] {.async.} =
## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op
@ -234,7 +275,7 @@ proc hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
## Return false if error encountered
##
without key =? self.makePrefixKey(cid), err:
without key =? makePrefixKey(self.postFixLen, cid), err:
trace "Error getting key from provider", err = err.msg
return failure(err.msg)
@ -345,13 +386,14 @@ func new*(
T: type RepoStore,
repoDs: Datastore,
metaDs: Datastore,
clock: Clock = SystemClock.new(),
postFixLen = 2,
quotaMaxBytes = DefaultQuotaBytes,
blockTtl = DefaultBlockTtl): T =
T(
repoDs: repoDs,
metaDs: metaDs,
clock: clock,
postFixLen: postFixLen,
quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl)

10
codex/systemclock.nim Normal file
View File

@ -0,0 +1,10 @@
import std/times
import pkg/upraises
import ./clock
type
SystemClock* = ref object of Clock
method now*(clock: SystemClock): SecondsSince1970 {.upraises: [].} =
let now = times.now().utc
now.toTime().toUnix()

View File

@ -13,10 +13,12 @@ push: {.upraises: [].}
import pkg/chronicles
import pkg/questionable/results
import pkg/libp2p
import pkg/datastore
import ./fileutils
import ../errors
import ../rng
import ../namespaces
const
SafePermissions = {UserRead, UserWrite}

50
codex/utils/timer.nim Normal file
View File

@ -0,0 +1,50 @@
## Nim-Codex
## Copyright (c) 2023 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
## Timer
## Used to execute a callback in a loop
import pkg/chronos
import pkg/chronicles
import pkg/upraises
type
TimerCallback* = proc(): Future[void] {.gcsafe, upraises:[].}
Timer* = ref object of RootObj
callback: TimerCallback
interval: Duration
name: string
loopFuture: Future[void]
proc new*(T: type Timer, timerName = "Unnamed Timer"): T =
T(
name: timerName
)
proc timerLoop(timer: Timer) {.async.} =
try:
while true:
await timer.callback()
await sleepAsync(timer.interval)
except CatchableError as exc:
error "Timer caught unhandled exception: ", name=timer.name, msg=exc.msg
method start*(timer: Timer, callback: TimerCallback, interval: Duration) {.base.} =
if timer.loopFuture != nil:
return
trace "Timer starting: ", name=timer.name
timer.callback = callback
timer.interval = interval
timer.loopFuture = timerLoop(timer)
method stop*(timer: Timer) {.async, base.} =
if timer.loopFuture != nil:
trace "Timer stopping: ", name=timer.name
await timer.loopFuture.cancelAndWait()
timer.loopFuture = nil

View File

@ -0,0 +1,57 @@
## Nim-Codex
## Copyright (c) 2023 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/sequtils
import pkg/chronos
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/codex/blocktype as bt
import codex/stores/repostore
type
MockRepoStore* = ref object of RepoStore
delBlockCids*: seq[Cid]
getBeMaxNumber*: int
getBeOffset*: int
testBlockExpirations*: seq[BlockExpiration]
getBlockExpirationsThrows*: bool
iteratorIndex: int
method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} =
self.delBlockCids.add(cid)
self.testBlockExpirations = self.testBlockExpirations.filterIt(it.cid != cid)
dec self.iteratorIndex
return success()
method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!BlockExpirationIter] {.async.} =
if self.getBlockExpirationsThrows:
raise new CatchableError
self.getBeMaxNumber = maxNumber
self.getBeOffset = offset
var iter = BlockExpirationIter()
iter.finished = false
self.iteratorIndex = offset
var numberLeft = maxNumber
proc next(): Future[?BlockExpiration] {.async.} =
if numberLeft > 0 and self.iteratorIndex >= 0 and self.iteratorIndex < len(self.testBlockExpirations):
dec numberLeft
let selectedBlock = self.testBlockExpirations[self.iteratorIndex]
inc self.iteratorIndex
return selectedBlock.some
iter.finished = true
return BlockExpiration.none
iter.next = next
return success iter

View File

@ -0,0 +1,36 @@
## Nim-Codex
## Copyright (c) 2023 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import codex/utils/timer
type
MockTimer* = ref object of Timer
startCalled*: int
stopCalled*: int
mockInterval*: Duration
mockCallback: timer.TimerCallback
proc new*(T: type MockTimer): T =
T(
startCalled: 0,
stopCalled: 0
)
method start*(mockTimer: MockTimer, callback: timer.TimerCallback, interval: Duration) =
mockTimer.mockCallback = callback
mockTimer.mockInterval = interval
inc mockTimer.startCalled
method stop*(mockTimer: MockTimer) {.async.} =
inc mockTimer.stopCalled
method invokeCallback*(mockTimer: MockTimer) {.async, base.} =
await mockTimer.mockCallback()

View File

@ -0,0 +1,94 @@
## Nim-Codex
## Copyright (c) 2023 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/random
import std/sequtils
import pkg/chronos
import pkg/libp2p
import pkg/asynctest
import pkg/questionable
import pkg/questionable/results
import pkg/codex/blocktype as bt
import pkg/codex/stores/repostore
import pkg/codex/clock
import ../helpers/mocktimer
import ../helpers/mockrepostore
import ../helpers/mockclock
import ../examples
import codex/namespaces
import codex/stores/keyutils
proc createManifestCid(): ?!Cid =
let
length = rand(4096)
bytes = newSeqWith(length, rand(uint8))
mcodec = multiCodec("sha2-256")
codec = multiCodec("dag-pb")
version = CIDv1
let hash = ? MultiHash.digest($mcodec, bytes).mapFailure
let cid = ? Cid.init(version, codec, hash).mapFailure
return success cid
suite "KeyUtils":
test "makePrefixKey should create block key":
let length = 6
let cid = Cid.example
let expectedPrefix = ($cid)[^length..^1]
let expectedPostfix = $cid
let key = !makePrefixKey(length, cid).option
let namespaces = key.namespaces
check:
namespaces.len == 4
namespaces[0].value == CodexRepoNamespace
namespaces[1].value == "blocks"
namespaces[2].value == expectedPrefix
namespaces[3].value == expectedPostfix
test "makePrefixKey should create manifest key":
let length = 6
let cid = !createManifestCid().option
let expectedPrefix = ($cid)[^length..^1]
let expectedPostfix = $cid
let key = !makePrefixKey(length, cid).option
let namespaces = key.namespaces
check:
namespaces.len == 4
namespaces[0].value == CodexRepoNamespace
namespaces[1].value == "manifests"
namespaces[2].value == expectedPrefix
namespaces[3].value == expectedPostfix
test "createBlockExpirationMetadataKey should create block TTL key":
let cid = Cid.example
let key = !createBlockExpirationMetadataKey(cid).option
let namespaces = key.namespaces
check:
namespaces.len == 3
namespaces[0].value == CodexMetaNamespace
namespaces[1].value == "ttl"
namespaces[2].value == $cid
test "createBlockExpirationMetadataQueryKey should create key for all block TTL entries":
let key = !createBlockExpirationMetadataQueryKey().option
let namespaces = key.namespaces
check:
namespaces.len == 3
namespaces[0].value == CodexMetaNamespace
namespaces[1].value == "ttl"
namespaces[2].value == "*"

View File

@ -0,0 +1,190 @@
## Nim-Codex
## Copyright (c) 2023 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/chronos
import pkg/libp2p
import pkg/asynctest
import pkg/questionable
import pkg/questionable/results
import pkg/codex/blocktype as bt
import pkg/codex/stores/repostore
import pkg/codex/clock
import ../helpers/mocktimer
import ../helpers/mockrepostore
import ../helpers/mockclock
import ../examples
import codex/stores/maintenance
suite "BlockMaintainer":
var mockRepoStore: MockRepoStore
var interval: Duration
var mockTimer: MockTimer
var mockClock: MockClock
var blockMaintainer: BlockMaintainer
var testBe1: BlockExpiration
var testBe2: BlockExpiration
var testBe3: BlockExpiration
proc createTestExpiration(expiration: SecondsSince1970): BlockExpiration =
BlockExpiration(
cid: bt.Block.example.cid,
expiration: expiration
)
setup:
mockClock = MockClock.new()
mockClock.set(100)
testBe1 = createTestExpiration(200)
testBe2 = createTestExpiration(300)
testBe3 = createTestExpiration(400)
mockRepoStore = MockRepoStore.new()
mockRepoStore.testBlockExpirations.add(testBe1)
mockRepoStore.testBlockExpirations.add(testBe2)
mockRepoStore.testBlockExpirations.add(testBe3)
interval = 1.days
mockTimer = MockTimer.new()
blockMaintainer = BlockMaintainer.new(
mockRepoStore,
interval,
numberOfBlocksPerInterval = 2,
mockTimer,
mockClock)
test "Start should start timer at provided interval":
blockMaintainer.start()
check mockTimer.startCalled == 1
check mockTimer.mockInterval == interval
test "Stop should stop timer":
await blockMaintainer.stop()
check mockTimer.stopCalled == 1
test "Timer callback should call getBlockExpirations on RepoStore":
blockMaintainer.start()
await mockTimer.invokeCallback()
check:
mockRepoStore.getBeMaxNumber == 2
mockRepoStore.getBeOffset == 0
test "Timer callback should handle Catachable errors":
mockRepoStore.getBlockExpirationsThrows = true
blockMaintainer.start()
await mockTimer.invokeCallback()
test "Subsequent timer callback should call getBlockExpirations on RepoStore with offset":
blockMaintainer.start()
await mockTimer.invokeCallback()
await mockTimer.invokeCallback()
check:
mockRepoStore.getBeMaxNumber == 2
mockRepoStore.getBeOffset == 2
test "Timer callback should delete no blocks if none are expired":
blockMaintainer.start()
await mockTimer.invokeCallback()
check:
mockRepoStore.delBlockCids.len == 0
test "Timer callback should delete one block if it is expired":
mockClock.set(250)
blockMaintainer.start()
await mockTimer.invokeCallback()
check:
mockRepoStore.delBlockCids == [testBe1.cid]
test "Timer callback should delete multiple blocks if they are expired":
mockClock.set(500)
blockMaintainer.start()
await mockTimer.invokeCallback()
check:
mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid]
test "After deleting a block, subsequent timer callback should decrease offset by the number of deleted blocks":
mockClock.set(250)
blockMaintainer.start()
await mockTimer.invokeCallback()
check mockRepoStore.delBlockCids == [testBe1.cid]
# Because one block was deleted, the offset used in the next call should be 2 minus 1.
await mockTimer.invokeCallback()
check:
mockRepoStore.getBeMaxNumber == 2
mockRepoStore.getBeOffset == 1
test "Should delete all blocks if expired, in two timer callbacks":
mockClock.set(500)
blockMaintainer.start()
await mockTimer.invokeCallback()
await mockTimer.invokeCallback()
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid]
test "Iteration offset should loop":
blockMaintainer.start()
await mockTimer.invokeCallback()
check mockRepoStore.getBeOffset == 0
await mockTimer.invokeCallback()
check mockRepoStore.getBeOffset == 2
await mockTimer.invokeCallback()
check mockRepoStore.getBeOffset == 0
test "Should handle new blocks":
proc invokeTimerManyTimes(): Future[void] {.async.} =
for i in countUp(0, 10):
await mockTimer.invokeCallback()
blockMaintainer.start()
await invokeTimerManyTimes()
# no blocks have expired
check mockRepoStore.delBlockCids == []
mockClock.set(250)
await invokeTimerManyTimes()
# one block has expired
check mockRepoStore.delBlockCids == [testBe1.cid]
# new blocks are added
let testBe4 = createTestExpiration(600)
let testBe5 = createTestExpiration(700)
mockRepoStore.testBlockExpirations.add(testBe4)
mockRepoStore.testBlockExpirations.add(testBe5)
mockClock.set(500)
await invokeTimerManyTimes()
# All blocks have expired
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid]
mockClock.set(650)
await invokeTimerManyTimes()
# First new block has expired
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid]
mockClock.set(750)
await invokeTimerManyTimes()
# Second new block has expired
check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid, testBe5.cid]

View File

@ -1,6 +1,7 @@
import std/os
import std/options
import std/strutils
import std/sequtils
import pkg/questionable
import pkg/questionable/results
@ -16,40 +17,50 @@ import pkg/codex/stores/cachestore
import pkg/codex/chunker
import pkg/codex/stores
import pkg/codex/blocktype as bt
import pkg/codex/clock
import ../helpers
import ../helpers/mockclock
import ./commonstoretests
suite "Test RepoStore Quota":
suite "RepoStore":
var
repoDs: Datastore
metaDs: Datastore
mockClock: MockClock
repo: RepoStore
let
now: SecondsSince1970 = 123
setup:
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
mockClock = MockClock.new()
mockClock.set(now)
repo = RepoStore.new(repoDs, metaDs, mockClock, quotaMaxBytes = 200)
teardown:
(await repoDs.close()).tryGet
(await metaDs.close()).tryGet
proc createTestBlock(size: int): bt.Block =
bt.Block.new('a'.repeat(size).toBytes).tryGet()
test "Should update current used bytes on block put":
let
blk = bt.Block.new('a'.repeat(100).toBytes).tryGet()
repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 100)
let blk = createTestBlock(200)
check repo.quotaUsedBytes == 0
(await repo.putBlock(blk)).tryGet
check:
repo.quotaUsedBytes == 100
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u
repo.quotaUsedBytes == 200
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 200'u
test "Should update current used bytes on block delete":
let
blk = bt.Block.new('a'.repeat(100).toBytes).tryGet()
repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 100)
let blk = createTestBlock(100)
check repo.quotaUsedBytes == 0
(await repo.putBlock(blk)).tryGet
@ -62,9 +73,7 @@ suite "Test RepoStore Quota":
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 0'u
test "Should not update current used bytes if block exist":
let
blk = bt.Block.new('a'.repeat(100).toBytes).tryGet()
repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200)
let blk = createTestBlock(100)
check repo.quotaUsedBytes == 0
(await repo.putBlock(blk)).tryGet
@ -78,18 +87,14 @@ suite "Test RepoStore Quota":
uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u
test "Should fail storing passed the quota":
let
blk = bt.Block.new('a'.repeat(200).toBytes).tryGet()
repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 100)
let blk = createTestBlock(300)
check repo.totalUsed == 0
expect QuotaUsedError:
(await repo.putBlock(blk)).tryGet
test "Should reserve bytes":
let
blk = bt.Block.new('a'.repeat(100).toBytes).tryGet()
repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200)
let blk = createTestBlock(100)
check repo.totalUsed == 0
(await repo.putBlock(blk)).tryGet
@ -104,9 +109,7 @@ suite "Test RepoStore Quota":
uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
test "Should not reserve bytes over max quota":
let
blk = bt.Block.new('a'.repeat(100).toBytes).tryGet()
repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200)
let blk = createTestBlock(100)
check repo.totalUsed == 0
(await repo.putBlock(blk)).tryGet
@ -124,9 +127,7 @@ suite "Test RepoStore Quota":
discard (await metaDs.get(QuotaReservedKey)).tryGet
test "Should release bytes":
let
blk = bt.Block.new('a'.repeat(100).toBytes).tryGet()
repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200)
let blk = createTestBlock(100)
check repo.totalUsed == 0
(await repo.reserve(100)).tryGet
@ -141,9 +142,6 @@ suite "Test RepoStore Quota":
uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 0'u
test "Should not release bytes less than quota":
let
repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200)
check repo.totalUsed == 0
(await repo.reserve(100)).tryGet
check repo.totalUsed == 100
@ -157,12 +155,110 @@ suite "Test RepoStore Quota":
repo.quotaReservedBytes == 100
uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
proc queryMetaDs(key: Key): Future[seq[QueryResponse]] {.async.} =
let
query = Query.init(key)
responseIter = (await metaDs.query(query)).tryGet
response = (await allFinished(toSeq(responseIter)))
.mapIt(it.read.tryGet)
.filterIt(it.key.isSome)
return response
test "Should store block expiration timestamp":
let
duration = 10.seconds
blk = createTestBlock(100)
let
expectedExpiration: SecondsSince1970 = 123 + 10
expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
(await repo.putBlock(blk, duration.some)).tryGet
let response = await queryMetaDs(expectedKey)
check:
response.len == 1
response[0].key.get == expectedKey
response[0].data == expectedExpiration.toBytes
test "Should store block with default expiration timestamp when not provided":
let
blk = createTestBlock(100)
let
expectedExpiration: SecondsSince1970 = 123 + DefaultBlockTtl.seconds
expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
(await repo.putBlock(blk)).tryGet
let response = await queryMetaDs(expectedKey)
check:
response.len == 1
response[0].key.get == expectedKey
response[0].data == expectedExpiration.toBytes
test "delBlock should remove expiration metadata":
let
blk = createTestBlock(100)
expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
(await repo.putBlock(blk, 10.seconds.some)).tryGet
(await repo.delBlock(blk.cid)).tryGet
let response = await queryMetaDs(expectedKey)
check:
response.len == 0
test "Should retrieve block expiration information":
proc unpack(beIter: Future[?!BlockExpirationIter]): Future[seq[BlockExpiration]] {.async.} =
var expirations = newSeq[BlockExpiration](0)
without iter =? (await beIter), err:
return expirations
for be in toSeq(iter):
if value =? (await be):
expirations.add(value)
return expirations
let
duration = 10.seconds
blk1 = createTestBlock(10)
blk2 = createTestBlock(11)
blk3 = createTestBlock(12)
let
expectedExpiration: SecondsSince1970 = 123 + 10
proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) =
check:
be.cid == expectedBlock.cid
be.expiration == expectedExpiration
(await repo.putBlock(blk1, duration.some)).tryGet
(await repo.putBlock(blk2, duration.some)).tryGet
(await repo.putBlock(blk3, duration.some)).tryGet
let
blockExpirations1 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=0))
blockExpirations2 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=2))
check blockExpirations1.len == 2
assertExpiration(blockExpirations1[0], blk2)
assertExpiration(blockExpirations1[1], blk1)
check blockExpirations2.len == 1
assertExpiration(blockExpirations2[0], blk3)
commonBlockStoreTests(
"RepoStore Sql backend", proc: BlockStore =
BlockStore(
RepoStore.new(
SQLiteDatastore.new(Memory).tryGet(),
SQLiteDatastore.new(Memory).tryGet())))
SQLiteDatastore.new(Memory).tryGet(),
MockClock.new())))
const
path = currentSourcePath().parentDir / "test"
@ -181,6 +277,7 @@ commonBlockStoreTests(
BlockStore(
RepoStore.new(
FSDatastore.new(path, depth).tryGet(),
SQLiteDatastore.new(Memory).tryGet())),
SQLiteDatastore.new(Memory).tryGet(),
MockClock.new())),
before = before,
after = after)

25
tests/codex/testclock.nim Normal file
View File

@ -0,0 +1,25 @@
import std/unittest
import codex/clock
suite "Clock":
proc testConversion(seconds: SecondsSince1970) =
let asBytes = seconds.toBytes
let restored = asBytes.toSecondsSince1970
check restored == seconds
test "SecondsSince1970 should support bytes conversions":
let secondsToTest: seq[int64] = @[
int64.high,
int64.low,
0,
1,
12345,
-1,
-12345
]
for seconds in secondsToTest:
testConversion(seconds)

View File

@ -1,4 +1,5 @@
import ./stores/testcachestore
import ./stores/testrepostore
import ./stores/testmaintenance
{.warning[UnusedImport]: off.}

View File

@ -0,0 +1,13 @@
import std/times
import std/unittest
import codex/systemclock
suite "SystemClock":
test "Should get now":
let clock = SystemClock.new()
let expectedNow = times.now().utc
let now = clock.now()
check now == expectedNow.toTime().toUnix()

View File

@ -3,5 +3,6 @@ import ./utils/teststatemachineasync
import ./utils/testoptionalcast
import ./utils/testkeyutils
import ./utils/testasyncstatemachine
import ./utils/testtimer
{.warning[UnusedImport]: off.}

View File

@ -0,0 +1,85 @@
## Nim-Codex
## Copyright (c) 2023 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/questionable
import pkg/chronos
import pkg/asynctest
import codex/utils/timer
import ../helpers/eventually
suite "Timer":
var timer1: Timer
var timer2: Timer
var output: string
var numbersState = 0
var lettersState = 'a'
proc numbersCallback(): Future[void] {.async.} =
output &= $numbersState
inc numbersState
proc lettersCallback(): Future[void] {.async.} =
output &= $lettersState
inc lettersState
proc exceptionCallback(): Future[void] {.async.} =
raise newException(CatchableError, "Test Exception")
proc startNumbersTimer() =
timer1.start(numbersCallback, 10.milliseconds)
proc startLettersTimer() =
timer2.start(lettersCallback, 10.milliseconds)
setup:
timer1 = Timer.new()
timer2 = Timer.new()
output = ""
numbersState = 0
lettersState = 'a'
teardown:
await timer1.stop()
await timer2.stop()
test "Start timer1 should execute callback":
startNumbersTimer()
check eventually output == "0"
test "Start timer1 should execute callback multiple times":
startNumbersTimer()
check eventually output == "012"
test "Starting timer1 multiple times has no impact":
startNumbersTimer()
startNumbersTimer()
startNumbersTimer()
check eventually output == "01234"
test "Stop timer1 should stop execution of the callback":
startNumbersTimer()
check eventually output == "012"
await timer1.stop()
await sleepAsync(30.milliseconds)
let stoppedOutput = output
await sleepAsync(30.milliseconds)
check output == stoppedOutput
test "Exceptions raised in timer callback are handled":
timer1.start(exceptionCallback, 10.milliseconds)
await sleepAsync(30.milliseconds)
await timer1.stop()
test "Starting both timers should execute callbacks sequentially":
startNumbersTimer()
startLettersTimer()
check eventually output == "0a1b2c3d4e"

View File

@ -0,0 +1,77 @@
import std/osproc
import std/os
import std/httpclient
import std/strutils
import std/times
import pkg/chronos
import ../ethertest
import ../contracts/time
import ../codex/helpers/eventually
import ./nodes
import ./tokens
ethersuite "Node block expiration tests":
var node: NodeProcess
var baseurl: string
let dataDir = getTempDir() / "Codex1"
let content = "test file content"
setup:
baseurl = "http://localhost:8080/api/codex/v1"
teardown:
node.stop()
dataDir.removeDir()
proc startTestNode(blockTtlSeconds: int) =
node = startNode([
"--api-port=8080",
"--data-dir=" & dataDir,
"--nat=127.0.0.1",
"--disc-ip=127.0.0.1",
"--disc-port=8090",
"--block-ttl=" & $blockTtlSeconds,
"--block-mi=3",
"--block-mn=10"
], debug = false)
proc uploadTestFile(): string =
let client = newHttpClient()
let uploadUrl = baseurl & "/upload"
let uploadResponse = client.post(uploadUrl, content)
check uploadResponse.status == "200 OK"
client.close()
uploadResponse.body
proc downloadTestFile(contentId: string): Response =
let client = newHttpClient(timeout=3000)
let downloadUrl = baseurl & "/download/" & contentId
let content = client.get(downloadUrl)
client.close()
content
test "node retains not-expired file":
startTestNode(blockTtlSeconds = 60 * 60 * 1)
let contentId = uploadTestFile()
await sleepAsync(10 * 1000)
let response = downloadTestFile(contentId)
check:
response.status == "200 OK"
response.body == content
test "node deletes expired file":
startTestNode(blockTtlSeconds = 5)
let contentId = uploadTestFile()
await sleepAsync(10 * 1000)
expect TimeoutError:
discard downloadTestFile(contentId)

View File

@ -11,6 +11,8 @@ import ./codex/testsales
import ./codex/testerasure
import ./codex/testproving
import ./codex/testutils
import ./codex/testclock
import ./codex/testsystemclock
# to check that everything compiles
import ../codex

View File

@ -9,6 +9,7 @@ import ./ethertest
import ./contracts/time
import ./integration/nodes
import ./integration/tokens
import ./integration/testblockexpiration
import ./codex/helpers/eventually
ethersuite "Integration tests":