From 65a471802d07535e1894c3fe642fa15935cf2b80 Mon Sep 17 00:00:00 2001
From: benbierens
Date: Mon, 13 Mar 2023 13:05:02 +0100
Subject: [PATCH] Setting up memory store

---
Review notes (this section is not part of the commit message):
 - MemoryStore.new now preloads `blocks`; the loop was commented out, so the
   store built in tests/codex/helpers/nodeutils.nim always started empty.
 - MemoryStore.putBlockSync no longer charges bytesUsed a second time when
   the cid is already stored.
 - MemoryStore.getBlock trace message no longer says "cache".
 - Doc comments added to memorystore.nim; its hunk header and the diffstat
   totals are updated to match. The blob id on its `index` line predates
   this revision.

 codex/stores.nim                       |   3 +-
 codex/stores/cachestore.nim            | 166 +++-------------
 codex/stores/memorystore.nim           | 202 ++++++++++++++++++
 tests/codex/helpers/nodeutils.nim      |   2 +-
 tests/codex/stores/testmemorystore.nim | 252 +++++++++++++++++++++++++
 tests/codex/teststores.nim             |   1 +
 6 files changed, 482 insertions(+), 144 deletions(-)
 create mode 100644 codex/stores/memorystore.nim
 create mode 100644 tests/codex/stores/testmemorystore.nim

diff --git a/codex/stores.nim b/codex/stores.nim
index 48a0df79..a2c7acd3 100644
--- a/codex/stores.nim
+++ b/codex/stores.nim
@@ -2,7 +2,8 @@ import ./stores/cachestore
 import ./stores/blockstore
 import ./stores/networkstore
 import ./stores/repostore
+import ./stores/memorystore
 import ./stores/maintenance
 import ./stores/keyutils
 
-export cachestore, blockstore, networkstore, repostore, maintenance, keyutils
+export cachestore, blockstore, networkstore, repostore, memorystore, maintenance, keyutils
diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim
index c7929ff0..66861035 100644
--- a/codex/stores/cachestore.nim
+++ b/codex/stores/cachestore.nim
@@ -32,165 +32,52 @@ logScope:
 
 type
   CacheStore* = ref object of BlockStore
-    currentSize*: Natural # in bytes
-    size*: Positive # in bytes
+    backingStore: BlockStore
     cache: LruCache[Cid, Block]
 
-  InvalidBlockSize* = object of CodexError
-
 const
-  MiB* = 1024 * 1024 # bytes, 1 mebibyte = 1,048,576 bytes
+  MiB* = 1024 * 1024
   DefaultCacheSizeMiB* = 5
-  DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes
+  DefaultCacheSize* = DefaultCacheSizeMiB * MiB
 
 method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
-  ## Get a block from the stores
-  ##
-
-  trace "Getting block from cache", cid
-
   if cid.isEmpty:
     trace "Empty block, ignoring"
-    return success cid.emptyBlock
+    return success(cid.emptyBlock)
 
   if cid notin self.cache:
-    return failure (ref BlockNotFoundError)(msg: "Block not in cache")
+    without blk =? await self.backingStore.getBlock(cid), err:
+      return failure(err)
+    self.cache[blk.cid] = blk
+    return success(blk)
 
-  try:
-    return success self.cache[cid]
-  except CatchableError as exc:
-    trace "Error requesting block from cache", cid, error = exc.msg
-    return failure exc
+  trace "Returning block from cache"
+  return success self.cache[cid]
 
