Metadata in LevelDB (#806)
* pulls in datastore-leveldb update * bump * Applies LevelDb as metadata store. Adds option for repostore. * Sets submodule to main branch * I can do syntax, me * Removes wildcard from metadata query key * Applies leveldb instead of sqlite-in-memory for tests * Restores query key wildcard. * Pins nim-datastore to latest master * bumps leveldb to 0.1.4 --------- Co-authored-by: Dmitriy Ryajov <dryajov@gmail.com>
This commit is contained in:
parent
e6a387e8e8
commit
bd8fedaf28
|
@ -212,3 +212,6 @@
|
|||
[submodule "vendor/nim-serde"]
|
||||
path = vendor/nim-serde
|
||||
url = https://github.com/codex-storage/nim-serde.git
|
||||
[submodule "vendor/nim-leveldbstatic"]
|
||||
path = vendor/nim-leveldbstatic
|
||||
url = https://github.com/codex-storage/nim-leveldb.git
|
||||
|
|
|
@ -232,7 +232,7 @@ proc new*(
|
|||
|
||||
let
|
||||
discoveryStore = Datastore(
|
||||
SQLiteDatastore.new(config.dataDir / CodexDhtProvidersNamespace)
|
||||
LevelDbDatastore.new(config.dataDir / CodexDhtProvidersNamespace)
|
||||
.expect("Should create discovery datastore!"))
|
||||
|
||||
discovery = Discovery.new(
|
||||
|
@ -251,11 +251,13 @@ proc new*(
|
|||
.expect("Should create repo file data store!"))
|
||||
of repoSQLite: Datastore(SQLiteDatastore.new($config.dataDir)
|
||||
.expect("Should create repo SQLite data store!"))
|
||||
of repoLevelDb: Datastore(LevelDbDatastore.new($config.dataDir)
|
||||
.expect("Should create repo LevelDB data store!"))
|
||||
|
||||
repoStore = RepoStore.new(
|
||||
repoDs = repoData,
|
||||
metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace)
|
||||
.expect("Should create meta data store!"),
|
||||
metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace)
|
||||
.expect("Should create metadata store!"),
|
||||
quotaMaxBytes = config.storageQuota.uint,
|
||||
blockTtl = config.blockTtl)
|
||||
|
||||
|
|
|
@ -82,6 +82,7 @@ type
|
|||
RepoKind* = enum
|
||||
repoFS = "fs"
|
||||
repoSQLite = "sqlite"
|
||||
repoLevelDb = "leveldb"
|
||||
|
||||
CodexConf* = object
|
||||
configFile* {.
|
||||
|
@ -190,7 +191,7 @@ type
|
|||
abbr: "p" }: Port
|
||||
|
||||
repoKind* {.
|
||||
desc: "Backend for main repo store (fs, sqlite)"
|
||||
desc: "Backend for main repo store (fs, sqlite, leveldb)"
|
||||
defaultValueDesc: "fs"
|
||||
defaultValue: repoFS
|
||||
name: "repo-kind" }: RepoKind
|
||||
|
|
|
@ -75,10 +75,12 @@ type
|
|||
repo: RepoStore
|
||||
onAvailabilityAdded: ?OnAvailabilityAdded
|
||||
GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
|
||||
IterDispose* = proc(): Future[?!void] {.gcsafe, closure.}
|
||||
OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.}
|
||||
StorableIter* = ref object
|
||||
finished*: bool
|
||||
next*: GetNext
|
||||
dispose*: IterDispose
|
||||
ReservationsError* = object of CodexError
|
||||
ReserveFailedError* = object of ReservationsError
|
||||
ReleaseFailedError* = object of ReservationsError
|
||||
|
@ -552,7 +554,11 @@ proc storables(
|
|||
|
||||
return none seq[byte]
|
||||
|
||||
proc dispose(): Future[?!void] {.async.} =
|
||||
return await results.dispose()
|
||||
|
||||
iter.next = next
|
||||
iter.dispose = dispose
|
||||
return success iter
|
||||
|
||||
proc allImpl(
|
||||
|
@ -620,6 +626,12 @@ proc findAvailability*(
|
|||
minPrice, availMinPrice = availability.minPrice,
|
||||
collateral, availMaxCollateral = availability.maxCollateral
|
||||
|
||||
# TODO: As soon as we're on ARC-ORC, we can use destructors
|
||||
# to automatically dispose our iterators when they fall out of scope.
|
||||
# For now:
|
||||
if err =? (await storables.dispose()).errorOption:
|
||||
error "failed to dispose storables iter", error = err.msg
|
||||
return none Availability
|
||||
return some availability
|
||||
|
||||
trace "availability did not match",
|
||||
|
@ -627,3 +639,4 @@ proc findAvailability*(
|
|||
duration, availDuration = availability.duration,
|
||||
minPrice, availMinPrice = availability.minPrice,
|
||||
collateral, availMaxCollateral = availability.maxCollateral
|
||||
|
||||
|
|
|
@ -87,6 +87,8 @@ template setupAndTearDown*() {.dirty.} =
|
|||
|
||||
let
|
||||
path = currentSourcePath().parentDir
|
||||
repoTmp = TempLevelDb.new()
|
||||
metaTmp = TempLevelDb.new()
|
||||
|
||||
setup:
|
||||
file = open(path /../ "" /../ "fixtures" / "test.jpg")
|
||||
|
@ -96,8 +98,8 @@ template setupAndTearDown*() {.dirty.} =
|
|||
network = BlockExcNetwork.new(switch)
|
||||
|
||||
clock = SystemClock.new()
|
||||
localStoreMetaDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
localStoreRepoDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
localStoreMetaDs = metaTmp.newDb()
|
||||
localStoreRepoDs = repoTmp.newDb()
|
||||
localStore = RepoStore.new(localStoreRepoDs, localStoreMetaDs, clock = clock)
|
||||
await localStore.start()
|
||||
|
||||
|
@ -124,3 +126,5 @@ template setupAndTearDown*() {.dirty.} =
|
|||
teardown:
|
||||
close(file)
|
||||
await node.stop()
|
||||
await metaTmp.destroyDb()
|
||||
await repoTmp.destroyDb()
|
||||
|
|
|
@ -17,16 +17,23 @@ asyncchecksuite "Reservations module":
|
|||
var
|
||||
repo: RepoStore
|
||||
repoDs: Datastore
|
||||
metaDs: SQLiteDatastore
|
||||
metaDs: Datastore
|
||||
reservations: Reservations
|
||||
let
|
||||
repoTmp = TempLevelDb.new()
|
||||
metaTmp = TempLevelDb.new()
|
||||
|
||||
setup:
|
||||
randomize(1.int64) # create reproducible results
|
||||
repoDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
metaDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
repoDs = repoTmp.newDb()
|
||||
metaDs = metaTmp.newDb()
|
||||
repo = RepoStore.new(repoDs, metaDs)
|
||||
reservations = Reservations.new(repo)
|
||||
|
||||
teardown:
|
||||
await repoTmp.destroyDb()
|
||||
await metaTmp.destroyDb()
|
||||
|
||||
proc createAvailability(): Availability =
|
||||
let example = Availability.example
|
||||
let totalSize = rand(100000..200000)
|
||||
|
|
|
@ -22,7 +22,10 @@ import ../examples
|
|||
import ./helpers/periods
|
||||
|
||||
asyncchecksuite "Sales - start":
|
||||
let proof = Groth16Proof.example
|
||||
let
|
||||
proof = Groth16Proof.example
|
||||
repoTmp = TempLevelDb.new()
|
||||
metaTmp = TempLevelDb.new()
|
||||
|
||||
var request: StorageRequest
|
||||
var sales: Sales
|
||||
|
@ -50,8 +53,8 @@ asyncchecksuite "Sales - start":
|
|||
|
||||
market = MockMarket.new()
|
||||
clock = MockClock.new()
|
||||
let repoDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
let metaDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
let repoDs = repoTmp.newDb()
|
||||
let metaDs = metaTmp.newDb()
|
||||
repo = RepoStore.new(repoDs, metaDs)
|
||||
await repo.start()
|
||||
sales = Sales.new(market, clock, repo)
|
||||
|
@ -73,6 +76,8 @@ asyncchecksuite "Sales - start":
|
|||
teardown:
|
||||
await sales.stop()
|
||||
await repo.stop()
|
||||
await repoTmp.destroyDb()
|
||||
await metaTmp.destroyDb()
|
||||
|
||||
proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} =
|
||||
let address = await market.getSigner()
|
||||
|
@ -113,7 +118,10 @@ asyncchecksuite "Sales - start":
|
|||
check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256)
|
||||
|
||||
asyncchecksuite "Sales":
|
||||
let proof = Groth16Proof.example
|
||||
let
|
||||
proof = Groth16Proof.example
|
||||
repoTmp = TempLevelDb.new()
|
||||
metaTmp = TempLevelDb.new()
|
||||
|
||||
var availability: Availability
|
||||
var request: StorageRequest
|
||||
|
@ -154,8 +162,8 @@ asyncchecksuite "Sales":
|
|||
market.requestEnds[request.id] = request.expiry.toSecondsSince1970
|
||||
|
||||
clock = MockClock.new()
|
||||
let repoDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
let metaDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
let repoDs = repoTmp.newDb()
|
||||
let metaDs = metaTmp.newDb()
|
||||
repo = RepoStore.new(repoDs, metaDs)
|
||||
await repo.start()
|
||||
sales = Sales.new(market, clock, repo)
|
||||
|
@ -177,6 +185,8 @@ asyncchecksuite "Sales":
|
|||
teardown:
|
||||
await sales.stop()
|
||||
await repo.stop()
|
||||
await repoTmp.destroyDb()
|
||||
await metaTmp.destroyDb()
|
||||
|
||||
proc allowRequestToStart {.async.} =
|
||||
# wait until we're in initialproving state
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
|
||||
import std/sequtils
|
||||
import std/sugar
|
||||
import std/options
|
||||
|
@ -19,6 +18,7 @@ import pkg/codex/stores
|
|||
|
||||
import ./helpers
|
||||
import ../helpers
|
||||
import ../../helpers
|
||||
|
||||
suite "Test Circom Compat Backend - control inputs":
|
||||
let
|
||||
|
@ -69,6 +69,9 @@ suite "Test Circom Compat Backend":
|
|||
wasm = "tests/circuits/fixtures/proof_main.wasm"
|
||||
zkey = "tests/circuits/fixtures/proof_main.zkey"
|
||||
|
||||
repoTmp = TempLevelDb.new()
|
||||
metaTmp = TempLevelDb.new()
|
||||
|
||||
var
|
||||
store: BlockStore
|
||||
manifest: Manifest
|
||||
|
@ -82,8 +85,8 @@ suite "Test Circom Compat Backend":
|
|||
|
||||
setup:
|
||||
let
|
||||
repoDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
metaDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
repoDs = repoTmp.newDb()
|
||||
metaDs = metaTmp.newDb()
|
||||
|
||||
store = RepoStore.new(repoDs, metaDs)
|
||||
|
||||
|
@ -105,6 +108,9 @@ suite "Test Circom Compat Backend":
|
|||
|
||||
teardown:
|
||||
circom.release() # this comes from the rust FFI
|
||||
await repoTmp.destroyDb()
|
||||
await metaTmp.destroyDb()
|
||||
|
||||
|
||||
test "Should verify with correct input":
|
||||
var
|
||||
|
|
|
@ -84,6 +84,8 @@ suite "Test Sampler":
|
|||
entropy = 1234567.toF
|
||||
blockSize = DefaultBlockSize
|
||||
cellSize = DefaultCellSize
|
||||
repoTmp = TempLevelDb.new()
|
||||
metaTmp = TempLevelDb.new()
|
||||
|
||||
var
|
||||
store: RepoStore
|
||||
|
@ -94,8 +96,8 @@ suite "Test Sampler":
|
|||
|
||||
setup:
|
||||
let
|
||||
repoDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
metaDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
repoDs = repoTmp.newDb()
|
||||
metaDs = metaTmp.newDb()
|
||||
|
||||
store = RepoStore.new(repoDs, metaDs)
|
||||
|
||||
|
@ -112,6 +114,8 @@ suite "Test Sampler":
|
|||
|
||||
teardown:
|
||||
await store.close()
|
||||
await repoTmp.destroyDb()
|
||||
await metaTmp.destroyDb()
|
||||
|
||||
test "Should fail instantiating for invalid slot index":
|
||||
let
|
||||
|
|
|
@ -31,6 +31,8 @@ suite "Test Prover":
|
|||
numDatasetBlocks = 8
|
||||
blockSize = DefaultBlockSize
|
||||
cellSize = DefaultCellSize
|
||||
repoTmp = TempLevelDb.new()
|
||||
metaTmp = TempLevelDb.new()
|
||||
|
||||
var
|
||||
datasetBlocks: seq[bt.Block]
|
||||
|
@ -42,8 +44,8 @@ suite "Test Prover":
|
|||
|
||||
setup:
|
||||
let
|
||||
repoDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
metaDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
repoDs = repoTmp.newDb()
|
||||
metaDs = metaTmp.newDb()
|
||||
|
||||
store = RepoStore.new(repoDs, metaDs)
|
||||
|
||||
|
@ -55,6 +57,10 @@ suite "Test Prover":
|
|||
blockSize,
|
||||
cellSize)
|
||||
|
||||
teardown:
|
||||
await repoTmp.destroyDb()
|
||||
await metaTmp.destroyDb()
|
||||
|
||||
test "Should sample and prove a slot":
|
||||
let
|
||||
r1cs = "tests/circuits/fixtures/proof_main.r1cs"
|
||||
|
|
|
@ -65,6 +65,8 @@ suite "Slot builder":
|
|||
|
||||
# empty digest
|
||||
emptyDigest = SpongeMerkle.digest(newSeq[byte](blockSize.int), cellSize.int)
|
||||
repoTmp = TempLevelDb.new()
|
||||
metaTmp = TempLevelDb.new()
|
||||
|
||||
var
|
||||
datasetBlocks: seq[bt.Block]
|
||||
|
@ -77,8 +79,8 @@ suite "Slot builder":
|
|||
|
||||
setup:
|
||||
let
|
||||
repoDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
metaDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
repoDs = repoTmp.newDb()
|
||||
metaDs = metaTmp.newDb()
|
||||
|
||||
localStore = RepoStore.new(repoDs, metaDs)
|
||||
chunker = RandomChunker.new(Rng.instance(), size = totalDatasetSize, chunkSize = blockSize)
|
||||
|
@ -96,6 +98,8 @@ suite "Slot builder":
|
|||
|
||||
teardown:
|
||||
await localStore.close()
|
||||
await repoTmp.destroyDb()
|
||||
await metaTmp.destroyDb()
|
||||
|
||||
# TODO: THIS IS A BUG IN asynctest, because it doesn't release the
|
||||
# objects after the test is done, so we need to do it manually
|
||||
|
|
|
@ -90,4 +90,4 @@ checksuite "KeyUtils":
|
|||
namespaces.len == 3
|
||||
namespaces[0].value == CodexMetaNamespace
|
||||
namespaces[1].value == "ttl"
|
||||
namespaces[2].value == "*"
|
||||
namespaces[2].value == "*"
|
|
@ -28,11 +28,13 @@ suite "Erasure encode/decode":
|
|||
var store: BlockStore
|
||||
var erasure: Erasure
|
||||
var taskpool: Taskpool
|
||||
let repoTmp = TempLevelDb.new()
|
||||
let metaTmp = TempLevelDb.new()
|
||||
|
||||
setup:
|
||||
let
|
||||
repoDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
metaDs = SQLiteDatastore.new(Memory).tryGet()
|
||||
repoDs = repoTmp.newDb()
|
||||
metaDs = metaTmp.newDb()
|
||||
rng = Rng.instance()
|
||||
chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize)
|
||||
store = RepoStore.new(repoDs, metaDs)
|
||||
|
@ -40,6 +42,10 @@ suite "Erasure encode/decode":
|
|||
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider, taskpool)
|
||||
manifest = await storeDataGetManifest(store, chunker)
|
||||
|
||||
teardown:
|
||||
await repoTmp.destroyDb()
|
||||
await metaTmp.destroyDb()
|
||||
|
||||
proc encode(buffers, parity: int): Future[Manifest] {.async.} =
|
||||
let
|
||||
encoded = (await erasure.encode(
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import helpers/multisetup
|
||||
import helpers/trackers
|
||||
import helpers/templeveldb
|
||||
|
||||
export multisetup, trackers
|
||||
export multisetup, trackers, templeveldb
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
import os
|
||||
import std/monotimes
|
||||
import pkg/datastore
|
||||
import pkg/chronos
|
||||
import pkg/questionable/results
|
||||
|
||||
type
|
||||
TempLevelDb* = ref object
|
||||
currentPath: string
|
||||
ds: LevelDbDatastore
|
||||
|
||||
var number = 0
|
||||
|
||||
proc newDb*(self: TempLevelDb): Datastore =
|
||||
if self.currentPath.len > 0:
|
||||
raiseAssert("TempLevelDb already active.")
|
||||
self.currentPath = getTempDir() / "templeveldb" / $number / $getmonotime()
|
||||
inc number
|
||||
createdir(self.currentPath)
|
||||
self.ds = LevelDbDatastore.new(self.currentPath).tryGet()
|
||||
return self.ds
|
||||
|
||||
proc destroyDb*(self: TempLevelDb): Future[void] {.async.} =
|
||||
if self.currentPath.len == 0:
|
||||
raiseAssert("TempLevelDb not active.")
|
||||
try:
|
||||
(await self.ds.close()).tryGet()
|
||||
finally:
|
||||
removedir(self.currentPath)
|
||||
self.currentPath = ""
|
|
@ -1 +1 @@
|
|||
Subproject commit 8a95ed9c90a9ea31fc1341b92c8a9c0935368cd9
|
||||
Subproject commit f4989fcce5d74a648e7e2598a72a7b21948f4a85
|
|
@ -0,0 +1 @@
|
|||
Subproject commit 3cb21890d4dc29c579d309b94f60f51ee9633a6d
|
Loading…
Reference in New Issue