Implements tests for memorystore and cachestore
This commit is contained in:
parent
8740b65a79
commit
3af2943a61
|
@ -78,8 +78,8 @@ method close*(self: CacheStore): Future[void] =
|
||||||
func new*(
|
func new*(
|
||||||
_: type CacheStore,
|
_: type CacheStore,
|
||||||
backingStore: BlockStore,
|
backingStore: BlockStore,
|
||||||
cacheSize: Positive = DefaultCacheSize, # in bytes
|
cacheSize: Positive = DefaultCacheSize,
|
||||||
chunkSize: Positive = DefaultChunkSize # in bytes
|
chunkSize: Positive = DefaultChunkSize
|
||||||
): CacheStore {.raises: [Defect, ValueError].} =
|
): CacheStore {.raises: [Defect, ValueError].} =
|
||||||
|
|
||||||
if cacheSize < chunkSize:
|
if cacheSize < chunkSize:
|
||||||
|
|
|
@ -36,12 +36,10 @@ type
|
||||||
capacity*: int
|
capacity*: int
|
||||||
table: Table[Cid, Block]
|
table: Table[Cid, Block]
|
||||||
|
|
||||||
InvalidBlockSize* = object of CodexError
|
|
||||||
|
|
||||||
const
|
const
|
||||||
MiB* = 1024 * 1024
|
MiB* = 1024 * 1024
|
||||||
DefaultCacheSizeMiB* = 5
|
DefaultMemoryStoreCapacityMiB* = 5
|
||||||
DefaultCacheSize* = DefaultCacheSizeMiB * MiB
|
DefaultMemoryStoreCapacity* = DefaultMemoryStoreCapacityMiB * MiB
|
||||||
|
|
||||||
method getBlock*(self: MemoryStore, cid: Cid): Future[?!Block] {.async.} =
|
method getBlock*(self: MemoryStore, cid: Cid): Future[?!Block] {.async.} =
|
||||||
trace "Getting block from cache", cid
|
trace "Getting block from cache", cid
|
||||||
|
@ -128,7 +126,7 @@ func putBlockSync(self: MemoryStore, blk: Block): ?!void =
|
||||||
|
|
||||||
if blkSize > freeCapacity:
|
if blkSize > freeCapacity:
|
||||||
trace "Block size is larger than free capacity", blk = blkSize, freeCapacity
|
trace "Block size is larger than free capacity", blk = blkSize, freeCapacity
|
||||||
return failure("Unable to store block: Memory store capacity exceeded.")
|
return failure("Unable to store block: Insufficient free capacity.")
|
||||||
|
|
||||||
self.table[blk.cid] = blk
|
self.table[blk.cid] = blk
|
||||||
self.bytesUsed += blkSize
|
self.bytesUsed += blkSize
|
||||||
|
@ -163,11 +161,8 @@ method close*(self: MemoryStore): Future[void] {.async.} =
|
||||||
func new*(
|
func new*(
|
||||||
_: type MemoryStore,
|
_: type MemoryStore,
|
||||||
blocks: openArray[Block] = [],
|
blocks: openArray[Block] = [],
|
||||||
capacity: Positive = DefaultCacheSize,
|
capacity: Positive = DefaultMemoryStoreCapacity,
|
||||||
chunkSize: Positive = DefaultChunkSize
|
|
||||||
): MemoryStore {.raises: [Defect, ValueError].} =
|
): MemoryStore {.raises: [Defect, ValueError].} =
|
||||||
if capacity < chunkSize:
|
|
||||||
raise newException(ValueError, "capacity cannot be less than chunkSize")
|
|
||||||
|
|
||||||
let store = MemoryStore(
|
let store = MemoryStore(
|
||||||
table: initTable[Cid, Block](),
|
table: initTable[Cid, Block](),
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
import std/strutils
|
||||||
|
|
||||||
import pkg/chronos
|
import pkg/chronos
|
||||||
import pkg/libp2p
|
import pkg/libp2p
|
||||||
import pkg/libp2p/varint
|
import pkg/libp2p/varint
|
||||||
|
@ -5,6 +7,7 @@ import pkg/codex/blocktype as bt
|
||||||
import pkg/codex/stores
|
import pkg/codex/stores
|
||||||
import pkg/codex/manifest
|
import pkg/codex/manifest
|
||||||
import pkg/codex/rng
|
import pkg/codex/rng
|
||||||
|
import pkg/stew/byteutils
|
||||||
|
|
||||||
import ./helpers/nodeutils
|
import ./helpers/nodeutils
|
||||||
import ./helpers/randomchunker
|
import ./helpers/randomchunker
|
||||||
|
@ -56,3 +59,6 @@ proc corruptBlocks*(
|
||||||
bytePos.add(ii)
|
bytePos.add(ii)
|
||||||
blk.data[ii] = byte 0
|
blk.data[ii] = byte 0
|
||||||
return pos
|
return pos
|
||||||
|
|
||||||
|
proc createTestBlock*(size: int): bt.Block =
|
||||||
|
bt.Block.new('a'.repeat(size).toBytes).tryGet()
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
import std/strutils
|
||||||
|
|
||||||
|
import pkg/chronos
|
||||||
|
import pkg/libp2p
|
||||||
|
import pkg/questionable/results
|
||||||
|
import pkg/codex/stores/blockstore
|
||||||
|
|
||||||
|
type
|
||||||
|
MockBlockStore* = ref object of BlockStore
|
||||||
|
numberOfGetCalls*: int
|
||||||
|
getBlock*: Block
|
||||||
|
|
||||||
|
method getBlock*(self: MockBlockStore, cid: Cid): Future[?!Block] {.async.} =
|
||||||
|
inc self.numberOfGetCalls
|
||||||
|
return success(self.getBlock)
|
|
@ -6,69 +6,42 @@ import pkg/libp2p
|
||||||
import pkg/stew/byteutils
|
import pkg/stew/byteutils
|
||||||
import pkg/questionable/results
|
import pkg/questionable/results
|
||||||
import pkg/codex/stores/cachestore
|
import pkg/codex/stores/cachestore
|
||||||
|
import pkg/codex/stores/memorystore
|
||||||
import pkg/codex/chunker
|
import pkg/codex/chunker
|
||||||
|
|
||||||
import ./commonstoretests
|
import ./commonstoretests
|
||||||
|
|
||||||
import ../helpers
|
import ../helpers
|
||||||
|
import ../helpers/mockblockstore
|
||||||
|
|
||||||
suite "Cache Store":
|
suite "Cache Store":
|
||||||
var
|
var
|
||||||
newBlock, newBlock1, newBlock2, newBlock3: Block
|
newBlock: Block
|
||||||
|
backingStore: MockBlockStore
|
||||||
store: CacheStore
|
store: CacheStore
|
||||||
|
|
||||||
setup:
|
setup:
|
||||||
newBlock = Block.new("New Kids on the Block".toBytes()).tryGet()
|
newBlock = Block.new("New Kids on the Block".toBytes()).tryGet()
|
||||||
newBlock1 = Block.new("1".repeat(100).toBytes()).tryGet()
|
backingStore = MockBlockStore.new()
|
||||||
newBlock2 = Block.new("2".repeat(100).toBytes()).tryGet()
|
backingStore.getBlock = newBlock
|
||||||
newBlock3 = Block.new("3".repeat(100).toBytes()).tryGet()
|
store = CacheStore.new(backingStore)
|
||||||
store = CacheStore.new()
|
|
||||||
|
|
||||||
test "constructor":
|
test "constructor":
|
||||||
# cache size cannot be smaller than chunk size
|
|
||||||
expect ValueError:
|
expect ValueError:
|
||||||
discard CacheStore.new(cacheSize = 1, chunkSize = 2)
|
discard CacheStore.new(backingStore, cacheSize = 1, chunkSize = 2)
|
||||||
|
|
||||||
store = CacheStore.new(cacheSize = 100, chunkSize = 1)
|
|
||||||
check store.currentSize == 0
|
|
||||||
|
|
||||||
store = CacheStore.new(@[newBlock1, newBlock2, newBlock3])
|
|
||||||
check store.currentSize == 300
|
|
||||||
|
|
||||||
# initial cache blocks total more than cache size, currentSize should
|
|
||||||
# never exceed max cache size
|
|
||||||
store = CacheStore.new(
|
|
||||||
blocks = @[newBlock1, newBlock2, newBlock3],
|
|
||||||
cacheSize = 200,
|
|
||||||
chunkSize = 1)
|
|
||||||
check store.currentSize == 200
|
|
||||||
|
|
||||||
# cache size cannot be less than chunks size
|
|
||||||
expect ValueError:
|
expect ValueError:
|
||||||
discard CacheStore.new(
|
discard CacheStore.new(backingStore, cacheSize = 99, chunkSize = 100)
|
||||||
cacheSize = 99,
|
|
||||||
chunkSize = 100)
|
|
||||||
|
|
||||||
test "putBlock":
|
test "getBlock can return cached block":
|
||||||
(await store.putBlock(newBlock1)).tryGet()
|
let
|
||||||
check (await store.hasBlock(newBlock1.cid)).tryGet()
|
received1 = (await store.getBlock(newBlock.cid)).tryGet()
|
||||||
|
received2 = (await store.getBlock(newBlock.cid)).tryGet()
|
||||||
|
|
||||||
# block size bigger than entire cache
|
|
||||||
store = CacheStore.new(cacheSize = 99, chunkSize = 98)
|
|
||||||
(await store.putBlock(newBlock1)).tryGet()
|
|
||||||
check not (await store.hasBlock(newBlock1.cid)).tryGet()
|
|
||||||
|
|
||||||
# block being added causes removal of LRU block
|
|
||||||
store = CacheStore.new(
|
|
||||||
@[newBlock1, newBlock2, newBlock3],
|
|
||||||
cacheSize = 200,
|
|
||||||
chunkSize = 1)
|
|
||||||
check:
|
check:
|
||||||
not (await store.hasBlock(newBlock1.cid)).tryGet()
|
newBlock == received1
|
||||||
(await store.hasBlock(newBlock2.cid)).tryGet()
|
newBlock == received2
|
||||||
(await store.hasBlock(newBlock2.cid)).tryGet()
|
backingStore.numberOfGetCalls == 1
|
||||||
store.currentSize == newBlock2.data.len + newBlock3.data.len # 200
|
|
||||||
|
|
||||||
commonBlockStoreTests(
|
commonBlockStoreTests(
|
||||||
"Cache", proc: BlockStore =
|
"Cache", proc: BlockStore =
|
||||||
BlockStore(CacheStore.new(cacheSize = 500, chunkSize = 1)))
|
BlockStore(CacheStore.new(MemoryStore.new())))
|
||||||
|
|
|
@ -22,231 +22,20 @@ import ../helpers
|
||||||
import ../helpers/mockclock
|
import ../helpers/mockclock
|
||||||
import ./commonstoretests
|
import ./commonstoretests
|
||||||
|
|
||||||
proc createTestBlock(size: int): bt.Block =
|
|
||||||
bt.Block.new('a'.repeat(size).toBytes).tryGet()
|
|
||||||
|
|
||||||
suite "MemoryStore":
|
suite "MemoryStore":
|
||||||
var
|
test "Should store initial blocks":
|
||||||
initialBlock: bt.Block
|
let
|
||||||
|
capacity = 100
|
||||||
|
chunkSize = 10
|
||||||
|
blk = createTestBlock(10)
|
||||||
|
|
||||||
repo: MemoryStore
|
let store = MemoryStore.new([blk], capacity, chunkSize)
|
||||||
|
|
||||||
let
|
let receivedBlk = await store.getBlock(blk.cid)
|
||||||
capacity = 100
|
|
||||||
chunkSize = 10
|
|
||||||
|
|
||||||
setup:
|
check receivedBlk.tryGet() == blk
|
||||||
initialBlock = createTestBlock(chunkSize)
|
|
||||||
|
|
||||||
repo = MemoryStore.new([initialBlock], capacity, chunkSize)
|
|
||||||
|
|
||||||
# teardown:
|
|
||||||
# discard
|
|
||||||
|
|
||||||
# test "Should update current used bytes on block put":
|
|
||||||
# let blk = createTestBlock(200)
|
|
||||||
|
|
||||||
# check repo.quotaUsedBytes == 0
|
|
||||||
# (await repo.putBlock(blk)).tryGet
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# repo.quotaUsedBytes == 200
|
|
||||||
# uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 200'u
|
|
||||||
|
|
||||||
# test "Should update current used bytes on block delete":
|
|
||||||
# let blk = createTestBlock(100)
|
|
||||||
|
|
||||||
# check repo.quotaUsedBytes == 0
|
|
||||||
# (await repo.putBlock(blk)).tryGet
|
|
||||||
# check repo.quotaUsedBytes == 100
|
|
||||||
|
|
||||||
# (await repo.delBlock(blk.cid)).tryGet
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# repo.quotaUsedBytes == 0
|
|
||||||
# uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 0'u
|
|
||||||
|
|
||||||
# test "Should not update current used bytes if block exist":
|
|
||||||
# let blk = createTestBlock(100)
|
|
||||||
|
|
||||||
# check repo.quotaUsedBytes == 0
|
|
||||||
# (await repo.putBlock(blk)).tryGet
|
|
||||||
# check repo.quotaUsedBytes == 100
|
|
||||||
|
|
||||||
# # put again
|
|
||||||
# (await repo.putBlock(blk)).tryGet
|
|
||||||
# check repo.quotaUsedBytes == 100
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# uint64.fromBytesBE((await metaDs.get(QuotaUsedKey)).tryGet) == 100'u
|
|
||||||
|
|
||||||
# test "Should fail storing passed the quota":
|
|
||||||
# let blk = createTestBlock(300)
|
|
||||||
|
|
||||||
# check repo.totalUsed == 0
|
|
||||||
# expect QuotaUsedError:
|
|
||||||
# (await repo.putBlock(blk)).tryGet
|
|
||||||
|
|
||||||
# test "Should reserve bytes":
|
|
||||||
# let blk = createTestBlock(100)
|
|
||||||
|
|
||||||
# check repo.totalUsed == 0
|
|
||||||
# (await repo.putBlock(blk)).tryGet
|
|
||||||
# check repo.totalUsed == 100
|
|
||||||
|
|
||||||
# (await repo.reserve(100)).tryGet
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# repo.totalUsed == 200
|
|
||||||
# repo.quotaUsedBytes == 100
|
|
||||||
# repo.quotaReservedBytes == 100
|
|
||||||
# uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
|
|
||||||
|
|
||||||
# test "Should not reserve bytes over max quota":
|
|
||||||
# let blk = createTestBlock(100)
|
|
||||||
|
|
||||||
# check repo.totalUsed == 0
|
|
||||||
# (await repo.putBlock(blk)).tryGet
|
|
||||||
# check repo.totalUsed == 100
|
|
||||||
|
|
||||||
# expect QuotaNotEnoughError:
|
|
||||||
# (await repo.reserve(101)).tryGet
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# repo.totalUsed == 100
|
|
||||||
# repo.quotaUsedBytes == 100
|
|
||||||
# repo.quotaReservedBytes == 0
|
|
||||||
|
|
||||||
# expect DatastoreKeyNotFound:
|
|
||||||
# discard (await metaDs.get(QuotaReservedKey)).tryGet
|
|
||||||
|
|
||||||
# test "Should release bytes":
|
|
||||||
# discard createTestBlock(100)
|
|
||||||
|
|
||||||
# check repo.totalUsed == 0
|
|
||||||
# (await repo.reserve(100)).tryGet
|
|
||||||
# check repo.totalUsed == 100
|
|
||||||
|
|
||||||
# (await repo.release(100)).tryGet
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# repo.totalUsed == 0
|
|
||||||
# repo.quotaUsedBytes == 0
|
|
||||||
# repo.quotaReservedBytes == 0
|
|
||||||
# uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 0'u
|
|
||||||
|
|
||||||
# test "Should not release bytes less than quota":
|
|
||||||
# check repo.totalUsed == 0
|
|
||||||
# (await repo.reserve(100)).tryGet
|
|
||||||
# check repo.totalUsed == 100
|
|
||||||
|
|
||||||
# expect CatchableError:
|
|
||||||
# (await repo.release(101)).tryGet
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# repo.totalUsed == 100
|
|
||||||
# repo.quotaUsedBytes == 0
|
|
||||||
# repo.quotaReservedBytes == 100
|
|
||||||
# uint64.fromBytesBE((await metaDs.get(QuotaReservedKey)).tryGet) == 100'u
|
|
||||||
|
|
||||||
# proc queryMetaDs(key: Key): Future[seq[QueryResponse]] {.async.} =
|
|
||||||
# let
|
|
||||||
# query = Query.init(key)
|
|
||||||
# responseIter = (await metaDs.query(query)).tryGet
|
|
||||||
# response = (await allFinished(toSeq(responseIter)))
|
|
||||||
# .mapIt(it.read.tryGet)
|
|
||||||
# .filterIt(it.key.isSome)
|
|
||||||
# return response
|
|
||||||
|
|
||||||
# test "Should store block expiration timestamp":
|
|
||||||
# let
|
|
||||||
# duration = 10.seconds
|
|
||||||
# blk = createTestBlock(100)
|
|
||||||
|
|
||||||
# let
|
|
||||||
# expectedExpiration: SecondsSince1970 = 123 + 10
|
|
||||||
# expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
|
|
||||||
|
|
||||||
# (await repo.putBlock(blk, duration.some)).tryGet
|
|
||||||
|
|
||||||
# let response = await queryMetaDs(expectedKey)
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# response.len == 1
|
|
||||||
# !response[0].key == expectedKey
|
|
||||||
# response[0].data == expectedExpiration.toBytes
|
|
||||||
|
|
||||||
# test "Should store block with default expiration timestamp when not provided":
|
|
||||||
# let
|
|
||||||
# blk = createTestBlock(100)
|
|
||||||
|
|
||||||
# let
|
|
||||||
# expectedExpiration: SecondsSince1970 = 123 + DefaultBlockTtl.seconds
|
|
||||||
# expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
|
|
||||||
|
|
||||||
# (await repo.putBlock(blk)).tryGet
|
|
||||||
|
|
||||||
# let response = await queryMetaDs(expectedKey)
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# response.len == 1
|
|
||||||
# !response[0].key == expectedKey
|
|
||||||
# response[0].data == expectedExpiration.toBytes
|
|
||||||
|
|
||||||
# test "delBlock should remove expiration metadata":
|
|
||||||
# let
|
|
||||||
# blk = createTestBlock(100)
|
|
||||||
# expectedKey = Key.init("meta/ttl/" & $blk.cid).tryGet
|
|
||||||
|
|
||||||
# (await repo.putBlock(blk, 10.seconds.some)).tryGet
|
|
||||||
# (await repo.delBlock(blk.cid)).tryGet
|
|
||||||
|
|
||||||
# let response = await queryMetaDs(expectedKey)
|
|
||||||
|
|
||||||
# check:
|
|
||||||
# response.len == 0
|
|
||||||
|
|
||||||
# test "Should retrieve block expiration information":
|
|
||||||
# proc unpack(beIter: Future[?!BlockExpirationIter]): Future[seq[BlockExpiration]] {.async.} =
|
|
||||||
# var expirations = newSeq[BlockExpiration](0)
|
|
||||||
# without iter =? (await beIter), err:
|
|
||||||
# return expirations
|
|
||||||
# for be in toSeq(iter):
|
|
||||||
# if value =? (await be):
|
|
||||||
# expirations.add(value)
|
|
||||||
# return expirations
|
|
||||||
|
|
||||||
# let
|
|
||||||
# duration = 10.seconds
|
|
||||||
# blk1 = createTestBlock(10)
|
|
||||||
# blk2 = createTestBlock(11)
|
|
||||||
# blk3 = createTestBlock(12)
|
|
||||||
|
|
||||||
# let
|
|
||||||
# expectedExpiration: SecondsSince1970 = 123 + 10
|
|
||||||
|
|
||||||
# proc assertExpiration(be: BlockExpiration, expectedBlock: bt.Block) =
|
|
||||||
# check:
|
|
||||||
# be.cid == expectedBlock.cid
|
|
||||||
# be.expiration == expectedExpiration
|
|
||||||
|
|
||||||
|
|
||||||
# (await repo.putBlock(blk1, duration.some)).tryGet
|
|
||||||
# (await repo.putBlock(blk2, duration.some)).tryGet
|
|
||||||
# (await repo.putBlock(blk3, duration.some)).tryGet
|
|
||||||
|
|
||||||
# let
|
|
||||||
# blockExpirations1 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=0))
|
|
||||||
# blockExpirations2 = await unpack(repo.getBlockExpirations(maxNumber=2, offset=2))
|
|
||||||
|
|
||||||
# check blockExpirations1.len == 2
|
|
||||||
# assertExpiration(blockExpirations1[0], blk2)
|
|
||||||
# assertExpiration(blockExpirations1[1], blk1)
|
|
||||||
|
|
||||||
# check blockExpirations2.len == 1
|
|
||||||
# assertExpiration(blockExpirations2[0], blk3)
|
|
||||||
|
|
||||||
commonBlockStoreTests(
|
commonBlockStoreTests(
|
||||||
"MemoryStore", proc: BlockStore =
|
"MemoryStore", proc: BlockStore =
|
||||||
MemoryStore.new([])
|
BlockStore(MemoryStore.new([]))
|
||||||
)
|
)
|
||||||
|
|
|
@ -78,9 +78,6 @@ suite "RepoStore":
|
||||||
(await repoDs.close()).tryGet
|
(await repoDs.close()).tryGet
|
||||||
(await metaDs.close()).tryGet
|
(await metaDs.close()).tryGet
|
||||||
|
|
||||||
proc createTestBlock(size: int): bt.Block =
|
|
||||||
bt.Block.new('a'.repeat(size).toBytes).tryGet()
|
|
||||||
|
|
||||||
test "Should update current used bytes on block put":
|
test "Should update current used bytes on block put":
|
||||||
let blk = createTestBlock(200)
|
let blk = createTestBlock(200)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue