From 9c8a59d150a732726c3205e55a3f163823c290a6 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Wed, 8 Mar 2023 16:04:54 +0100 Subject: [PATCH] Blockstore maintenance (#347) * setting up * Implements timer utility * applies async and cancellation to timer loop * Sets up mocktimer and mockblockstore to set up first maintenance module test * wip: first test that calls blockChecker * wip: adding user type to timer callback * Chronicles doesn't like type-arguments? Disables logging in timer module for now * Implementing block check test for blockMaintainer module * Sets up additional tests for blockmaintainer * Removes generic from timer module. Implements numberOfBlocks per interval in blockMaintainer. * Implements blockMaintainer * Sets up tests for blockChecker * Some comments by Mark * Cleanup repostore tests * Setting up the use of std/times for block TTL tracking * repostore adds expiration timestamp * Defaults the repostore clock to the system clock * Applies updates to repostore interface. * Implements retrieving of block expiration information from repostore * Sets up tests for maintenance module behavior * Implements block maintenance module * Wires maintenance module into codex. Sets up integration tests for block expiration * Sets up test for missing behavior: removing timestamp metadata on block delete * Implements removing of expiration metadata in repoStore * Fixes integration tests for block expiration * Adds block expiration tests to integration test run * Handled some comments by Dmitriy * Review comment by Dmitriy: Removes seq[cid] from runBlockCheck * Review comment by Dmitriy: Moves key formatting methods to keyutils. * Review comment by Dmitriy: Encodes durations using chronos * Fixes conversion of TTL type in conf. * Review comments by Dmitriy * Adds unit tests for keyUtils. * Adds test coverage for exception in maintenance module --- codex/clock.nim | 12 +- codex/codex.nim | 16 +- codex/conf.nim | 22 ++- codex/stores.nim | 4 +- codex/stores/keyutils.nim | 44 +++++ codex/stores/maintenance.nim | 92 +++++++++++ codex/stores/repostore.nim | 140 ++++++++++------ codex/systemclock.nim | 10 ++ codex/utils/keyutils.nim | 2 + codex/utils/timer.nim | 50 ++++++ tests/codex/helpers/mockrepostore.nim | 57 +++++++ tests/codex/helpers/mocktimer.nim | 36 ++++ tests/codex/stores/testkeyutils.nim | 94 +++++++++++ tests/codex/stores/testmaintenance.nim | 190 ++++++++++++++++++++++ tests/codex/stores/testrepostore.nim | 157 ++++++++++++++---- tests/codex/testclock.nim | 25 +++ tests/codex/teststores.nim | 1 + tests/codex/testsystemclock.nim | 13 ++ tests/codex/testutils.nim | 1 + tests/codex/utils/testtimer.nim | 85 ++++++++++ tests/integration/testblockexpiration.nim | 77 +++++++++ tests/testCodex.nim | 2 + tests/testIntegration.nim | 1 + 23 files changed, 1042 insertions(+), 89 deletions(-) create mode 100644 codex/stores/keyutils.nim create mode 100644 codex/stores/maintenance.nim create mode 100644 codex/systemclock.nim create mode 100644 codex/utils/timer.nim create mode 100644 tests/codex/helpers/mockrepostore.nim create mode 100644 tests/codex/helpers/mocktimer.nim create mode 100644 tests/codex/stores/testkeyutils.nim create mode 100644 tests/codex/stores/testmaintenance.nim create mode 100644 tests/codex/testclock.nim create mode 100644 tests/codex/testsystemclock.nim create mode 100644 tests/codex/utils/testtimer.nim create mode 100644 tests/integration/testblockexpiration.nim diff --git a/codex/clock.nim b/codex/clock.nim index ab40eeb4..7cfebc25 100644 --- a/codex/clock.nim +++ b/codex/clock.nim @@ -1,11 +1,13 @@ import pkg/chronos +import pkg/stew/endians2 +import pkg/upraises type Clock* = ref object of RootObj SecondsSince1970* = int64 Timeout* = object of CatchableError -method now*(clock: Clock): SecondsSince1970 {.base.} = +method now*(clock: Clock): SecondsSince1970 {.base, upraises: [].} = raiseAssert "not implemented" proc waitUntil*(clock: Clock, time: SecondsSince1970) {.async.} = @@ -23,3 +25,11 @@ proc withTimeout*(future: Future[void], if not future.completed: await future.cancelAndWait() raise newException(Timeout, "Timed out") + +proc toBytes*(i: SecondsSince1970): seq[byte] = + let asUint = cast[uint64](i) + @(asUint.toBytes) + +proc toSecondsSince1970*(bytes: seq[byte]): SecondsSince1970 = + let asUint = uint64.fromBytes(bytes) + cast[int64](asUint) diff --git a/codex/codex.nim b/codex/codex.nim index 63560fe1..8e761961 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -46,6 +46,7 @@ type restServer: RestServerRef codexNode: CodexNodeRef repoStore: RepoStore + maintenance: BlockMaintainer CodexPrivateKey* = libp2p.PrivateKey # alias @@ -55,6 +56,7 @@ proc start*(s: CodexServer) {.async.} = await s.repoStore.start() s.restServer.start() await s.codexNode.start() + s.maintenance.start() let # TODO: Can't define these as constants, pity @@ -90,7 +92,8 @@ proc stop*(s: CodexServer) {.async.} = await allFuturesThrowing( s.restServer.stop(), s.codexNode.stop(), - s.repoStore.start()) + s.repoStore.stop(), + s.maintenance.stop()) s.runHandle.complete() @@ -130,6 +133,7 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey): if config.cacheSize > 0: cache = CacheStore.new(cacheSize = config.cacheSize * MiB) + ## Is unused? let discoveryDir = config.dataDir / CodexDhtNamespace @@ -161,7 +165,12 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey): metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace) .expect("Should create meta data store!"), quotaMaxBytes = config.storageQuota.uint, - blockTtl = config.blockTtl.seconds) + blockTtl = config.blockTtlSeconds.seconds) + + maintenance = BlockMaintainer.new( + repoStore, + interval = config.blockMaintenanceIntervalSeconds.seconds, + numberOfBlocksPerInterval = config.blockMaintenanceNumberOfBlocks) peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() @@ -183,4 +192,5 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey): config: config, codexNode: codexNode, restServer: restServer, - repoStore: repoStore) + repoStore: repoStore, + maintenance: maintenance) diff --git a/codex/conf.nim b/codex/conf.nim index f3c94d68..431f7821 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -32,7 +32,7 @@ import pkg/ethers import ./discovery import ./stores -export DefaultCacheSizeMiB, DefaultQuotaBytes, DefaultBlockTtl, net +export DefaultCacheSizeMiB, net, DefaultQuotaBytes, DefaultBlockTtl, DefaultBlockMaintenanceInterval, DefaultNumberOfBlocksToMaintainPerInterval type StartUpCommand* {.pure.} = enum @@ -158,12 +158,24 @@ type name: "storage-quota" abbr: "q" }: Natural - blockTtl* {. + blockTtlSeconds* {. desc: "Default block timeout in seconds - 0 disables the ttl" - defaultValue: DefaultBlockTtl.secs - defaultValueDesc: "86400" # 24h in secs + defaultValue: DefaultBlockTtl.seconds + defaultValueDesc: $DefaultBlockTtl name: "block-ttl" - abbr: "t" }: Natural + abbr: "t" }: int + + blockMaintenanceIntervalSeconds* {. + desc: "Time interval in seconds - determines frequency of block maintenance cycle: how often blocks are checked for expiration and cleanup." + defaultValue: DefaultBlockMaintenanceInterval.seconds + defaultValueDesc: $DefaultBlockMaintenanceInterval + name: "block-mi" }: int + + blockMaintenanceNumberOfBlocks* {. + desc: "Number of blocks to check every maintenance cycle." + defaultValue: DefaultNumberOfBlocksToMaintainPerInterval + defaultValueDesc: $DefaultNumberOfBlocksToMaintainPerInterval + name: "block-mn" }: int cacheSize* {. desc: "The size in MiB of the block cache, 0 disables the cache - might help on slow hardrives" diff --git a/codex/stores.nim b/codex/stores.nim index 1386a990..48a0df79 100644 --- a/codex/stores.nim +++ b/codex/stores.nim @@ -2,5 +2,7 @@ import ./stores/cachestore import ./stores/blockstore import ./stores/networkstore import ./stores/repostore +import ./stores/maintenance +import ./stores/keyutils -export cachestore, blockstore, networkstore, repostore +export cachestore, blockstore, networkstore, repostore, maintenance, keyutils diff --git a/codex/stores/keyutils.nim b/codex/stores/keyutils.nim new file mode 100644 index 00000000..8e438620 --- /dev/null +++ b/codex/stores/keyutils.nim @@ -0,0 +1,44 @@ +## Nim-Codex +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/upraises +push: {.upraises: [].} + +import pkg/chronicles +import pkg/questionable/results +import pkg/datastore +import pkg/libp2p +import ../namespaces +import ../manifest + +const + CodexMetaKey* = Key.init(CodexMetaNamespace).tryGet + CodexRepoKey* = Key.init(CodexRepoNamespace).tryGet + CodexBlocksKey* = Key.init(CodexBlocksNamespace).tryGet + CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet + BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet + QuotaKey* = Key.init(CodexQuotaNamespace).tryGet + QuotaUsedKey* = (QuotaKey / "used").tryGet + QuotaReservedKey* = (QuotaKey / "reserved").tryGet + +func makePrefixKey*(postFixLen: int, cid: Cid): ?!Key = + let + cidKey = ? Key.init(($cid)[^postFixLen..^1] & "/" & $cid) + + if ? cid.isManifest: + success CodexManifestKey / cidKey + else: + success CodexBlocksKey / cidKey + +proc createBlockExpirationMetadataKey*(cid: Cid): ?!Key = + BlocksTtlKey / $cid + +proc createBlockExpirationMetadataQueryKey*(): ?!Key = + let queryString = ? (BlocksTtlKey / "*") + Key.init(queryString) diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim new file mode 100644 index 00000000..e12edb84 --- /dev/null +++ b/codex/stores/maintenance.nim @@ -0,0 +1,92 @@ +## Nim-Codex +## Copyright (c) 2023 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +## Store maintenance module +## Looks for and removes expired blocks from blockstores. + +import pkg/chronos +import pkg/chronicles +import pkg/questionable +import pkg/questionable/results + +import ./repostore +import ../utils/timer +import ../clock +import ../systemclock + +const + DefaultBlockMaintenanceInterval* = 10.minutes + DefaultNumberOfBlocksToMaintainPerInterval* = 1000 + +type + BlockMaintainer* = ref object of RootObj + repoStore: RepoStore + interval: Duration + timer: Timer + clock: Clock + numberOfBlocksPerInterval: int + offset: int + +proc new*(T: type BlockMaintainer, + repoStore: RepoStore, + interval: Duration, + numberOfBlocksPerInterval = 100, + timer = Timer.new(), + clock: Clock = SystemClock.new() + ): T = + T( + repoStore: repoStore, + interval: interval, + numberOfBlocksPerInterval: numberOfBlocksPerInterval, + timer: timer, + clock: clock, + offset: 0) + +proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.} = + if isErr (await self.repoStore.delBlock(cid)): + trace "Unable to delete block from repoStore" + +proc processBlockExpiration(self: BlockMaintainer, be: BlockExpiration): Future[void] {.async} = + if be.expiration < self.clock.now: + await self.deleteExpiredBlock(be.cid) + else: + inc self.offset + +proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} = + let expirations = await self.repoStore.getBlockExpirations( + maxNumber = self.numberOfBlocksPerInterval, + offset = self.offset + ) + + without iter =? expirations, err: + trace "Unable to obtain blockExpirations iterator from repoStore" + return + + var numberReceived = 0 + for maybeBeFuture in iter: + if be =? await maybeBeFuture: + inc numberReceived + await self.processBlockExpiration(be) + + # If we received fewer blockExpirations from the iterator than we asked for, + # We're at the end of the dataset and should start from 0 next time. + if numberReceived < self.numberOfBlocksPerInterval: + self.offset = 0 + +proc start*(self: BlockMaintainer) = + proc onTimer(): Future[void] {.async.} = + try: + await self.runBlockCheck() + except CatchableError as exc: + error "Unexpected exception in BlockMaintainer.onTimer(): ", msg=exc.msg + + self.timer.start(onTimer, self.interval) + +proc stop*(self: BlockMaintainer): Future[void] {.async.} = + await self.timer.stop() diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index e7d76fb5..18d12c46 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -7,6 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import std/sequtils import pkg/upraises push: {.upraises: [].} @@ -20,9 +21,11 @@ import pkg/datastore import pkg/stew/endians2 import ./blockstore +import ./keyutils import ../blocktype import ../namespaces -import ../manifest +import ../clock +import ../systemclock export blocktype, libp2p @@ -30,22 +33,9 @@ logScope: topics = "codex repostore" const - CodexMetaKey* = Key.init(CodexMetaNamespace).tryGet - CodexRepoKey* = Key.init(CodexRepoNamespace).tryGet - CodexBlocksKey* = Key.init(CodexBlocksNamespace).tryGet - CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet - - QuotaKey* = Key.init(CodexQuotaNamespace).tryGet - QuotaUsedKey* = (QuotaKey / "used").tryGet - QuotaReservedKey* = (QuotaKey / "reserved").tryGet - - BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet - DefaultBlockTtl* = 24.hours DefaultQuotaBytes* = 1'u shl 33'u # ~8GB - ZeroMoment = Moment.init(0, Nanosecond) # used for converting between Duration and Moment - type QuotaUsedError* = object of CodexError QuotaNotEnoughError* = object of CodexError @@ -54,23 +44,24 @@ type postFixLen*: int repoDs*: Datastore metaDs*: Datastore + clock: Clock quotaMaxBytes*: uint quotaUsedBytes*: uint quotaReservedBytes*: uint blockTtl*: Duration started*: bool -func makePrefixKey*(self: RepoStore, cid: Cid): ?!Key = - let - cidKey = ? Key.init(($cid)[^self.postFixLen..^1] & "/" & $cid) + BlockExpiration* = object + cid*: Cid + expiration*: SecondsSince1970 + GetNext = proc(): Future[?BlockExpiration] {.upraises: [], gcsafe, closure.} + BlockExpirationIter* = ref object + finished*: bool + next*: GetNext - if ? cid.isManifest: - success CodexManifestKey / cidKey - else: - success CodexBlocksKey / cidKey - -func makeExpiresKey(expires: Duration, cid: Cid): ?!Key = - BlocksTtlKey / $cid / $expires.seconds +iterator items*(q: BlockExpirationIter): Future[?BlockExpiration] = + while not q.finished: + yield q.next() func totalUsed*(self: RepoStore): uint = (self.quotaUsedBytes + self.quotaReservedBytes) @@ -79,7 +70,7 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = ## Get a block from the blockstore ## - without key =? self.makePrefixKey(cid), err: + without key =? makePrefixKey(self.postFixLen, cid), err: trace "Error getting key from provider", err = err.msg return failure(err) @@ -93,6 +84,17 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = trace "Got block for cid", cid return Block.new(cid, data) +proc getBlockExpirationTimestamp(self: RepoStore, ttl: ?Duration): SecondsSince1970 = + let duration = ttl |? self.blockTtl + self.clock.now() + duration.seconds + +proc getBlockExpirationEntry(self: RepoStore, batch: var seq[BatchEntry], cid: Cid, ttl: ?Duration): ?!BatchEntry = + without key =? createBlockExpirationMetadataKey(cid), err: + return failure(err) + + let value = self.getBlockExpirationTimestamp(ttl).toBytes + return success((key, value)) + method putBlock*( self: RepoStore, blk: Block, @@ -100,7 +102,7 @@ method putBlock*( ## Put a block to the blockstore ## - without key =? self.makePrefixKey(blk.cid), err: + without key =? makePrefixKey(self.postFixLen, blk.cid), err: trace "Error getting key from provider", err = err.msg return failure(err) @@ -115,9 +117,6 @@ method putBlock*( trace "Storing block with key", key - without var expires =? ttl: - expires = Moment.fromNow(self.blockTtl) - ZeroMoment - var batch: seq[BatchEntry] @@ -131,14 +130,10 @@ method putBlock*( trace "Updating quota", used batch.add((QuotaUsedKey, @(used.uint64.toBytesBE))) - without expiresKey =? makeExpiresKey(expires, blk.cid), err: - trace "Unable make block ttl key", - err = err.msg, cid = blk.cid, expires, expiresKey - + without blockExpEntry =? self.getBlockExpirationEntry(batch, blk.cid, ttl), err: + trace "Unable to create block expiration metadata key", err = err.msg return failure(err) - - trace "Adding expires key", expiresKey, expires - batch.add((expiresKey, @[])) + batch.add(blockExpEntry) if err =? (await self.metaDs.put(batch)).errorOption: trace "Error updating quota bytes", err = err.msg @@ -152,6 +147,21 @@ method putBlock*( self.quotaUsedBytes = used return success() +proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} = + let used = self.quotaUsedBytes - blk.data.len.uint + if err =? (await self.metaDs.put( + QuotaUsedKey, + @(used.uint64.toBytesBE))).errorOption: + trace "Error updating quota key!", err = err.msg + return failure(err) + self.quotaUsedBytes = used + return success() + +proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} = + without key =? createBlockExpirationMetadataKey(cid), err: + return failure(err) + return await self.metaDs.delete(key) + method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = ## Delete a block from the blockstore ## @@ -159,21 +169,18 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = trace "Deleting block", cid if blk =? (await self.getBlock(cid)): - if key =? self.makePrefixKey(cid) and + if key =? makePrefixKey(self.postFixLen, cid) and err =? (await self.repoDs.delete(key)).errorOption: trace "Error deleting block!", err = err.msg return failure(err) - let - used = self.quotaUsedBytes - blk.data.len.uint + if isErr (await self.updateQuotaBytesUsed(blk)): + trace "Unable to update quote-bytes-used in metadata store" + return failure("Unable to update quote-bytes-used in metadata store") - if err =? (await self.metaDs.put( - QuotaUsedKey, - @(used.uint64.toBytesBE))).errorOption: - trace "Error updating quota key!", err = err.msg - return failure(err) - - self.quotaUsedBytes = used + if isErr (await self.removeBlockExpirationEntry(blk.cid)): + trace "Unable to remove block expiration entry from metadata store" + return failure("Unable to remove block expiration entry from metadata store") trace "Deleted block", cid, totalUsed = self.totalUsed @@ -183,7 +190,7 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = ## Check if the block exists in the blockstore ## - without key =? self.makePrefixKey(cid), err: + without key =? makePrefixKey(self.postFixLen, cid), err: trace "Error getting key from provider", err = err.msg return failure(err) @@ -222,6 +229,40 @@ method listBlocks*( iter.next = next return success iter +proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query = + let queryKey = ? createBlockExpirationMetadataQueryKey() + success Query.init(queryKey, offset = offset, limit = maxNumber) + +method getBlockExpirations*(self: RepoStore, maxNumber: int, offset: int): Future[?!BlockExpirationIter] {.async, base.} = + without query =? createBlockExpirationQuery(maxNumber, offset), err: + trace "Unable to format block expirations query" + return failure(err) + + without queryIter =? (await self.metaDs.query(query)), err: + trace "Unable to execute block expirations query" + return failure(err) + + var iter = BlockExpirationIter() + + proc next(): Future[?BlockExpiration] {.async.} = + if not queryIter.finished: + if pair =? (await queryIter.next()) and blockKey =? pair.key: + let expirationTimestamp = pair.data + let cidResult = Cid.init(blockKey.value) + if not cidResult.isOk: + raiseAssert("Unable to parse CID from blockKey.value: " & blockKey.value & $cidResult.error) + return BlockExpiration( + cid: cidResult.get, + expiration: expirationTimestamp.toSecondsSince1970 + ).some + else: + discard await queryIter.dispose() + iter.finished = true + return BlockExpiration.none + + iter.next = next + return success iter + method close*(self: RepoStore): Future[void] {.async.} = ## Close the blockstore, cleaning up resources managed by it. ## For some implementations this may be a no-op @@ -234,7 +275,7 @@ proc hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = ## Return false if error encountered ## - without key =? self.makePrefixKey(cid), err: + without key =? makePrefixKey(self.postFixLen, cid), err: trace "Error getting key from provider", err = err.msg return failure(err.msg) @@ -345,13 +386,14 @@ func new*( T: type RepoStore, repoDs: Datastore, metaDs: Datastore, + clock: Clock = SystemClock.new(), postFixLen = 2, quotaMaxBytes = DefaultQuotaBytes, blockTtl = DefaultBlockTtl): T = - T( repoDs: repoDs, metaDs: metaDs, + clock: clock, postFixLen: postFixLen, quotaMaxBytes: quotaMaxBytes, blockTtl: blockTtl) diff --git a/codex/systemclock.nim b/codex/systemclock.nim new file mode 100644 index 00000000..25ac4216 --- /dev/null +++ b/codex/systemclock.nim @@ -0,0 +1,10 @@ +import std/times +import pkg/upraises +import ./clock + +type + SystemClock* = ref object of Clock + +method now*(clock: SystemClock): SecondsSince1970 {.upraises: [].} = + let now = times.now().utc + now.toTime().toUnix() diff --git a/codex/utils/keyutils.nim b/codex/utils/keyutils.nim index fa24ac3d..5179a0bc 100644 --- a/codex/utils/keyutils.nim +++ b/codex/utils/keyutils.nim @@ -13,10 +13,12 @@ push: {.upraises: [].} import pkg/chronicles import pkg/questionable/results import pkg/libp2p +import pkg/datastore import ./fileutils import ../errors import ../rng +import ../namespaces const SafePermissions = {UserRead, UserWrite} diff --git a/codex/utils/timer.nim b/codex/utils/timer.nim new file mode 100644 index 00000000..39dd122d --- /dev/null +++ b/codex/utils/timer.nim @@ -0,0 +1,50 @@ +## Nim-Codex +## Copyright (c) 2023 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +## Timer +## Used to execute a callback in a loop + +import pkg/chronos +import pkg/chronicles +import pkg/upraises + +type + TimerCallback* = proc(): Future[void] {.gcsafe, upraises:[].} + Timer* = ref object of RootObj + callback: TimerCallback + interval: Duration + name: string + loopFuture: Future[void] + +proc new*(T: type Timer, timerName = "Unnamed Timer"): T = + T( + name: timerName + ) + +proc timerLoop(timer: Timer) {.async.} = + try: + while true: + await timer.callback() + await sleepAsync(timer.interval) + except CatchableError as exc: + error "Timer caught unhandled exception: ", name=timer.name, msg=exc.msg + +method start*(timer: Timer, callback: TimerCallback, interval: Duration) {.base.} = + if timer.loopFuture != nil: + return + trace "Timer starting: ", name=timer.name + timer.callback = callback + timer.interval = interval + timer.loopFuture = timerLoop(timer) + +method stop*(timer: Timer) {.async, base.} = + if timer.loopFuture != nil: + trace "Timer stopping: ", name=timer.name + await timer.loopFuture.cancelAndWait() + timer.loopFuture = nil diff --git a/tests/codex/helpers/mockrepostore.nim b/tests/codex/helpers/mockrepostore.nim new file mode 100644 index 00000000..aa7f5d70 --- /dev/null +++ b/tests/codex/helpers/mockrepostore.nim @@ -0,0 +1,57 @@ +## Nim-Codex +## Copyright (c) 2023 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/sequtils +import pkg/chronos +import pkg/libp2p +import pkg/questionable +import pkg/questionable/results +import pkg/codex/blocktype as bt + +import codex/stores/repostore + +type + MockRepoStore* = ref object of RepoStore + delBlockCids*: seq[Cid] + getBeMaxNumber*: int + getBeOffset*: int + + testBlockExpirations*: seq[BlockExpiration] + getBlockExpirationsThrows*: bool + iteratorIndex: int + +method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} = + self.delBlockCids.add(cid) + self.testBlockExpirations = self.testBlockExpirations.filterIt(it.cid != cid) + dec self.iteratorIndex + return success() + +method getBlockExpirations*(self: MockRepoStore, maxNumber: int, offset: int): Future[?!BlockExpirationIter] {.async.} = + if self.getBlockExpirationsThrows: + raise new CatchableError + + self.getBeMaxNumber = maxNumber + self.getBeOffset = offset + + var iter = BlockExpirationIter() + iter.finished = false + + self.iteratorIndex = offset + var numberLeft = maxNumber + proc next(): Future[?BlockExpiration] {.async.} = + if numberLeft > 0 and self.iteratorIndex >= 0 and self.iteratorIndex < len(self.testBlockExpirations): + dec numberLeft + let selectedBlock = self.testBlockExpirations[self.iteratorIndex] + inc self.iteratorIndex + return selectedBlock.some + iter.finished = true + return BlockExpiration.none + + iter.next = next + return success iter diff --git a/tests/codex/helpers/mocktimer.nim b/tests/codex/helpers/mocktimer.nim new file mode 100644 index 00000000..4b11f9cd --- /dev/null +++ b/tests/codex/helpers/mocktimer.nim @@ -0,0 +1,36 @@ +## Nim-Codex +## Copyright (c) 2023 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos + +import codex/utils/timer + +type + MockTimer* = ref object of Timer + startCalled*: int + stopCalled*: int + mockInterval*: Duration + mockCallback: timer.TimerCallback + +proc new*(T: type MockTimer): T = + T( + startCalled: 0, + stopCalled: 0 + ) + +method start*(mockTimer: MockTimer, callback: timer.TimerCallback, interval: Duration) = + mockTimer.mockCallback = callback + mockTimer.mockInterval = interval + inc mockTimer.startCalled + +method stop*(mockTimer: MockTimer) {.async.} = + inc mockTimer.stopCalled + +method invokeCallback*(mockTimer: MockTimer) {.async, base.} = + await mockTimer.mockCallback() diff --git a/tests/codex/stores/testkeyutils.nim b/tests/codex/stores/testkeyutils.nim new file mode 100644 index 00000000..734d33b9 --- /dev/null +++ b/tests/codex/stores/testkeyutils.nim @@ -0,0 +1,94 @@ +## Nim-Codex +## Copyright (c) 2023 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/random +import std/sequtils +import pkg/chronos +import pkg/libp2p +import pkg/asynctest +import pkg/questionable +import pkg/questionable/results +import pkg/codex/blocktype as bt +import pkg/codex/stores/repostore +import pkg/codex/clock + +import ../helpers/mocktimer +import ../helpers/mockrepostore +import ../helpers/mockclock +import ../examples + +import codex/namespaces +import codex/stores/keyutils + +proc createManifestCid(): ?!Cid = + let + length = rand(4096) + bytes = newSeqWith(length, rand(uint8)) + mcodec = multiCodec("sha2-256") + codec = multiCodec("dag-pb") + version = CIDv1 + + let hash = ? MultiHash.digest($mcodec, bytes).mapFailure + let cid = ? Cid.init(version, codec, hash).mapFailure + return success cid + +suite "KeyUtils": + test "makePrefixKey should create block key": + let length = 6 + let cid = Cid.example + let expectedPrefix = ($cid)[^length..^1] + let expectedPostfix = $cid + + let key = !makePrefixKey(length, cid).option + let namespaces = key.namespaces + + check: + namespaces.len == 4 + namespaces[0].value == CodexRepoNamespace + namespaces[1].value == "blocks" + namespaces[2].value == expectedPrefix + namespaces[3].value == expectedPostfix + + test "makePrefixKey should create manifest key": + let length = 6 + let cid = !createManifestCid().option + let expectedPrefix = ($cid)[^length..^1] + let expectedPostfix = $cid + + let key = !makePrefixKey(length, cid).option + let namespaces = key.namespaces + + check: + namespaces.len == 4 + namespaces[0].value == CodexRepoNamespace + namespaces[1].value == "manifests" + namespaces[2].value == expectedPrefix + namespaces[3].value == expectedPostfix + + test "createBlockExpirationMetadataKey should create block TTL key": + let cid = Cid.example + + let key = !createBlockExpirationMetadataKey(cid).option + let namespaces = key.namespaces + + check: + namespaces.len == 3 + namespaces[0].value == CodexMetaNamespace + namespaces[1].value == "ttl" + namespaces[2].value == $cid + + test "createBlockExpirationMetadataQueryKey should create key for all block TTL entries": + let key = !createBlockExpirationMetadataQueryKey().option + let namespaces = key.namespaces + + check: + namespaces.len == 3 + namespaces[0].value == CodexMetaNamespace + namespaces[1].value == "ttl" + namespaces[2].value == "*" diff --git a/tests/codex/stores/testmaintenance.nim b/tests/codex/stores/testmaintenance.nim new file mode 100644 index 00000000..40c18d58 --- /dev/null +++ b/tests/codex/stores/testmaintenance.nim @@ -0,0 +1,190 @@ +## Nim-Codex +## Copyright (c) 2023 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/chronos +import pkg/libp2p +import pkg/asynctest +import pkg/questionable +import pkg/questionable/results +import pkg/codex/blocktype as bt +import pkg/codex/stores/repostore +import pkg/codex/clock + +import ../helpers/mocktimer +import ../helpers/mockrepostore +import ../helpers/mockclock +import ../examples + +import codex/stores/maintenance + +suite "BlockMaintainer": + var mockRepoStore: MockRepoStore + var interval: Duration + var mockTimer: MockTimer + var mockClock: MockClock + + var blockMaintainer: BlockMaintainer + + var testBe1: BlockExpiration + var testBe2: BlockExpiration + var testBe3: BlockExpiration + + proc createTestExpiration(expiration: SecondsSince1970): BlockExpiration = + BlockExpiration( + cid: bt.Block.example.cid, + expiration: expiration + ) + + setup: + mockClock = MockClock.new() + mockClock.set(100) + + testBe1 = createTestExpiration(200) + testBe2 = createTestExpiration(300) + testBe3 = createTestExpiration(400) + + mockRepoStore = MockRepoStore.new() + mockRepoStore.testBlockExpirations.add(testBe1) + mockRepoStore.testBlockExpirations.add(testBe2) + mockRepoStore.testBlockExpirations.add(testBe3) + + interval = 1.days + mockTimer = MockTimer.new() + + blockMaintainer = BlockMaintainer.new( + mockRepoStore, + interval, + numberOfBlocksPerInterval = 2, + mockTimer, + mockClock) + + test "Start should start timer at provided interval": + blockMaintainer.start() + check mockTimer.startCalled == 1 + check mockTimer.mockInterval == interval + + test "Stop should stop timer": + await blockMaintainer.stop() + check mockTimer.stopCalled == 1 + + test "Timer callback should call getBlockExpirations on RepoStore": + blockMaintainer.start() + await mockTimer.invokeCallback() + + check: + mockRepoStore.getBeMaxNumber == 2 + mockRepoStore.getBeOffset == 0 + + test "Timer callback should handle Catachable errors": + mockRepoStore.getBlockExpirationsThrows = true + blockMaintainer.start() + await mockTimer.invokeCallback() + + test "Subsequent timer callback should call getBlockExpirations on RepoStore with offset": + blockMaintainer.start() + await mockTimer.invokeCallback() + await mockTimer.invokeCallback() + + check: + mockRepoStore.getBeMaxNumber == 2 + mockRepoStore.getBeOffset == 2 + + test "Timer callback should delete no blocks if none are expired": + blockMaintainer.start() + await mockTimer.invokeCallback() + + check: + mockRepoStore.delBlockCids.len == 0 + + test "Timer callback should delete one block if it is expired": + mockClock.set(250) + blockMaintainer.start() + await mockTimer.invokeCallback() + + check: + mockRepoStore.delBlockCids == [testBe1.cid] + + test "Timer callback should delete multiple blocks if they are expired": + mockClock.set(500) + blockMaintainer.start() + await mockTimer.invokeCallback() + + check: + mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid] + + test "After deleting a block, subsequent timer callback should decrease offset by the number of deleted blocks": + mockClock.set(250) + blockMaintainer.start() + await mockTimer.invokeCallback() + + check mockRepoStore.delBlockCids == [testBe1.cid] + + # Because one block was deleted, the offset used in the next call should be 2 minus 1. + await mockTimer.invokeCallback() + + check: + mockRepoStore.getBeMaxNumber == 2 + mockRepoStore.getBeOffset == 1 + + test "Should delete all blocks if expired, in two timer callbacks": + mockClock.set(500) + blockMaintainer.start() + await mockTimer.invokeCallback() + await mockTimer.invokeCallback() + + check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid] + + test "Iteration offset should loop": + blockMaintainer.start() + await mockTimer.invokeCallback() + check mockRepoStore.getBeOffset == 0 + + await mockTimer.invokeCallback() + check mockRepoStore.getBeOffset == 2 + + await mockTimer.invokeCallback() + check mockRepoStore.getBeOffset == 0 + + test "Should handle new blocks": + proc invokeTimerManyTimes(): Future[void] {.async.} = + for i in countUp(0, 10): + await mockTimer.invokeCallback() + + blockMaintainer.start() + await invokeTimerManyTimes() + + # no blocks have expired + check mockRepoStore.delBlockCids == [] + + mockClock.set(250) + await invokeTimerManyTimes() + # one block has expired + check mockRepoStore.delBlockCids == [testBe1.cid] + + # new blocks are added + let testBe4 = createTestExpiration(600) + let testBe5 = createTestExpiration(700) + mockRepoStore.testBlockExpirations.add(testBe4) + mockRepoStore.testBlockExpirations.add(testBe5) + + mockClock.set(500) + await invokeTimerManyTimes() + # All blocks have expired + check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid] + + mockClock.set(650) + await invokeTimerManyTimes() + # First new block has expired + check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid] + + mockClock.set(750) + await invokeTimerManyTimes() + # Second new block has expired + check mockRepoStore.delBlockCids == [testBe1.cid, testBe2.cid, testBe3.cid, testBe4.cid, testBe5.cid] + diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index 530dd2c4..46131fe4 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -1,6 +1,7 @@ import std/os import std/options import std/strutils +import std/sequtils import pkg/questionable import pkg/questionable/results @@ -16,40 +17,50 @@ import pkg/codex/stores/cachestore import pkg/codex/chunker import pkg/codex/stores import pkg/codex/blocktype as bt +import pkg/codex/clock import ../helpers +import ../helpers/mockclock import ./commonstoretests -suite "Test RepoStore Quota": - +suite "RepoStore": var repoDs: Datastore metaDs: Datastore + mockClock: MockClock + + repo: RepoStore + + let + now: SecondsSince1970 = 123 setup: repoDs = SQLiteDatastore.new(Memory).tryGet() metaDs = SQLiteDatastore.new(Memory).tryGet() + mockClock = MockClock.new() + mockClock.set(now) + + repo = RepoStore.new(repoDs, metaDs, mockClock, quotaMaxBytes = 200) teardown: (await repoDs.close()).tryGet (await metaDs.close()).tryGet + proc createTestBlock(size: int): bt.Block = + bt.Block.new('a'.repeat(size).toBytes).tryGet() + test "Should update current used bytes on block put": - let - blk = bt.Block.new('a'.repeat(100).toBytes).tryGet() - repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 100) + let blk = createTestBlock(200) check repo.quotaUsedBytes == 0 (await repo.putBlock(blk)).tryGet check: - repo.quotaUsedBytes == 100 - uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u + repo.quotaUsedBytes == 200 + uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 200'u test "Should update current used bytes on block delete": - let - blk = bt.Block.new('a'.repeat(100).toBytes).tryGet() - repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 100) + let blk = createTestBlock(100) check repo.quotaUsedBytes == 0 (await repo.putBlock(blk)).tryGet @@ -62,9 +73,7 @@ suite "Test RepoStore Quota": uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 0'u test "Should not update current used bytes if block exist": - let - blk = bt.Block.new('a'.repeat(100).toBytes).tryGet() - repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let blk = createTestBlock(100) check repo.quotaUsedBytes == 0 (await repo.putBlock(blk)).tryGet @@ -78,18 +87,14 @@ suite "Test RepoStore Quota": uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u test "Should fail storing passed the quota": - let - blk = bt.Block.new('a'.repeat(200).toBytes).tryGet() - repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 100) + let blk = createTestBlock(300) check repo.totalUsed == 0 expect QuotaUsedError: (await repo.putBlock(blk)).tryGet test "Should reserve bytes": - let - blk = bt.Block.new('a'.repeat(100).toBytes).tryGet() - repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let blk = createTestBlock(100) check repo.totalUsed == 0 (await repo.putBlock(blk)).tryGet @@ -104,9 +109,7 @@ suite "Test RepoStore Quota": uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u test "Should not reserve bytes over max quota": - let - blk = bt.Block.new('a'.repeat(100).toBytes).tryGet() - repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let blk = createTestBlock(100) check repo.totalUsed == 0 (await repo.putBlock(blk)).tryGet @@ -124,9 +127,7 @@ suite "Test RepoStore Quota": discard (await metaDs.get(QuotaReservedKey)).tryGet test "Should release bytes": - let - blk = bt.Block.new('a'.repeat(100).toBytes).tryGet() - repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) + let blk = createTestBlock(100) check repo.totalUsed == 0 (await repo.reserve(100)).tryGet @@ -141,9 +142,6 @@ suite "Test RepoStore Quota": uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 0'u test "Should not release bytes less than quota": - let - repo = RepoStore.new(repoDs, metaDs, quotaMaxBytes = 200) - check repo.totalUsed == 0 (await repo.reserve(100)).tryGet check repo.totalUsed == 100 @@ -157,12 +155,110 @@ suite "Test RepoStore Quota": repo.quotaReservedBytes == 100 uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u + proc queryMetaDs(key: Key): Future[seq[QueryResponse]] {.async.} = + let + query = Query.init(key) + responseIter = (await metaDs.query(query)).tryGet + response = (await allFinished(toSeq(responseIter))) + .mapIt(it.read.tryGet) + .filterIt(it.key.isSome) + return response + + test "Should store block expiration timestamp": + let + duration = 10.seconds + blk = createTestBlock(100) + + let + expectedExpiration: SecondsSince1970 = 123 + 10 + expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet + + (await repo.putBlock(blk, duration.some)).tryGet + + let response = await queryMetaDs(expectedKey) + + check: + response.len == 1 + response[0].key.get == expectedKey + response[0].data == expectedExpiration.toBytes + + test "Should store block with default expiration timestamp when not provided": + let + blk = createTestBlock(100) + + let + expectedExpiration: SecondsSince1970 = 123 + DefaultBlockTtl.seconds + expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet + + (await repo.putBlock(blk)).tryGet + + let response = await queryMetaDs(expectedKey) + + check: + response.len == 1 + response[0].key.get == expectedKey + response[0].data == expectedExpiration.toBytes + + test "delBlock should remove expiration metadata": + let + blk = createTestBlock(100) + expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet + + (await repo.putBlock(blk, 10.seconds.some)).tryGet + (await repo.delBlock(blk.cid)).tryGet + + let response = await queryMetaDs(expectedKey) + + check: + response.len == 0 + + test "Should retrieve block expiration information": + proc unpack(beIter: Future[?!BlockExpirationIter]): Future[seq[BlockExpiration]] {.async.} = + var expirations = newSeq[BlockExpiration](0) + without iter =? (await beIter), err: + return expirations + for be in toSeq(iter): + if value =? (await be): + expirations.add(value) + return expirations + + let + duration = 10.seconds + blk1 = createTestBlock(10) + blk2 = createTestBlock(11) + blk3 = createTestBlock(12) + + let + expectedExpiration: SecondsSince1970 = 123 + 10 + + proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) = + check: + be.cid == expectedBlock.cid + be.expiration == expectedExpiration + + + (await repo.putBlock(blk1, duration.some)).tryGet + (await repo.putBlock(blk2, duration.some)).tryGet + (await repo.putBlock(blk3, duration.some)).tryGet + + let + blockExpirations1 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=0)) + blockExpirations2 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=2)) + + check blockExpirations1.len == 2 + assertExpiration(blockExpirations1[0], blk2) + assertExpiration(blockExpirations1[1], blk1) + + check blockExpirations2.len == 1 + assertExpiration(blockExpirations2[0], blk3) + commonBlockStoreTests( "RepoStore Sql backend", proc: BlockStore = BlockStore( RepoStore.new( SQLiteDatastore.new(Memory).tryGet(), - SQLiteDatastore.new(Memory).tryGet()))) + SQLiteDatastore.new(Memory).tryGet(), + MockClock.new()))) const path = currentSourcePath().parentDir / "test" @@ -181,6 +277,7 @@ commonBlockStoreTests( BlockStore( RepoStore.new( FSDatastore.new(path, depth).tryGet(), - SQLiteDatastore.new(Memory).tryGet())), + SQLiteDatastore.new(Memory).tryGet(), + MockClock.new())), before = before, after = after) diff --git a/tests/codex/testclock.nim b/tests/codex/testclock.nim new file mode 100644 index 00000000..166f333d --- /dev/null +++ b/tests/codex/testclock.nim @@ -0,0 +1,25 @@ +import std/unittest + +import codex/clock + +suite "Clock": + proc testConversion(seconds: SecondsSince1970) = + let asBytes = seconds.toBytes + + let restored = asBytes.toSecondsSince1970 + + check restored == seconds + + test "SecondsSince1970 should support bytes conversions": + let secondsToTest: seq[int64] = @[ + int64.high, + int64.low, + 0, + 1, + 12345, + -1, + -12345 + ] + + for seconds in secondsToTest: + testConversion(seconds) diff --git a/tests/codex/teststores.nim b/tests/codex/teststores.nim index 355897c6..3aad3ef3 100644 --- a/tests/codex/teststores.nim +++ b/tests/codex/teststores.nim @@ -1,4 +1,5 @@ import ./stores/testcachestore import ./stores/testrepostore +import ./stores/testmaintenance {.warning[UnusedImport]: off.} diff --git a/tests/codex/testsystemclock.nim b/tests/codex/testsystemclock.nim new file mode 100644 index 00000000..9ed00846 --- /dev/null +++ b/tests/codex/testsystemclock.nim @@ -0,0 +1,13 @@ +import std/times +import std/unittest + +import codex/systemclock + +suite "SystemClock": + test "Should get now": + let clock = SystemClock.new() + + let expectedNow = times.now().utc + let now = clock.now() + + check now == expectedNow.toTime().toUnix() diff --git a/tests/codex/testutils.nim b/tests/codex/testutils.nim index 72a6bcc7..e6b885f7 100644 --- a/tests/codex/testutils.nim +++ b/tests/codex/testutils.nim @@ -3,5 +3,6 @@ import ./utils/teststatemachineasync import ./utils/testoptionalcast import ./utils/testkeyutils import ./utils/testasyncstatemachine +import ./utils/testtimer {.warning[UnusedImport]: off.} diff --git a/tests/codex/utils/testtimer.nim b/tests/codex/utils/testtimer.nim new file mode 100644 index 00000000..481406cb --- /dev/null +++ b/tests/codex/utils/testtimer.nim @@ -0,0 +1,85 @@ +## Nim-Codex +## Copyright (c) 2023 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/questionable + +import pkg/chronos +import pkg/asynctest + +import codex/utils/timer +import ../helpers/eventually + +suite "Timer": + var timer1: Timer + var timer2: Timer + var output: string + var numbersState = 0 + var lettersState = 'a' + + proc numbersCallback(): Future[void] {.async.} = + output &= $numbersState + inc numbersState + + proc lettersCallback(): Future[void] {.async.} = + output &= $lettersState + inc lettersState + + proc exceptionCallback(): Future[void] {.async.} = + raise newException(CatchableError, "Test Exception") + + proc startNumbersTimer() = + timer1.start(numbersCallback, 10.milliseconds) + + proc startLettersTimer() = + timer2.start(lettersCallback, 10.milliseconds) + + setup: + timer1 = Timer.new() + timer2 = Timer.new() + + output = "" + numbersState = 0 + lettersState = 'a' + + teardown: + await timer1.stop() + await timer2.stop() + + test "Start timer1 should execute callback": + startNumbersTimer() + check eventually output == "0" + + test "Start timer1 should execute callback multiple times": + startNumbersTimer() + check eventually output == "012" + + test "Starting timer1 multiple times has no impact": + startNumbersTimer() + startNumbersTimer() + startNumbersTimer() + check eventually output == "01234" + + test "Stop timer1 should stop execution of the callback": + startNumbersTimer() + check eventually output == "012" + await timer1.stop() + await sleepAsync(30.milliseconds) + let stoppedOutput = output + await sleepAsync(30.milliseconds) + check output == stoppedOutput + + test "Exceptions raised in timer callback are handled": + timer1.start(exceptionCallback, 10.milliseconds) + await sleepAsync(30.milliseconds) + await timer1.stop() + + test "Starting both timers should execute callbacks sequentially": + startNumbersTimer() + startLettersTimer() + check eventually output == "0a1b2c3d4e" diff --git a/tests/integration/testblockexpiration.nim b/tests/integration/testblockexpiration.nim new file mode 100644 index 00000000..6178553a --- /dev/null +++ b/tests/integration/testblockexpiration.nim @@ -0,0 +1,77 @@ +import std/osproc +import std/os +import std/httpclient +import std/strutils +import std/times + +import pkg/chronos +import ../ethertest +import ../contracts/time +import ../codex/helpers/eventually +import ./nodes +import ./tokens + +ethersuite "Node block expiration tests": + + var node: NodeProcess + var baseurl: string + + let dataDir = getTempDir() / "Codex1" + let content = "test file content" + + setup: + baseurl = "http://localhost:8080/api/codex/v1" + + teardown: + node.stop() + + dataDir.removeDir() + + proc startTestNode(blockTtlSeconds: int) = + node = startNode([ + "--api-port=8080", + "--data-dir=" & dataDir, + "--nat=127.0.0.1", + "--disc-ip=127.0.0.1", + "--disc-port=8090", + "--block-ttl=" & $blockTtlSeconds, + "--block-mi=3", + "--block-mn=10" + ], debug = false) + + proc uploadTestFile(): string = + let client = newHttpClient() + let uploadUrl = baseurl & "/upload" + let uploadResponse = client.post(uploadUrl, content) + check uploadResponse.status == "200 OK" + client.close() + uploadResponse.body + + proc downloadTestFile(contentId: string): Response = + let client = newHttpClient(timeout=3000) + let downloadUrl = baseurl & "/download/" & contentId + let content = client.get(downloadUrl) + client.close() + content + + test "node retains not-expired file": + startTestNode(blockTtlSeconds = 60 * 60 * 1) + + let contentId = uploadTestFile() + + await sleepAsync(10 * 1000) + + let response = downloadTestFile(contentId) + check: + response.status == "200 OK" + response.body == content + + test "node deletes expired file": + startTestNode(blockTtlSeconds = 5) + + let contentId = uploadTestFile() + + await sleepAsync(10 * 1000) + + expect TimeoutError: + discard downloadTestFile(contentId) diff --git a/tests/testCodex.nim b/tests/testCodex.nim index 5ddd38da..0afc5894 100644 --- a/tests/testCodex.nim +++ b/tests/testCodex.nim @@ -11,6 +11,8 @@ import ./codex/testsales import ./codex/testerasure import ./codex/testproving import ./codex/testutils +import ./codex/testclock +import ./codex/testsystemclock # to check that everything compiles import ../codex diff --git a/tests/testIntegration.nim b/tests/testIntegration.nim index d58e461c..b428f336 100644 --- a/tests/testIntegration.nim +++ b/tests/testIntegration.nim @@ -9,6 +9,7 @@ import ./ethertest import ./contracts/time import ./integration/nodes import ./integration/tokens +import ./integration/testblockexpiration import ./codex/helpers/eventually ethersuite "Integration tests":