mirror of
https://github.com/codex-storage/nim-codex.git
synced 2025-01-12 05:54:22 +00:00
feat: introduce LRU cache (#50)
* feat: introduce LRU cache Replace `MemoryStore` with LRU caching mechanism. `lrucache` library was forked to https://github.com/status-im/lrucache.nim. Co-authored-by: Eric Mastro <eric.mastro@gmail.com> # Conflicts: # dagger/dagger.nim # dagger/stores.nim # dagger/stores/manager.nim # tests/dagger/blockexc/testengine.nim # tests/dagger/helpers/nodeutils.nim # tests/dagger/testnode.nim # tests/dagger/teststores.nim * feat: introduce --cache-size CLI option Allow for a value of `0` to disable the cache. # Conflicts: # dagger/dagger.nim * allow dynamic block size in cache allow block size to be variable/dynamic update lrucache to use updated lrucache dep Using removeLru proc, added tests * Refactor CacheStore init block Co-authored-by: Michael Bradley, Jr <michaelsbradleyjr@gmail.com>
This commit is contained in:
parent
26ead9726d
commit
70cbdff901
3
.gitmodules
vendored
3
.gitmodules
vendored
@ -159,3 +159,6 @@
|
||||
[submodule "vendor/nim-ethers"]
|
||||
path = vendor/nim-ethers
|
||||
url = https://github.com/status-im/nim-ethers
|
||||
[submodule "vendor/lrucache.nim"]
|
||||
path = vendor/lrucache.nim
|
||||
url = https://github.com/status-im/lrucache.nim
|
||||
|
@ -53,3 +53,6 @@ task testContracts, "Build, deploy and test contracts":
|
||||
|
||||
task testAll, "Build & run Dagger tests":
|
||||
test "testAll", params = "-d:chronicles_log_level=WARN"
|
||||
|
||||
task dagger, "build dagger binary":
|
||||
buildBinary "dagger"
|
||||
|
@ -22,7 +22,7 @@ import ./blocktype
|
||||
export blocktype
|
||||
|
||||
const
|
||||
DefaultChunkSize*: int64 = 1024 * 256
|
||||
DefaultChunkSize*: Positive = 1024 * 256
|
||||
|
||||
type
|
||||
# default reader type
|
||||
|
@ -16,6 +16,10 @@ import pkg/chronicles
|
||||
import pkg/confutils/defs
|
||||
import pkg/libp2p
|
||||
|
||||
import ./stores/cachestore
|
||||
|
||||
export DefaultCacheSizeMiB
|
||||
|
||||
const
|
||||
DefaultTcpListenMultiAddr = "/ip4/0.0.0.0/tcp/0"
|
||||
|
||||
@ -70,6 +74,13 @@ type
|
||||
name: "api-port"
|
||||
abbr: "p" }: int
|
||||
|
||||
cacheSize* {.
|
||||
desc: "The size in MiB of the block cache, 0 disables the cache"
|
||||
defaultValue: DefaultCacheSizeMiB
|
||||
defaultValueDesc: $DefaultCacheSizeMiB
|
||||
name: "cache-size"
|
||||
abbr: "c" }: Natural
|
||||
|
||||
of initNode:
|
||||
discard
|
||||
|
||||
|
@ -23,8 +23,7 @@ import ./node
|
||||
import ./conf
|
||||
import ./rng
|
||||
import ./rest/api
|
||||
import ./stores/fsstore
|
||||
import ./stores/networkstore
|
||||
import ./stores
|
||||
import ./blockexchange
|
||||
import ./utils/fileutils
|
||||
|
||||
@ -62,10 +61,16 @@ proc new*(T: type DaggerServer, config: DaggerConf): T =
|
||||
.withTcpTransport({ServerFlags.ReuseAddr})
|
||||
.build()
|
||||
|
||||
let cache =
|
||||
if config.cacheSize > 0:
|
||||
CacheStore.new(cacheSize = config.cacheSize * MiB)
|
||||
else:
|
||||
CacheStore.new()
|
||||
|
||||
let
|
||||
wallet = WalletRef.new(EthPrivateKey.random())
|
||||
network = BlockExcNetwork.new(switch)
|
||||
localStore = FSStore.new(config.dataDir / "repo")
|
||||
localStore = FSStore.new(config.dataDir / "repo", cache = cache)
|
||||
engine = BlockExcEngine.new(localStore, wallet, network)
|
||||
store = NetworkStore.new(engine, localStore)
|
||||
daggerNode = DaggerNodeRef.new(switch, store, engine)
|
||||
|
@ -1,7 +1,7 @@
|
||||
import ./stores/[
|
||||
memorystore,
|
||||
cachestore,
|
||||
blockstore,
|
||||
networkstore,
|
||||
fsstore]
|
||||
|
||||
export memorystore, blockstore, networkstore, fsstore
|
||||
export cachestore, blockstore, networkstore, fsstore
|
||||
|
124
dagger/stores/cachestore.nim
Normal file
124
dagger/stores/cachestore.nim
Normal file
@ -0,0 +1,124 @@
|
||||
## Nim-Dagger
|
||||
## Copyright (c) 2021 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import std/options
|
||||
import std/sequtils
|
||||
|
||||
import pkg/chronicles
|
||||
import pkg/chronos
|
||||
import pkg/libp2p
|
||||
import pkg/lrucache
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import ./blockstore
|
||||
import ../blocktype
|
||||
import ../chunker
|
||||
import ../errors
|
||||
|
||||
export blockstore
|
||||
|
||||
logScope:
|
||||
topics = "dagger cachestore"
|
||||
|
||||
type
|
||||
CacheStore* = ref object of BlockStore
|
||||
currentSize*: Natural # in bytes
|
||||
size*: Positive # in bytes
|
||||
cache: LruCache[Cid, Block]
|
||||
|
||||
InvalidBlockSize* = object of DaggerError
|
||||
|
||||
const
|
||||
MiB* = 1024 * 1024 # bytes, 1 mebibyte = 1,048,576 bytes
|
||||
DefaultCacheSizeMiB* = 100
|
||||
DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes
|
||||
|
||||
method getBlock*(
|
||||
self: CacheStore,
|
||||
cid: Cid): Future[?!Block] {.async.} =
|
||||
## Get a block from the stores
|
||||
##
|
||||
|
||||
return self.cache[cid].catch()
|
||||
|
||||
method hasBlock*(self: CacheStore, cid: Cid): bool =
|
||||
## check if the block exists
|
||||
##
|
||||
|
||||
self.cache.contains(cid)
|
||||
|
||||
func putBlockSync(self: CacheStore, blk: Block): bool =
|
||||
|
||||
let blkSize = blk.data.len # in bytes
|
||||
|
||||
if blkSize > self.size:
|
||||
return false
|
||||
|
||||
while self.currentSize + blkSize > self.size:
|
||||
try:
|
||||
let removed = self.cache.removeLru()
|
||||
self.currentSize -= removed.data.len
|
||||
except EmptyLruCacheError:
|
||||
# if the cache is empty, can't remove anything, so break and add item
|
||||
# to the cache
|
||||
break
|
||||
|
||||
self.cache[blk.cid] = blk
|
||||
self.currentSize += blkSize
|
||||
return true
|
||||
|
||||
method putBlock*(
|
||||
self: CacheStore,
|
||||
blk: Block): Future[bool] {.async.} =
|
||||
## Put a block to the blockstore
|
||||
##
|
||||
return self.putBlockSync(blk)
|
||||
|
||||
method delBlock*(
|
||||
self: CacheStore,
|
||||
cid: Cid): Future[bool] {.async.} =
|
||||
## delete a block/s from the block store
|
||||
##
|
||||
|
||||
try:
|
||||
let removed = self.cache.del(cid)
|
||||
if removed.isSome:
|
||||
self.currentSize -= removed.get.data.len
|
||||
return true
|
||||
return false
|
||||
except EmptyLruCacheError:
|
||||
return false
|
||||
|
||||
func new*(
|
||||
_: type CacheStore,
|
||||
blocks: openArray[Block] = [],
|
||||
cacheSize: Positive = DefaultCacheSize, # in bytes
|
||||
chunkSize: Positive = DefaultChunkSize # in bytes
|
||||
): CacheStore {.raises: [Defect, ValueError].} =
|
||||
|
||||
if cacheSize < chunkSize:
|
||||
raise newException(ValueError, "cacheSize cannot be less than chunkSize")
|
||||
|
||||
var currentSize = 0
|
||||
let
|
||||
size = cacheSize div chunkSize
|
||||
cache = newLruCache[Cid, Block](size)
|
||||
store = CacheStore(
|
||||
cache: cache,
|
||||
currentSize: currentSize,
|
||||
size: cacheSize
|
||||
)
|
||||
|
||||
for blk in blocks:
|
||||
discard store.putBlockSync(blk)
|
||||
|
||||
return store
|
@ -18,7 +18,7 @@ import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/stew/io2
|
||||
|
||||
import ./memorystore
|
||||
import ./cachestore
|
||||
import ./blockstore
|
||||
import ../blocktype
|
||||
|
||||
@ -106,7 +106,7 @@ proc new*(
|
||||
T: type FSStore,
|
||||
repoDir: string,
|
||||
postfixLen = 2,
|
||||
cache: BlockStore = MemoryStore.new()): T =
|
||||
cache: BlockStore = CacheStore.new()): T =
|
||||
T(
|
||||
postfixLen: postfixLen,
|
||||
repoDir: repoDir,
|
||||
|
@ -1,79 +0,0 @@
|
||||
## Nim-Dagger
|
||||
## Copyright (c) 2021 Status Research & Development GmbH
|
||||
## Licensed under either of
|
||||
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
||||
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
||||
## at your option.
|
||||
## This file may not be copied, modified, or distributed except according to
|
||||
## those terms.
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import std/sequtils
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/libp2p
|
||||
import pkg/chronicles
|
||||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
|
||||
import ./blockstore
|
||||
import ../blocktype
|
||||
|
||||
export blockstore
|
||||
|
||||
logScope:
|
||||
topics = "dagger memstore"
|
||||
|
||||
type
|
||||
MemoryStore* = ref object of BlockStore
|
||||
blocks: seq[Block] # TODO: Should be an LRU cache
|
||||
|
||||
method getBlock*(
|
||||
b: MemoryStore,
|
||||
cid: Cid): Future[?!Block] {.async.} =
|
||||
## Get a block from the stores
|
||||
##
|
||||
|
||||
trace "Getting block", cid
|
||||
let found = b.blocks.filterIt(
|
||||
it.cid == cid
|
||||
)
|
||||
|
||||
if found.len <= 0:
|
||||
return failure("Couldn't get block")
|
||||
|
||||
trace "Retrieved block", cid
|
||||
|
||||
return found[0].success
|
||||
|
||||
method hasBlock*(s: MemoryStore, cid: Cid): bool =
|
||||
## check if the block exists
|
||||
##
|
||||
|
||||
s.blocks.anyIt( it.cid == cid )
|
||||
|
||||
method putBlock*(
|
||||
s: MemoryStore,
|
||||
blk: Block): Future[bool] {.async.} =
|
||||
## Put a block to the blockstore
|
||||
##
|
||||
|
||||
trace "Putting block", cid = blk.cid
|
||||
s.blocks.add(blk)
|
||||
|
||||
return blk.cid in s
|
||||
|
||||
method delBlock*(
|
||||
s: MemoryStore,
|
||||
cid: Cid): Future[bool] {.async.} =
|
||||
## delete a block/s from the block store
|
||||
##
|
||||
|
||||
s.blocks.keepItIf( it.cid != cid )
|
||||
return cid notin s
|
||||
|
||||
func new*(_: type MemoryStore, blocks: openArray[Block] = []): MemoryStore =
|
||||
MemoryStore(
|
||||
blocks: @blocks
|
||||
)
|
@ -62,13 +62,13 @@ suite "NetworkStore engine - 2 nodes":
|
||||
peerId1 = switch1.peerInfo.peerId
|
||||
peerId2 = switch2.peerInfo.peerId
|
||||
|
||||
localStore1 = MemoryStore.new(blocks1.mapIt( it ))
|
||||
localStore1 = CacheStore.new(blocks1.mapIt( it ))
|
||||
network1 = BlockExcNetwork.new(switch = switch1)
|
||||
engine1 = BlockExcEngine.new(localStore1, wallet1, network1)
|
||||
blockexc1 = NetworkStore.new(engine1, localStore1)
|
||||
switch1.mount(network1)
|
||||
|
||||
localStore2 = MemoryStore.new(blocks2.mapIt( it ))
|
||||
localStore2 = CacheStore.new(blocks2.mapIt( it ))
|
||||
network2 = BlockExcNetwork.new(switch = switch2)
|
||||
engine2 = BlockExcEngine.new(localStore2, wallet2, network2)
|
||||
blockexc2 = NetworkStore.new(engine2, localStore2)
|
||||
|
@ -57,7 +57,7 @@ suite "NetworkStore engine basic":
|
||||
))
|
||||
|
||||
engine = BlockExcEngine.new(
|
||||
MemoryStore.new(blocks.mapIt( it )),
|
||||
CacheStore.new(blocks.mapIt( it )),
|
||||
wallet,
|
||||
network)
|
||||
engine.wantList = blocks.mapIt( it.cid )
|
||||
@ -77,7 +77,7 @@ suite "NetworkStore engine basic":
|
||||
sendAccount: sendAccount,
|
||||
))
|
||||
|
||||
engine = BlockExcEngine.new(MemoryStore.new, wallet, network)
|
||||
engine = BlockExcEngine.new(CacheStore.new, wallet, network)
|
||||
|
||||
engine.pricing = pricing.some
|
||||
engine.setupPeer(peerId)
|
||||
@ -106,7 +106,7 @@ suite "NetworkStore engine handlers":
|
||||
blocks.add(bt.Block.init(chunk).tryGet())
|
||||
|
||||
done = newFuture[void]()
|
||||
engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork())
|
||||
engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork())
|
||||
peerCtx = BlockExcPeerCtx(
|
||||
id: peerId
|
||||
)
|
||||
@ -230,7 +230,7 @@ suite "Task Handler":
|
||||
blocks.add(bt.Block.init(chunk).tryGet())
|
||||
|
||||
done = newFuture[void]()
|
||||
engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork())
|
||||
engine = BlockExcEngine.new(CacheStore.new(), wallet, BlockExcNetwork())
|
||||
peersCtx = @[]
|
||||
|
||||
for i in 0..3:
|
||||
|
@ -19,7 +19,7 @@ proc generateNodes*(
|
||||
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
|
||||
wallet = WalletRef.example
|
||||
network = BlockExcNetwork.new(switch)
|
||||
localStore = MemoryStore.new(blocks.mapIt( it ))
|
||||
localStore = CacheStore.new(blocks.mapIt( it ))
|
||||
engine = BlockExcEngine.new(localStore, wallet, network)
|
||||
networkStore = NetworkStore.new(engine, localStore)
|
||||
|
||||
|
108
tests/dagger/stores/testcachestore.nim
Normal file
108
tests/dagger/stores/testcachestore.nim
Normal file
@ -0,0 +1,108 @@
|
||||
import std/strutils
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/asynctest
|
||||
import pkg/libp2p
|
||||
import pkg/stew/byteutils
|
||||
import pkg/questionable/results
|
||||
import pkg/dagger/stores/cachestore
|
||||
import pkg/dagger/chunker
|
||||
|
||||
import ../helpers
|
||||
|
||||
suite "Cache Store tests":
|
||||
var
|
||||
newBlock, newBlock1, newBlock2, newBlock3: Block
|
||||
store: CacheStore
|
||||
|
||||
setup:
|
||||
newBlock = Block.init("New Kids on the Block".toBytes()).tryGet()
|
||||
newBlock1 = Block.init("1".repeat(100).toBytes()).tryGet()
|
||||
newBlock2 = Block.init("2".repeat(100).toBytes()).tryGet()
|
||||
newBlock3 = Block.init("3".repeat(100).toBytes()).tryGet()
|
||||
store = CacheStore.new()
|
||||
|
||||
test "constructor":
|
||||
# cache size cannot be smaller than chunk size
|
||||
expect ValueError:
|
||||
discard CacheStore.new(cacheSize = 1, chunkSize = 2)
|
||||
|
||||
store = CacheStore.new(cacheSize = 100, chunkSize = 1)
|
||||
check store.currentSize == 0
|
||||
store = CacheStore.new(@[newBlock1, newBlock2, newBlock3])
|
||||
check store.currentSize == 300
|
||||
|
||||
# initial cache blocks total more than cache size, currentSize should
|
||||
# never exceed max cache size
|
||||
store = CacheStore.new(
|
||||
blocks = @[newBlock1, newBlock2, newBlock3],
|
||||
cacheSize = 200,
|
||||
chunkSize = 1)
|
||||
check store.currentSize == 200
|
||||
|
||||
# cache size cannot be less than chunks size
|
||||
expect ValueError:
|
||||
discard CacheStore.new(
|
||||
cacheSize = 99,
|
||||
chunkSize = 100)
|
||||
|
||||
test "putBlock":
|
||||
|
||||
check:
|
||||
await store.putBlock(newBlock1)
|
||||
newBlock1.cid in store
|
||||
|
||||
# block size bigger than entire cache
|
||||
store = CacheStore.new(cacheSize = 99, chunkSize = 98)
|
||||
check not await store.putBlock(newBlock1)
|
||||
|
||||
# block being added causes removal of LRU block
|
||||
store = CacheStore.new(
|
||||
@[newBlock1, newBlock2, newBlock3],
|
||||
cacheSize = 200,
|
||||
chunkSize = 1)
|
||||
check:
|
||||
not store.hasBlock(newBlock1.cid)
|
||||
store.hasBlock(newBlock2.cid)
|
||||
store.hasBlock(newBlock3.cid)
|
||||
store.currentSize == newBlock2.data.len + newBlock3.data.len # 200
|
||||
|
||||
test "getBlock":
|
||||
store = CacheStore.new(@[newBlock])
|
||||
|
||||
let blk = await store.getBlock(newBlock.cid)
|
||||
|
||||
check:
|
||||
blk.isOk
|
||||
blk.get == newBlock
|
||||
|
||||
test "fail getBlock":
|
||||
let blk = await store.getBlock(newBlock.cid)
|
||||
|
||||
check:
|
||||
blk.isErr
|
||||
blk.error of system.KeyError
|
||||
|
||||
test "hasBlock":
|
||||
let store = CacheStore.new(@[newBlock])
|
||||
|
||||
check store.hasBlock(newBlock.cid)
|
||||
|
||||
test "fail hasBlock":
|
||||
check not store.hasBlock(newBlock.cid)
|
||||
|
||||
test "delBlock":
|
||||
# empty cache
|
||||
check not await store.delBlock(newBlock1.cid)
|
||||
|
||||
# successfully deleted
|
||||
discard await store.putBlock(newBlock1)
|
||||
check await store.delBlock(newBlock1.cid)
|
||||
|
||||
# deletes item should decrement size
|
||||
store = CacheStore.new(@[newBlock1, newBlock2, newBlock3])
|
||||
check:
|
||||
store.currentSize == 300
|
||||
await store.delBlock(newBlock2.cid)
|
||||
store.currentSize == 200
|
||||
newBlock2.cid notin store
|
@ -8,7 +8,7 @@ import pkg/asynctest
|
||||
import pkg/libp2p
|
||||
import pkg/stew/byteutils
|
||||
|
||||
import pkg/dagger/stores/memorystore
|
||||
import pkg/dagger/stores/cachestore
|
||||
import pkg/dagger/chunker
|
||||
import pkg/dagger/stores
|
||||
|
||||
|
@ -1,57 +0,0 @@
|
||||
import pkg/chronos
|
||||
import pkg/asynctest
|
||||
import pkg/libp2p
|
||||
import pkg/stew/byteutils
|
||||
import pkg/questionable/results
|
||||
import pkg/dagger/stores/memorystore
|
||||
import pkg/dagger/chunker
|
||||
|
||||
import ../helpers
|
||||
|
||||
suite "Memory Store tests":
|
||||
test "putBlock":
|
||||
let
|
||||
newBlock = Block.init("New Block".toBytes()).tryGet()
|
||||
store = MemoryStore.new()
|
||||
|
||||
check await store.putBlock(newBlock)
|
||||
check newBlock.cid in store
|
||||
|
||||
test "getBlock":
|
||||
let
|
||||
newBlock = Block.init("New Block".toBytes()).tryGet()
|
||||
store = MemoryStore.new(@[newBlock])
|
||||
|
||||
let blk = await store.getBlock(newBlock.cid)
|
||||
check blk.isOk
|
||||
check blk == newBlock.success
|
||||
|
||||
test "fail getBlock":
|
||||
let
|
||||
newBlock = Block.init("New Block".toBytes()).tryGet()
|
||||
store = MemoryStore.new(@[])
|
||||
|
||||
let blk = await store.getBlock(newBlock.cid)
|
||||
check blk.isErr
|
||||
|
||||
test "hasBlock":
|
||||
let
|
||||
newBlock = Block.init("New Block".toBytes()).tryGet()
|
||||
store = MemoryStore.new(@[newBlock])
|
||||
|
||||
check store.hasBlock(newBlock.cid)
|
||||
|
||||
test "fail hasBlock":
|
||||
let
|
||||
newBlock = Block.init("New Block".toBytes()).tryGet()
|
||||
store = MemoryStore.new(@[])
|
||||
|
||||
check not store.hasBlock(newBlock.cid)
|
||||
|
||||
test "delBlock":
|
||||
let
|
||||
newBlock = Block.init("New Block".toBytes()).tryGet()
|
||||
store = MemoryStore.new(@[newBlock])
|
||||
|
||||
check await store.delBlock(newBlock.cid)
|
||||
check newBlock.cid notin store
|
@ -27,7 +27,7 @@ suite "Test Node":
|
||||
switch: Switch
|
||||
wallet: WalletRef
|
||||
network: BlockExcNetwork
|
||||
localStore: MemoryStore
|
||||
localStore: CacheStore
|
||||
engine: BlockExcEngine
|
||||
store: NetworkStore
|
||||
node: DaggerNodeRef
|
||||
@ -38,7 +38,7 @@ suite "Test Node":
|
||||
switch = newStandardSwitch()
|
||||
wallet = WalletRef.new(EthPrivateKey.random())
|
||||
network = BlockExcNetwork.new(switch)
|
||||
localStore = MemoryStore.new()
|
||||
localStore = CacheStore.new()
|
||||
engine = BlockExcEngine.new(localStore, wallet, network)
|
||||
store = NetworkStore.new(engine, localStore)
|
||||
node = DaggerNodeRef.new(switch, store, engine)
|
||||
|
@ -1,4 +1,4 @@
|
||||
import ./stores/testfsstore
|
||||
import ./stores/testmemorystore
|
||||
import ./stores/testcachestore
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
||||
|
1
vendor/lrucache.nim
vendored
Submodule
1
vendor/lrucache.nim
vendored
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit 717abe4e612b5bd5c8c71ee14939d139a8e633e3
|
Loading…
x
Reference in New Issue
Block a user