Setting up memory store

This commit is contained in:
benbierens 2023-03-13 13:05:02 +01:00
parent 720c372be0
commit 65a471802d
No known key found for this signature in database
GPG Key ID: FE44815D96D0A1AA
6 changed files with 460 additions and 144 deletions

View File

@ -2,7 +2,8 @@ import ./stores/cachestore
import ./stores/blockstore
import ./stores/networkstore
import ./stores/repostore
import ./stores/memorystore
import ./stores/maintenance
import ./stores/keyutils
export cachestore, blockstore, networkstore, repostore, maintenance, keyutils
export cachestore, blockstore, networkstore, repostore, memorystore, maintenance, keyutils

View File

@ -32,165 +32,52 @@ logScope:
type
CacheStore* = ref object of BlockStore
currentSize*: Natural # in bytes
size*: Positive # in bytes
backingStore: BlockStore
cache: LruCache[Cid, Block]
InvalidBlockSize* = object of CodexError
const
MiB* = 1024 * 1024 # bytes, 1 mebibyte = 1,048,576 bytes
MiB* = 1024 * 1024
DefaultCacheSizeMiB* = 5
DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes
DefaultCacheSize* = DefaultCacheSizeMiB * MiB
method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the stores
##
trace "Getting block from cache", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success cid.emptyBlock
return success(cid.emptyBlock)
if cid notin self.cache:
return failure (ref BlockNotFoundError)(msg: "Block not in cache")
without blk =? await self.backingStore.getBlock(cid), err:
return failure(err)
self.cache[blk.cid] = blk
return success(blk)
try:
return success self.cache[cid]
except CatchableError as exc:
trace "Error requesting block from cache", cid, error = exc.msg
return failure exc
trace "Returning block from cache"
return success self.cache[cid]
method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
## Check if the block exists in the blockstore
##
trace "Checking CacheStore for block presence", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return true.success
return (cid in self.cache).success
func cids(self: CacheStore): (iterator: Cid {.gcsafe.}) =
return iterator(): Cid =
for cid in self.cache.keys:
yield cid
method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] =
self.backingStore.hasBlock(cid)
method listBlocks*(
self: CacheStore,
blockType = BlockType.Manifest): Future[?!BlocksIter] {.async.} =
## Get the list of blocks in the BlockStore. This is an intensive operation
##
var
iter = BlocksIter()
let
cids = self.cids()
proc next(): Future[?Cid] {.async.} =
await idleAsync()
var cid: Cid
while true:
if iter.finished:
return Cid.none
cid = cids()
if finished(cids):
iter.finished = true
return Cid.none
without isManifest =? cid.isManifest, err:
trace "Error checking if cid is a manifest", err = err.msg
return Cid.none
case blockType:
of BlockType.Manifest:
if not isManifest:
trace "Cid is not manifest, skipping", cid
continue
break
of BlockType.Block:
if isManifest:
trace "Cid is a manifest, skipping", cid
continue
break
of BlockType.Both:
break
return cid.some
iter.next = next
return success iter
func putBlockSync(self: CacheStore, blk: Block): bool =
let blkSize = blk.data.len # in bytes
if blkSize > self.size:
trace "Block size is larger than cache size", blk = blkSize, cache = self.size
return false
while self.currentSize + blkSize > self.size:
try:
let removed = self.cache.removeLru()
self.currentSize -= removed.data.len
except EmptyLruCacheError as exc:
# if the cache is empty, can't remove anything, so break and add item
# to the cache
trace "Exception puting block to cache", exc = exc.msg
break
self.cache[blk.cid] = blk
self.currentSize += blkSize
return true
blockType = BlockType.Manifest): Future[?!BlocksIter] =
self.backingStore.listBlocks(blockType)
method putBlock*(
self: CacheStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
## Put a block to the blockstore
##
ttl = Duration.none): Future[?!void] =
self.backingStore.putBlock(blk, ttl)
trace "Storing block in cache", cid = blk.cid
if blk.isEmpty:
trace "Empty block, ignoring"
return success()
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] =
discard self.cache.del(cid)
self.backingStore.delBlock(cid)
discard self.putBlockSync(blk)
return success()
method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##
trace "Deleting block from cache", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success()
let removed = self.cache.del(cid)
if removed.isSome:
self.currentSize -= removed.get.data.len
return success()
method close*(self: CacheStore): Future[void] {.async.} =
## Close the blockstore, a no-op for this implementation
##
discard
method close*(self: CacheStore): Future[void] =
self.backingStore.close()
func new*(
_: type CacheStore,
blocks: openArray[Block] = [],
backingStore: BlockStore,
cacheSize: Positive = DefaultCacheSize, # in bytes
chunkSize: Positive = DefaultChunkSize # in bytes
): CacheStore {.raises: [Defect, ValueError].} =
@ -198,16 +85,11 @@ func new*(
if cacheSize < chunkSize:
raise newException(ValueError, "cacheSize cannot be less than chunkSize")
var currentSize = 0
let
size = cacheSize div chunkSize
cache = newLruCache[Cid, Block](size)
store = CacheStore(
cache: cache,
currentSize: currentSize,
size: cacheSize)
for blk in blocks:
discard store.putBlockSync(blk)
backingStore: backingStore,
cache: cache)
return store

View File

@ -0,0 +1,180 @@
## Nim-Codex
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/upraises
push: {.upraises: [].}
import std/options
import std/tables
import pkg/chronicles
import pkg/chronos
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import ./blockstore
import ../chunker
import ../errors
import ../manifest
export blockstore
logScope:
topics = "codex memorystore"
type
MemoryStore* = ref object of BlockStore
bytesUsed*: int
capacity*: int
table: Table[Cid, Block]
InvalidBlockSize* = object of CodexError
const
MiB* = 1024 * 1024 # bytes, 1 mebibyte = 1,048,576 bytes
DefaultCacheSizeMiB* = 5
DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes
method getBlock*(self: MemoryStore, cid: Cid): Future[?!Block] {.async.} =
trace "Getting block from cache", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success cid.emptyBlock
if cid notin self.table:
return failure (ref BlockNotFoundError)(msg: "Block not in memory store")
try:
return success self.table[cid]
except CatchableError as exc:
trace "Error getting block from memory store", cid, error = exc.msg
return failure exc
method hasBlock*(self: MemoryStore, cid: Cid): Future[?!bool] {.async.} =
trace "Checking MemoryStore for block presence", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return true.success
return (cid in self.table).success
func cids(self: MemoryStore): (iterator: Cid {.gcsafe.}) =
return iterator(): Cid =
for cid in self.table.keys:
yield cid
method listBlocks*(self: MemoryStore, blockType = BlockType.Manifest): Future[?!BlocksIter] {.async.} =
var
iter = BlocksIter()
let
cids = self.cids()
proc next(): Future[?Cid] {.async.} =
await idleAsync()
var cid: Cid
while true:
if iter.finished:
return Cid.none
cid = cids()
if finished(cids):
iter.finished = true
return Cid.none
without isManifest =? cid.isManifest, err:
trace "Error checking if cid is a manifest", err = err.msg
return Cid.none
case blockType:
of BlockType.Manifest:
if not isManifest:
trace "Cid is not manifest, skipping", cid
continue
break
of BlockType.Block:
if isManifest:
trace "Cid is a manifest, skipping", cid
continue
break
of BlockType.Both:
break
return cid.some
iter.next = next
return success iter
proc getFreeCapacity(self: MemoryStore): int =
self.capacity - self.bytesUsed
func putBlockSync(self: MemoryStore, blk: Block): ?!void =
let
freeCapacity = self.getFreeCapacity()
blkSize = blk.data.len
if blkSize > freeCapacity:
trace "Block size is larger than free capacity", blk = blkSize, freeCapacity
return failure("Unable to store block: Memory store capacity exceeded.")
self.table[blk.cid] = blk
self.bytesUsed += blkSize
return success()
method putBlock*(self: MemoryStore, blk: Block, ttl = Duration.none): Future[?!void] {.async.} =
trace "Storing block in store", cid = blk.cid
if blk.isEmpty:
trace "Empty block, ignoring"
return success()
return self.putBlockSync(blk)
method delBlock*(self: MemoryStore, cid: Cid): Future[?!void] {.async.} =
trace "Deleting block from memory store", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success()
without toRemove =? await self.getBlock(cid), err:
trace "Unable to find block to remove"
return failure(err)
self.table.del(cid)
self.bytesUsed -= toRemove.data.len
return success()
method close*(self: MemoryStore): Future[void] {.async.} =
discard
func new*(
_: type MemoryStore,
blocks: openArray[Block] = [],
capacity: Positive = DefaultCacheSize, # in bytes
chunkSize: Positive = DefaultChunkSize # in bytes
): MemoryStore {.raises: [Defect, ValueError].} =
if capacity < chunkSize:
raise newException(ValueError, "capacity cannot be less than chunkSize")
let store = MemoryStore(
table: initTable[Cid, Block](),
bytesUsed: 0,
capacity: capacity)
# for blk in blocks:
# discard store.putBlockSync(blk)
return store

View File

@ -35,7 +35,7 @@ proc generateNodes*(
.expect("Should return multiaddress")])
wallet = WalletRef.example
network = BlockExcNetwork.new(switch)
localStore = CacheStore.new(blocks.mapIt( it ))
localStore = MemoryStore.new(blocks.mapIt( it ))
peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
blockDiscovery = DiscoveryEngine.new(localStore, peerStore, network, discovery, pendingBlocks)

View File

@ -0,0 +1,252 @@
import std/os
import std/strutils
import std/sequtils
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import pkg/asynctest
import pkg/libp2p
import pkg/stew/byteutils
import pkg/stew/endians2
import pkg/datastore
import pkg/codex/stores/cachestore
import pkg/codex/chunker
import pkg/codex/stores
import pkg/codex/blocktype as bt
import pkg/codex/clock
import ../helpers
import ../helpers/mockclock
import ./commonstoretests
proc createTestBlock(size: int): bt.Block =
bt.Block.new('a'.repeat(size).toBytes).tryGet()
suite "MemoryStore":
var
initialBlock: bt.Block
repo: MemoryStore
let
capacity = 100
chunkSize = 10
setup:
initialBlock = createTestBlock(chunkSize)
repo = MemoryStore.new([initialBlock], capacity, chunkSize)
# teardown:
# discard
# test "Should update current used bytes on block put":
# let blk = createTestBlock(200)
# check repo.quotaUsedBytes == 0
# (await repo.putBlock(blk)).tryGet
# check:
# repo.quotaUsedBytes == 200
# uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 200'u
# test "Should update current used bytes on block delete":
# let blk = createTestBlock(100)
# check repo.quotaUsedBytes == 0
# (await repo.putBlock(blk)).tryGet
# check repo.quotaUsedBytes == 100
# (await repo.delBlock(blk.cid)).tryGet
# check:
# repo.quotaUsedBytes == 0
# uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 0'u
# test "Should not update current used bytes if block exist":
# let blk = createTestBlock(100)
# check repo.quotaUsedBytes == 0
# (await repo.putBlock(blk)).tryGet
# check repo.quotaUsedBytes == 100
# # put again
# (await repo.putBlock(blk)).tryGet
# check repo.quotaUsedBytes == 100
# check:
# uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u
# test "Should fail storing passed the quota":
# let blk = createTestBlock(300)
# check repo.totalUsed == 0
# expect QuotaUsedError:
# (await repo.putBlock(blk)).tryGet
# test "Should reserve bytes":
# let blk = createTestBlock(100)
# check repo.totalUsed == 0
# (await repo.putBlock(blk)).tryGet
# check repo.totalUsed == 100
# (await repo.reserve(100)).tryGet
# check:
# repo.totalUsed == 200
# repo.quotaUsedBytes == 100
# repo.quotaReservedBytes == 100
# uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
# test "Should not reserve bytes over max quota":
# let blk = createTestBlock(100)
# check repo.totalUsed == 0
# (await repo.putBlock(blk)).tryGet
# check repo.totalUsed == 100
# expect QuotaNotEnoughError:
# (await repo.reserve(101)).tryGet
# check:
# repo.totalUsed == 100
# repo.quotaUsedBytes == 100
# repo.quotaReservedBytes == 0
# expect DatastoreKeyNotFound:
# discard (await metaDs.get(QuotaReservedKey)).tryGet
# test "Should release bytes":
# discard createTestBlock(100)
# check repo.totalUsed == 0
# (await repo.reserve(100)).tryGet
# check repo.totalUsed == 100
# (await repo.release(100)).tryGet
# check:
# repo.totalUsed == 0
# repo.quotaUsedBytes == 0
# repo.quotaReservedBytes == 0
# uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 0'u
# test "Should not release bytes less than quota":
# check repo.totalUsed == 0
# (await repo.reserve(100)).tryGet
# check repo.totalUsed == 100
# expect CatchableError:
# (await repo.release(101)).tryGet
# check:
# repo.totalUsed == 100
# repo.quotaUsedBytes == 0
# repo.quotaReservedBytes == 100
# uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
# proc queryMetaDs(key: Key): Future[seq[QueryResponse]] {.async.} =
# let
# query = Query.init(key)
# responseIter = (await metaDs.query(query)).tryGet
# response = (await allFinished(toSeq(responseIter)))
# .mapIt(it.read.tryGet)
# .filterIt(it.key.isSome)
# return response
# test "Should store block expiration timestamp":
# let
# duration = 10.seconds
# blk = createTestBlock(100)
# let
# expectedExpiration: SecondsSince1970 = 123 + 10
# expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
# (await repo.putBlock(blk, duration.some)).tryGet
# let response = await queryMetaDs(expectedKey)
# check:
# response.len == 1
# !response[0].key == expectedKey
# response[0].data == expectedExpiration.toBytes
# test "Should store block with default expiration timestamp when not provided":
# let
# blk = createTestBlock(100)
# let
# expectedExpiration: SecondsSince1970 = 123 + DefaultBlockTtl.seconds
# expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
# (await repo.putBlock(blk)).tryGet
# let response = await queryMetaDs(expectedKey)
# check:
# response.len == 1
# !response[0].key == expectedKey
# response[0].data == expectedExpiration.toBytes
# test "delBlock should remove expiration metadata":
# let
# blk = createTestBlock(100)
# expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
# (await repo.putBlock(blk, 10.seconds.some)).tryGet
# (await repo.delBlock(blk.cid)).tryGet
# let response = await queryMetaDs(expectedKey)
# check:
# response.len == 0
# test "Should retrieve block expiration information":
# proc unpack(beIter: Future[?!BlockExpirationIter]): Future[seq[BlockExpiration]] {.async.} =
# var expirations = newSeq[BlockExpiration](0)
# without iter =? (await beIter), err:
# return expirations
# for be in toSeq(iter):
# if value =? (await be):
# expirations.add(value)
# return expirations
# let
# duration = 10.seconds
# blk1 = createTestBlock(10)
# blk2 = createTestBlock(11)
# blk3 = createTestBlock(12)
# let
# expectedExpiration: SecondsSince1970 = 123 + 10
# proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) =
# check:
# be.cid == expectedBlock.cid
# be.expiration == expectedExpiration
# (await repo.putBlock(blk1, duration.some)).tryGet
# (await repo.putBlock(blk2, duration.some)).tryGet
# (await repo.putBlock(blk3, duration.some)).tryGet
# let
# blockExpirations1 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=0))
# blockExpirations2 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=2))
# check blockExpirations1.len == 2
# assertExpiration(blockExpirations1[0], blk2)
# assertExpiration(blockExpirations1[1], blk1)
# check blockExpirations2.len == 1
# assertExpiration(blockExpirations2[0], blk3)
commonBlockStoreTests(
"MemoryStore", proc: BlockStore =
MemoryStore.new([])
)

View File

@ -1,5 +1,6 @@
import ./stores/testcachestore
import ./stores/testrepostore
import ./stores/testmaintenance
import ./stores/testmemorystore
{.warning[UnusedImport]: off.}