mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-10 17:33:09 +00:00
feat: simple impl for filestore
This commit is contained in:
parent
60861d6af8
commit
e656c50500
110
benchmarks/repostore_rw_bench.nim
Normal file
110
benchmarks/repostore_rw_bench.nim
Normal file
@ -0,0 +1,110 @@
|
||||
import std/tempfiles
|
||||
import std/times
|
||||
import std/random
|
||||
import std/os
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/datastore
|
||||
|
||||
import pkg/codex/blocktype
|
||||
import pkg/codex/merkletree/codex
|
||||
import pkg/codex/manifest
|
||||
import pkg/codex/stores/[filestore, repostore]
|
||||
|
||||
const
|
||||
NBlocks = 81_920
|
||||
BlockSize = 65_536
|
||||
DataDir = ""
|
||||
|
||||
type Dataset* = tuple[blocks: seq[Block], tree: CodexTree, manifest: Manifest]
|
||||
|
||||
template benchmark*(name: string, blk: untyped) =
|
||||
let t0 = epochTime()
|
||||
blk
|
||||
let elapsed = epochTime() - t0
|
||||
echo name, " ", elapsed.formatFloat(format = ffDecimal, precision = 3), " s"
|
||||
|
||||
proc makeRandomBlock*(size: NBytes): Block =
|
||||
let bytes = newSeqWith(size.int, rand(uint8))
|
||||
Block.new(bytes).tryGet()
|
||||
|
||||
proc makeDataset*(nblocks: int, blockSize: int): ?!Dataset =
|
||||
echo "Generate dataset with ", nblocks, " blocks of ", blockSize, " bytes each."
|
||||
|
||||
let
|
||||
blocks = newSeqWith(nblocks, makeRandomBlock(NBytes(blockSize)))
|
||||
tree = ?CodexTree.init(blocks.mapIt(it.cid))
|
||||
treeCid = ?tree.rootCid
|
||||
manifest = Manifest.new(
|
||||
treeCid = treeCid,
|
||||
blockSize = NBytes(blockSize),
|
||||
datasetSize = NBytes(nblocks * blockSize),
|
||||
)
|
||||
|
||||
return success((blocks, tree, manifest))
|
||||
|
||||
proc newRepostore(dataDir: string): RepoStore =
|
||||
let blockStore =
|
||||
FSDatastore.new(dataDir, depth = 5).expect("Should create repo file data store!")
|
||||
let repoStore = RepoStore.new(
|
||||
blockStore,
|
||||
LevelDbDatastore.new(dataDir / "meta").expect("Should create metadata store!"),
|
||||
quotaMaxBytes = 10_000_000_000'nb,
|
||||
)
|
||||
repoStore
|
||||
|
||||
proc asBlock(m: Manifest): Block =
|
||||
let mdata = m.encode().tryGet()
|
||||
Block.new(data = mdata, codec = ManifestCodec).tryGet()
|
||||
|
||||
proc writeData(
|
||||
dataset: Dataset, store: RepoStore
|
||||
): Future[void] {.async: (raises: [CatchableError]).} =
|
||||
let (blocks, tree, manifest) = dataset
|
||||
|
||||
for i in 0 ..< NBlocks:
|
||||
(await store.putBlock(blocks[i])).tryGet()
|
||||
|
||||
(await store.putBlock(manifest.asBlock())).tryGet()
|
||||
|
||||
proc writeData(
|
||||
dataset: Dataset, store: FileStore
|
||||
): Future[void] {.async: (raises: [CatchableError]).} =
|
||||
let (blocks, tree, manifest) = dataset
|
||||
|
||||
let file = store.create(manifest).tryGet()
|
||||
|
||||
for i in 0 ..< NBlocks:
|
||||
(await file.putBlock(i, blocks[i])).tryGet()
|
||||
|
||||
proc runRepostoreBench(
|
||||
dataset: Dataset
|
||||
): Future[void] {.async: (raises: [CatchableError]).} =
|
||||
let
|
||||
dir = createTempDir("repostore-bench", "")
|
||||
store = newRepostore(dir)
|
||||
|
||||
echo "Store dir is ", dir
|
||||
defer:
|
||||
removeDir(dir)
|
||||
|
||||
benchmark "filestore write data":
|
||||
await writeData(dataset, store)
|
||||
|
||||
proc runFilestoreBench(
|
||||
dataset: Dataset
|
||||
): Future[void] {.async: (raises: [CatchableError]).} =
|
||||
let
|
||||
dir = createTempDir("filestore-bench", "")
|
||||
store = FileStore(root: dir)
|
||||
|
||||
echo "Store dir is ", dir
|
||||
defer:
|
||||
removeDir(dir)
|
||||
|
||||
benchmark "repostore write data":
|
||||
await writeData(dataset, store)
|
||||
|
||||
let dataset = makeDataset(NBlocks, BlockSize).tryGet()
|
||||
waitFor runRepostoreBench(dataset)
|
||||
waitFor runFilestoreBench(dataset)
|
||||
95
codex/stores/filestore.nim
Normal file
95
codex/stores/filestore.nim
Normal file
@ -0,0 +1,95 @@
|
||||
import std/posix
|
||||
import std/strformat
|
||||
|
||||
import ../manifest/manifest
|
||||
import ../blocktype
|
||||
import ../utils
|
||||
import ../utils/json
|
||||
|
||||
import pkg/libp2p/protobuf/minprotobuf
|
||||
import pkg/libp2p/[cid, multihash, multicodec]
|
||||
import pkg/questionable/results
|
||||
|
||||
import ../errors
|
||||
import ../units
|
||||
import ../blocktype
|
||||
import ../indexingstrategy
|
||||
import ../logutils
|
||||
|
||||
const DefaultBlockSize*: uint = 65536
|
||||
|
||||
type FileStore* = object
|
||||
root*: string
|
||||
|
||||
type File* = object
|
||||
manifest: Manifest
|
||||
filepath: string
|
||||
fd*: cint
|
||||
|
||||
proc allocStorage*(filepath: string, size: int): ?!cint =
|
||||
let fd = open(filepath, O_CREAT or O_RDWR or O_TRUNC, 0o644)
|
||||
if fd < 0:
|
||||
return failure(&"open failed with error {fd}")
|
||||
|
||||
let rc = posix_fallocate(fd, 0, size)
|
||||
if rc != 0:
|
||||
return failure(&"posix_fallocate failed with error {rc}")
|
||||
|
||||
success(fd)
|
||||
|
||||
proc open*(self: var File): ?!void =
|
||||
self.fd = open(self.filepath, O_RDWR)
|
||||
if self.fd < 0:
|
||||
return failure(&"open failed with error {self.fd}")
|
||||
|
||||
success()
|
||||
|
||||
proc initDataset*(filepath: string, manifest: Manifest): ?!File =
|
||||
let fd = ?allocStorage(filepath, manifest.datasetSize.int)
|
||||
success(File(manifest: manifest, filepath: filepath, fd: fd))
|
||||
|
||||
proc ensureOpen*(self: File): ?!void =
|
||||
if self.fd < 0:
|
||||
return failure("dataset is not open")
|
||||
success()
|
||||
|
||||
proc getBlock*(
|
||||
self: File, index: int
|
||||
): Future[?!Block] {.async: (raises: [CatchableError]).} =
|
||||
?self.ensureOpen()
|
||||
|
||||
if index > self.manifest.blocksCount:
|
||||
return failure(&"index out of bounds")
|
||||
|
||||
let
|
||||
blockSize = self.manifest.blockSize.int
|
||||
offset = index * blockSize
|
||||
buf = newSeqWith(blockSize, 0.byte)
|
||||
rc = pread(self.fd, addr(buf[0]), blockSize, offset)
|
||||
|
||||
if rc != blockSize:
|
||||
return failure(&"pread failed with error {rc}")
|
||||
|
||||
Block.new(data = buf)
|
||||
|
||||
proc putBlock*(
|
||||
self: File, index: int, blk: Block
|
||||
): Future[?!void] {.async: (raises: [CatchableError]).} =
|
||||
?self.ensureOpen()
|
||||
|
||||
if index > self.manifest.blocksCount:
|
||||
return failure(&"index out of bounds")
|
||||
|
||||
let
|
||||
blockSize = self.manifest.blockSize.int
|
||||
offset = index * blockSize
|
||||
rc = pwrite(self.fd, addr(blk.data[0]), blockSize, offset)
|
||||
|
||||
if rc != blockSize:
|
||||
return failure(&"pwrite failed with error {rc}")
|
||||
|
||||
success()
|
||||
|
||||
proc create*(self: FileStore, manifest: Manifest): ?!File =
|
||||
let path = self.root & "/" & $manifest.treeCid
|
||||
initDataset(path, manifest)
|
||||
Loading…
x
Reference in New Issue
Block a user