-method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
-  ## Check if the block exists in the blockstore
-  ##
-
-  trace "Checking CacheStore for block presence", cid
-  if cid.isEmpty:
-    trace "Empty block, ignoring"
-    return true.success
-
-  return (cid in self.cache).success
-
-func cids(self: CacheStore): (iterator: Cid {.gcsafe.}) =
-  return iterator(): Cid =
-    for cid in self.cache.keys:
-      yield cid
+method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] =
+  self.backingStore.hasBlock(cid)
 
 method listBlocks*(
     self: CacheStore,
-    blockType = BlockType.Manifest): Future[?!BlocksIter] {.async.} =
-  ## Get the list of blocks in the BlockStore. This is an intensive operation
-  ##
-
-  var
-    iter = BlocksIter()
-
-  let
-    cids = self.cids()
-
-  proc next(): Future[?Cid] {.async.} =
-    await idleAsync()
-
-    var cid: Cid
-    while true:
-      if iter.finished:
-        return Cid.none
-
-      cid = cids()
-
-      if finished(cids):
-        iter.finished = true
-        return Cid.none
-
-      without isManifest =? cid.isManifest, err:
-        trace "Error checking if cid is a manifest", err = err.msg
-        return Cid.none
-
-      case blockType:
-      of BlockType.Manifest:
-        if not isManifest:
-          trace "Cid is not manifest, skipping", cid
-          continue
-
-        break
-      of BlockType.Block:
-        if isManifest:
-          trace "Cid is a manifest, skipping", cid
-          continue
-
-        break
-      of BlockType.Both:
-        break
-
-    return cid.some
-
-  iter.next = next
-
-  return success iter
-
-func putBlockSync(self: CacheStore, blk: Block): bool =
-
-  let blkSize = blk.data.len # in bytes
-
-  if blkSize > self.size:
-    trace "Block size is larger than cache size", blk = blkSize, cache = self.size
-    return false
-
-  while self.currentSize + blkSize > self.size:
-    try:
-      let removed = self.cache.removeLru()
-      self.currentSize -= removed.data.len
-    except EmptyLruCacheError as exc:
-      # if the cache is empty, can't remove anything, so break and add item
-      # to the cache
-      trace "Exception puting block to cache", exc = exc.msg
-      break
-
-  self.cache[blk.cid] = blk
-  self.currentSize += blkSize
-  return true
+    blockType = BlockType.Manifest): Future[?!BlocksIter] =
+  self.backingStore.listBlocks(blockType)
 
 method putBlock*(
     self: CacheStore,
     blk: Block,
-    ttl = Duration.none): Future[?!void] {.async.} =
-  ## Put a block to the blockstore
-  ##
+    ttl = Duration.none): Future[?!void] =
+  self.backingStore.putBlock(blk, ttl)
 
-  trace "Storing block in cache", cid = blk.cid
-  if blk.isEmpty:
-    trace "Empty block, ignoring"
-    return success()
+method delBlock*(self: CacheStore, cid: Cid): Future[?!void] =
+  discard self.cache.del(cid)
+  self.backingStore.delBlock(cid)
 
-  discard self.putBlockSync(blk)
-  return success()
-
-method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
-  ## Delete a block from the blockstore
-  ##
-
-  trace "Deleting block from cache", cid
-  if cid.isEmpty:
-    trace "Empty block, ignoring"
-    return success()
-
-  let removed = self.cache.del(cid)
-  if removed.isSome:
-    self.currentSize -= removed.get.data.len
-
-  return success()
-
-method close*(self: CacheStore): Future[void] {.async.} =
-  ## Close the blockstore, a no-op for this implementation
-  ##
-
-  discard
+method close*(self: CacheStore): Future[void] =
+  self.backingStore.close()
 
 func new*(
     _: type CacheStore,
-    blocks: openArray[Block] = [],
+    backingStore: BlockStore,
     cacheSize: Positive = DefaultCacheSize, # in bytes
     chunkSize: Positive = DefaultChunkSize  # in bytes
   ): CacheStore {.raises: [Defect, ValueError].} =
@@ -198,16 +85,11 @@ func new*(
   if cacheSize < chunkSize:
     raise newException(ValueError, "cacheSize cannot be less than chunkSize")
 
-  var currentSize = 0
   let
     size = cacheSize div chunkSize
     cache = newLruCache[Cid, Block](size)
     store = CacheStore(
-      cache: cache,
-      currentSize: currentSize,
-      size: cacheSize)
-
-  for blk in blocks:
-    discard store.putBlockSync(blk)
+      backingStore: backingStore,
+      cache: cache)
 
   return store
diff --git a/codex/stores/memorystore.nim b/codex/stores/memorystore.nim
new file mode 100644
index 00000000..92ce5e1d
--- /dev/null
+++ b/codex/stores/memorystore.nim
@@ -0,0 +1,202 @@
+## Nim-Codex
+## Copyright (c) 2021 Status Research & Development GmbH
+## Licensed under either of
+##  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
+##  * MIT license ([LICENSE-MIT](LICENSE-MIT))
+## at your option.
+## This file may not be copied, modified, or distributed except according to
+## those terms.
+
+import pkg/upraises
+
+push: {.upraises: [].}
+
+import std/options
+import std/tables
+
+import pkg/chronicles
+import pkg/chronos
+import pkg/libp2p
+import pkg/questionable
+import pkg/questionable/results
+
+import ./blockstore
+import ../chunker
+import ../errors
+import ../manifest
+
+export blockstore
+
+logScope:
+  topics = "codex memorystore"
+
+type
+  MemoryStore* = ref object of BlockStore
+    ## Block store that keeps its blocks in a `Table`, bounded by `capacity`.
+    bytesUsed*: int # bytes charged for the blocks currently stored
+    capacity*: int # upper bound for `bytesUsed`, in bytes
+    table: Table[Cid, Block]
+
+  InvalidBlockSize* = object of CodexError
+
+const
+  MiB* = 1024 * 1024 # bytes, 1 mebibyte = 1,048,576 bytes
+  DefaultCacheSizeMiB* = 5
+  DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes
+
+method getBlock*(self: MemoryStore, cid: Cid): Future[?!Block] {.async.} =
+  ## Look up `cid` in the table; fails with `BlockNotFoundError` when absent.
+  ## An empty cid resolves to `cid.emptyBlock` without touching the table.
+  trace "Getting block from memory store", cid
+  if cid.isEmpty:
+    trace "Empty block, ignoring"
+    return success cid.emptyBlock
+
+  if cid notin self.table:
+    return failure (ref BlockNotFoundError)(msg: "Block not in memory store")
+
+  try:
+    return success self.table[cid]
+  except CatchableError as exc:
+    trace "Error getting block from memory store", cid, error = exc.msg
+    return failure exc
+
+method hasBlock*(self: MemoryStore, cid: Cid): Future[?!bool] {.async.} =
+  ## Report whether `cid` is stored; an empty cid always reports `true`.
+  trace "Checking MemoryStore for block presence", cid
+  if cid.isEmpty:
+    trace "Empty block, ignoring"
+    return true.success
+
+  return (cid in self.table).success
+
+func cids(self: MemoryStore): (iterator: Cid {.gcsafe.}) =
+  ## Closure iterator over the cids currently held in the table.
+  return iterator(): Cid =
+    for cid in self.table.keys:
+      yield cid
+
+method listBlocks*(self: MemoryStore, blockType = BlockType.Manifest): Future[?!BlocksIter] {.async.} =
+  ## Return an iterator over the stored cids, filtered by `blockType`
+  ## (manifests only by default).
+  var
+    iter = BlocksIter()
+
+  let
+    cids = self.cids()
+
+  proc next(): Future[?Cid] {.async.} =
+    await idleAsync()
+
+    var cid: Cid
+    while true:
+      if iter.finished:
+        return Cid.none
+
+      cid = cids()
+
+      if finished(cids):
+        iter.finished = true
+        return Cid.none
+
+      without isManifest =? cid.isManifest, err:
+        trace "Error checking if cid is a manifest", err = err.msg
+        return Cid.none
+
+      case blockType:
+      of BlockType.Manifest:
+        if not isManifest:
+          trace "Cid is not manifest, skipping", cid
+          continue
+
+        break
+      of BlockType.Block:
+        if isManifest:
+          trace "Cid is a manifest, skipping", cid
+          continue
+
+        break
+      of BlockType.Both:
+        break
+
+    return cid.some
+
+  iter.next = next
+
+  return success iter
+
+func getFreeCapacity(self: MemoryStore): int =
+  ## Bytes still available before `capacity` is reached.
+  self.capacity - self.bytesUsed
+
+func putBlockSync(self: MemoryStore, blk: Block): ?!void =
+  ## Insert `blk` and charge its size to `bytesUsed`; fails when it does not fit.
+  if blk.cid in self.table:
+    # Already stored: charging `bytesUsed` again would leak capacity, since
+    # `delBlock` releases the size of a cid only once.
+    return success()
+
+  let
+    freeCapacity = self.getFreeCapacity()
+    blkSize = blk.data.len
+
+  if blkSize > freeCapacity:
+    trace "Block size is larger than free capacity", blk = blkSize, freeCapacity
+    return failure("Unable to store block: Memory store capacity exceeded.")
+
+  self.table[blk.cid] = blk
+  self.bytesUsed += blkSize
+  return success()
+
+method putBlock*(self: MemoryStore, blk: Block, ttl = Duration.none): Future[?!void] {.async.} =
+  ## Store `blk`; fails when it does not fit in the free capacity.
+  ## Empty blocks are ignored. `ttl` is not used by this store.
+  trace "Storing block in store", cid = blk.cid
+  if blk.isEmpty:
+    trace "Empty block, ignoring"
+    return success()
+
+  return self.putBlockSync(blk)
+
+method delBlock*(self: MemoryStore, cid: Cid): Future[?!void] {.async.} =
+  ## Remove the block stored under `cid` and release its bytes.
+  ## Fails when the block is not stored; an empty cid is a no-op.
+  trace "Deleting block from memory store", cid
+  if cid.isEmpty:
+    trace "Empty block, ignoring"
+    return success()
+
+  without toRemove =? await self.getBlock(cid), err:
+    trace "Unable to find block to remove"
+    return failure(err)
+
+  self.table.del(cid)
+  self.bytesUsed -= toRemove.data.len
+
+  return success()
+
+method close*(self: MemoryStore): Future[void] {.async.} =
+  ## No-op for this store.
+  discard
+
+func new*(
+    _: type MemoryStore,
+    blocks: openArray[Block] = [],
+    capacity: Positive = DefaultCacheSize, # in bytes
+    chunkSize: Positive = DefaultChunkSize # in bytes
+  ): MemoryStore {.raises: [Defect, ValueError].} =
+  ## Create a store of `capacity` bytes preloaded with `blocks`.
+  ## Raises `ValueError` when `capacity` is smaller than `chunkSize`.
+  if capacity < chunkSize:
+    raise newException(ValueError, "capacity cannot be less than chunkSize")
+
+  let store = MemoryStore(
+    table: initTable[Cid, Block](),
+    bytesUsed: 0,
+    capacity: capacity)
+
+  # Preload `blocks`; a block that does not fit is skipped.
+  for blk in blocks:
+    discard store.putBlockSync(blk)
+
+  return store
diff --git a/tests/codex/helpers/nodeutils.nim b/tests/codex/helpers/nodeutils.nim
index 081de7f9..7f57f494 100644
--- a/tests/codex/helpers/nodeutils.nim
+++ b/tests/codex/helpers/nodeutils.nim
@@ -35,7 +35,7 @@ proc generateNodes*(
           .expect("Should return multiaddress")])
       wallet = WalletRef.example
       network = BlockExcNetwork.new(switch)
-      localStore = CacheStore.new(blocks.mapIt( it ))
+      localStore = MemoryStore.new(blocks.mapIt( it ))
       peerStore = PeerCtxStore.new()
       pendingBlocks = PendingBlocksManager.new()
       blockDiscovery = DiscoveryEngine.new(localStore, peerStore, network, discovery, pendingBlocks)
diff --git a/tests/codex/stores/testmemorystore.nim b/tests/codex/stores/testmemorystore.nim
new file mode 100644
index 00000000..5d4d132a
--- /dev/null
+++ b/tests/codex/stores/testmemorystore.nim
@@ -0,0 +1,252 @@
+import std/os
+import std/strutils
+import std/sequtils
+
+import pkg/questionable
+import pkg/questionable/results
+
+import pkg/chronos
+import pkg/asynctest
+import pkg/libp2p
+import pkg/stew/byteutils
+import pkg/stew/endians2
+import pkg/datastore
+
+import pkg/codex/stores/cachestore
+import pkg/codex/chunker
+import pkg/codex/stores
+import pkg/codex/blocktype as bt
+import pkg/codex/clock
+
+import ../helpers
+import ../helpers/mockclock
+import ./commonstoretests
+
+proc createTestBlock(size: int): bt.Block =
+  bt.Block.new('a'.repeat(size).toBytes).tryGet()
+
+suite "MemoryStore":
+  var
+    initialBlock: bt.Block
+
+    repo: MemoryStore
+
+  let
+    capacity = 100
+    chunkSize = 10
+
+  setup:
+    initialBlock = createTestBlock(chunkSize)
+
+    repo = MemoryStore.new([initialBlock], capacity, chunkSize)
+
+  # teardown:
+  #   discard
+
+  # test "Should update current used bytes on block put":
+  #   let blk = createTestBlock(200)
+
+  #   check repo.quotaUsedBytes == 0
+  #   (await repo.putBlock(blk)).tryGet
+
+  #   check:
+  #     repo.quotaUsedBytes == 200
+  #     uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 200'u
+
+  # test "Should update current used bytes on block delete":
+  #   let blk = createTestBlock(100)
+
+  #   check repo.quotaUsedBytes == 0
+  #   (await repo.putBlock(blk)).tryGet
+  #   check repo.quotaUsedBytes == 100
+
+  #   (await repo.delBlock(blk.cid)).tryGet
+
+  #   check:
+  #     repo.quotaUsedBytes == 0
+  #     uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 0'u
+
+  # test "Should not update current used bytes if block exist":
+  #   let blk = createTestBlock(100)
+
+  #   check repo.quotaUsedBytes == 0
+  #   (await repo.putBlock(blk)).tryGet
+  #   check repo.quotaUsedBytes == 100
+
+  #   # put again
+  #   (await repo.putBlock(blk)).tryGet
+  #   check repo.quotaUsedBytes == 100
+
+  #   check:
+  #     uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u
+
+  # test "Should fail storing passed the quota":
+  #   let blk = createTestBlock(300)
+
+  #   check repo.totalUsed == 0
+  #   expect QuotaUsedError:
+  #     (await repo.putBlock(blk)).tryGet
+
+  # test "Should reserve bytes":
+  #   let blk = createTestBlock(100)
+
+  #   check repo.totalUsed == 0
+  #   (await repo.putBlock(blk)).tryGet
+  #   check repo.totalUsed == 100
+
+  #   (await repo.reserve(100)).tryGet
+
+  #   check:
+  #     repo.totalUsed == 200
+  #     repo.quotaUsedBytes == 100
+  #     repo.quotaReservedBytes == 100
+  #     uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
+
+  # test "Should not reserve bytes over max quota":
+  #   let blk = createTestBlock(100)
+
+  #   check repo.totalUsed == 0
+  #   (await repo.putBlock(blk)).tryGet
+  #   check repo.totalUsed == 100
+
+  #   expect QuotaNotEnoughError:
+  #     (await repo.reserve(101)).tryGet
+
+  #   check:
+  #     repo.totalUsed == 100
+  #     repo.quotaUsedBytes == 100
+  #     repo.quotaReservedBytes == 0
+
+  #   expect DatastoreKeyNotFound:
+  #     discard (await metaDs.get(QuotaReservedKey)).tryGet
+
+  # test "Should release bytes":
+  #   discard createTestBlock(100)
+
+  #   check repo.totalUsed == 0
+  #   (await repo.reserve(100)).tryGet
+  #   check repo.totalUsed == 100
+
+  #   (await repo.release(100)).tryGet
+
+  #   check:
+  #     repo.totalUsed == 0
+  #     repo.quotaUsedBytes == 0
+  #     repo.quotaReservedBytes == 0
+  #     uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 0'u
+
+  # test "Should not release bytes less than quota":
+  #   check repo.totalUsed == 0
+  #   (await repo.reserve(100)).tryGet
+  #   check repo.totalUsed == 100
+
+  #   expect CatchableError:
+  #     (await repo.release(101)).tryGet
+
+  #   check:
+  #     repo.totalUsed == 100
+  #     repo.quotaUsedBytes == 0
+  #     repo.quotaReservedBytes == 100
+  #     uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
+
+  # proc queryMetaDs(key: Key): Future[seq[QueryResponse]] {.async.} =
+  #   let
+  #     query = Query.init(key)
+  #     responseIter = (await metaDs.query(query)).tryGet
+  #     response = (await allFinished(toSeq(responseIter)))
+  #       .mapIt(it.read.tryGet)
+  #       .filterIt(it.key.isSome)
+  #   return response
+
+  # test "Should store block expiration timestamp":
+  #   let
+  #     duration = 10.seconds
+  #     blk = createTestBlock(100)
+
+  #   let
+  #     expectedExpiration: SecondsSince1970 = 123 + 10
+  #     expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
+
+  #   (await repo.putBlock(blk, duration.some)).tryGet
+
+  #   let response = await queryMetaDs(expectedKey)
+
+  #   check:
+  #     response.len == 1
+  #     !response[0].key == expectedKey
+  #     response[0].data == expectedExpiration.toBytes
+
+  # test "Should store block with default expiration timestamp when not provided":
+  #   let
+  #     blk = createTestBlock(100)
+
+  #   let
+  #     expectedExpiration: SecondsSince1970 = 123 + DefaultBlockTtl.seconds
+  #     expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
+
+  #   (await repo.putBlock(blk)).tryGet
+
+  #   let response = await queryMetaDs(expectedKey)
+
+  #   check:
+  #     response.len == 1
+  #     !response[0].key == expectedKey
+  #     response[0].data == expectedExpiration.toBytes
+
+  # test "delBlock should remove expiration metadata":
+  #   let
+  #     blk = createTestBlock(100)
+  #     expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
+
+  #   (await repo.putBlock(blk, 10.seconds.some)).tryGet
+  #   (await repo.delBlock(blk.cid)).tryGet
+
+  #   let response = await queryMetaDs(expectedKey)
+
+  #   check:
+  #     response.len == 0
+
+  # test "Should retrieve block expiration information":
+  #   proc unpack(beIter: Future[?!BlockExpirationIter]): Future[seq[BlockExpiration]] {.async.} =
+  #     var expirations = newSeq[BlockExpiration](0)
+  #     without iter =? (await beIter), err:
+  #       return expirations
+  #     for be in toSeq(iter):
+  #       if value =? (await be):
+  #         expirations.add(value)
+  #     return expirations
+
+  #   let
+  #     duration = 10.seconds
+  #     blk1 = createTestBlock(10)
+  #     blk2 = createTestBlock(11)
+  #     blk3 = createTestBlock(12)
+
+  #   let
+  #     expectedExpiration: SecondsSince1970 = 123 + 10
+
+  #   proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) =
+  #     check:
+  #       be.cid == expectedBlock.cid
+  #       be.expiration == expectedExpiration
+
+
+  #   (await repo.putBlock(blk1, duration.some)).tryGet
+  #   (await repo.putBlock(blk2, duration.some)).tryGet
+  #   (await repo.putBlock(blk3, duration.some)).tryGet
+
+  #   let
+  #     blockExpirations1 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=0))
+  #     blockExpirations2 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=2))
+
+  #   check blockExpirations1.len == 2
+  #   assertExpiration(blockExpirations1[0], blk2)
+  #   assertExpiration(blockExpirations1[1], blk1)
+
+  #   check blockExpirations2.len == 1
+  #   assertExpiration(blockExpirations2[0], blk3)
+
+commonBlockStoreTests(
+  "MemoryStore", proc: BlockStore =
+    MemoryStore.new([])
+)
diff --git a/tests/codex/teststores.nim b/tests/codex/teststores.nim
index 3aad3ef3..834565ec 100644
--- a/tests/codex/teststores.nim
+++ b/tests/codex/teststores.nim
@@ -1,5 +1,6 @@
 import ./stores/testcachestore
 import ./stores/testrepostore
 import ./stores/testmaintenance
+import ./stores/testmemorystore
 
 {.warning[UnusedImport]: off.}