nim-codex/codex/stores/repostore.nim

546 lines
16 KiB
Nim

## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/upraises
push: {.upraises: [].}
import pkg/chronos
import pkg/chronos/futures
import pkg/chronicles
import pkg/libp2p/[cid, multicodec, multihash]
import pkg/lrucache
import pkg/metrics
import pkg/questionable
import pkg/questionable/results
import pkg/datastore
import pkg/stew/endians2
import ./blockstore
import ./keyutils
import ../blocktype
import ../clock
import ../systemclock
import ../merkletree
export blocktype, cid
# Every log line emitted from this module carries this chronicles topic.
logScope:
  topics = "codex repostore"

# Gauges refreshed by `updateMetrics` from the in-memory counters.
declareGauge(codexRepostoreBlocks, "codex repostore blocks")
declareGauge(codexRepostoreBytesUsed, "codex repostore bytes used")
declareGauge(codexRepostoreBytesReserved, "codex repostore bytes reserved")

const
  # Default time-to-live applied to a block when `putBlock` gets no ttl.
  DefaultBlockTtl* = 24.hours
  # Default storage quota: 2^33 bytes.
  DefaultQuotaBytes* = 1'u shl 33'u # ~8GB
  DefaultTreeCacheCapacity* = 10 # Max number of trees stored in memory
type
  QuotaUsedError* = object of CodexError
    ## Returned by `putBlock` when storing the block would exceed the quota.

  QuotaNotEnoughError* = object of CodexError
    ## Returned by `reserve` when the requested bytes do not fit the quota.

  RepoStore* = ref object of BlockStore
    ## Block store backed by two datastores: one for block data and one
    ## for bookkeeping (quota counters, block total, expiration entries).
    postFixLen*: int            # passed to `makePrefixKey` when deriving keys
    repoDs*: Datastore          # block payloads
    metaDs*: Datastore          # quota / totals / expiration metadata
    clock: Clock                # time source for expiration timestamps
    totalBlocks*: uint          # number of blocks in the store
    quotaMaxBytes*: uint        # maximum available bytes
    quotaUsedBytes*: uint       # bytes used by the repo
    quotaReservedBytes*: uint   # bytes reserved by the repo
    blockTtl*: Duration         # ttl used when `putBlock` is given none
    started*: bool              # toggled by `start` / `stop`
    treeCache*: LruCache[Cid, MerkleTree] # decoded trees keyed by their cid

  BlockExpiration* = object
    ## A block cid paired with the timestamp at which it expires.
    cid*: Cid
    expiration*: SecondsSince1970

  GetNext = proc(): Future[?BlockExpiration] {.upraises: [], gcsafe, closure.}

  BlockExpirationIter* = ref object
    ## Async iterator over `BlockExpiration` records; `next` is installed
    ## by `getBlockExpirations`.
    finished*: bool
    next*: GetNext
iterator items*(q: BlockExpirationIter): Future[?BlockExpiration] =
  ## Yield the future produced by `q.next()` repeatedly, stopping as soon
  ## as `q.finished` is observed to be true before a call.
  while true:
    if q.finished:
      break
    yield q.next()
proc updateMetrics(self: RepoStore) =
  ## Push the in-memory block count and quota counters to their gauges.
  codexRepostoreBlocks.set(int64(self.totalBlocks))
  codexRepostoreBytesUsed.set(int64(self.quotaUsedBytes))
  codexRepostoreBytesReserved.set(int64(self.quotaReservedBytes))
func totalUsed*(self: RepoStore): uint =
  ## Bytes counted against the quota: used bytes plus reserved bytes.
  result = self.quotaUsedBytes + self.quotaReservedBytes
func available*(self: RepoStore): uint =
  ## Bytes still free under the quota (`quotaMaxBytes - totalUsed`).
  ##
  ## Returns 0 when used + reserved bytes already meet or exceed the
  ## quota. Without this guard the unsigned subtraction would wrap around
  ## and report an enormous amount of free space.
  let consumed = self.totalUsed
  if consumed >= self.quotaMaxBytes:
    return 0'u
  return self.quotaMaxBytes - consumed
func available*(self: RepoStore, bytes: uint): bool =
  ## True when `bytes` is strictly smaller than the free quota reported
  ## by `available()`.
  let free = self.available()
  result = bytes < free
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
  ## Load the block stored under `cid` from the repo datastore.
  ##
  ## A missing key is reported as `BlockNotFoundError`; any other
  ## datastore error is passed through unchanged. The block is rebuilt
  ## with `verify = true`.
  ##

  without key =? makePrefixKey(self.postFixLen, cid), err:
    trace "Error getting key from provider", err = err.msg
    return failure(err)

  without data =? await self.repoDs.get(key), err:
    if err of DatastoreKeyNotFound:
      return failure(newException(BlockNotFoundError, err.msg))

    trace "Error getting block from datastore", err = err.msg, key
    return failure(err)

  trace "Got block for cid", cid
  return Block.new(cid, data, verify = true)
proc getMerkleTree(self: RepoStore, cid: Cid): Future[?!MerkleTree] {.async.} =
  ## Return the merkle tree identified by `cid`.
  ##
  ## The tree cache is consulted first; on a miss the tree block is read
  ## via `getBlock`, decoded, stored in the cache and returned.
  ##

  try:
    return success(self.treeCache[cid])
  except KeyError:
    # Cache miss: fall back to the stored tree block.
    without treeBlock =? await self.getBlock(cid), err:
      return failure(err)

    without decoded =? MerkleTree.decode(treeBlock.data), err:
      let msg =
        "Error decoding a merkle tree with cid " & $cid &
        ". Nested error is: " & err.msg
      return failure(msg)

    self.treeCache[cid] = decoded
    return success(decoded)
method getBlock*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Block] {.async.} =
  ## Fetch the block referenced by leaf `index` of the tree `treeCid`.
  ##
  ## NOTE(review): the leaf cid is built from `treeCid.cidver` and
  ## `treeCid.mcodec`, whereas `getBlockAndProof` and `getBlocks` use
  ## `CIDv1` and `leaf.mcodec` - confirm which form is intended.
  ##

  without merkleTree =? await self.getMerkleTree(treeCid), err:
    return failure(err)

  without leaf =? merkleTree.getLeaf(index), err:
    return failure(err)

  let cidRes = Cid.init(treeCid.cidver, treeCid.mcodec, leaf).mapFailure
  without leafCid =? cidRes, err:
    return failure(err)

  # The lookup result (success or failure) is handed back as is.
  return await self.getBlock(leafCid)
method getBlockAndProof*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!(Block, MerkleProof)] {.async.} =
  ## Fetch the block at leaf `index` of tree `treeCid` together with the
  ## merkle proof for that leaf. The proof is computed before the leaf is
  ## looked up, so a proof error takes precedence.
  ##

  without merkleTree =? await self.getMerkleTree(treeCid), err:
    return failure(err)

  without merkleProof =? merkleTree.getProof(index), err:
    return failure(err)

  without leaf =? merkleTree.getLeaf(index), err:
    return failure(err)

  let cidRes = Cid.init(CIDv1, leaf.mcodec, leaf).mapFailure
  without leafCid =? cidRes, err:
    return failure(err)

  without leafBlock =? await self.getBlock(leafCid), err:
    return failure(err)

  return success((leafBlock, merkleProof))
method getBlocks*(self: RepoStore, treeCid: Cid, leavesCount: Natural, merkleRoot: MultiHash): Future[?!BlockIter] {.async.} =
  ## Build an iterator over the first `leavesCount` leaf blocks of the
  ## tree `treeCid`, in leaf order. `merkleRoot` is not used by this
  ## implementation.
  ##

  without tree =? await self.getMerkleTree(treeCid), err:
    return failure(err)

  var
    iter = BlockIter()
    index = 0

  proc next(): Future[?!Block] {.async.} =
    # Calls past the last leaf are answered with a failure.
    if index >= leavesCount:
      return failure("No more elements for tree with cid " & $treeCid)

    let current = index
    inc index
    # Flag completion while handing out the final leaf.
    if index >= leavesCount:
      iter.finished = true

    without leaf =? tree.getLeaf(current), err:
      return failure(err)

    let cidRes = Cid.init(CIDv1, leaf.mcodec, leaf).mapFailure
    without leafCid =? cidRes, err:
      return failure(err)

    return await self.getBlock(leafCid)

  iter.next = next
  return success(iter)
proc getBlockExpirationTimestamp(self: RepoStore, ttl: ?Duration): SecondsSince1970 =
  ## Expiration time for a block stored now: the clock's current time
  ## plus `ttl`, or plus the store's `blockTtl` when `ttl` is none.
  let lifetime = ttl |? self.blockTtl
  result = self.clock.now() + lifetime.seconds
proc getBlockExpirationEntry(
    self: RepoStore,
    batch: var seq[BatchEntry],
    cid: Cid,
    ttl: ?Duration
): ?!BatchEntry =
  ## Build the (key, value) metadata entry recording when `cid` expires.
  ## `batch` is accepted but not touched here; the caller appends the
  ## returned entry itself.
  let
    key = ? createBlockExpirationMetadataKey(cid)
    value = self.getBlockExpirationTimestamp(ttl).toBytes

  success((key, value))
proc persistTotalBlocksCount(self: RepoStore): Future[?!void] {.async.} =
  ## Write the in-memory `totalBlocks` counter to the metadata store as a
  ## big-endian uint64 under `CodexTotalBlocksKey`.
  let encoded = @(self.totalBlocks.uint64.toBytesBE)
  let stored = await self.metaDs.put(CodexTotalBlocksKey, encoded)

  if err =? stored.errorOption:
    trace "Error total blocks key!", err = err.msg
    return failure(err)

  return success()
method putBlock*(
    self: RepoStore,
    blk: Block,
    ttl = Duration.none
): Future[?!void] {.async.} =
  ## Put a block to the blockstore
  ##
  ## Stores `blk` in the repo datastore, then records the new used-bytes
  ## value and the block's expiration entry in the metadata store in one
  ## batch, and finally bumps and persists the block total.
  ##
  ## Returns success without doing anything when the block is already
  ## stored, and fails with `QuotaUsedError` when the block does not fit
  ## the quota. If the metadata batch cannot be written, the freshly
  ## stored block is deleted again.
  ##

  without key =? makePrefixKey(self.postFixLen, blk.cid), err:
    trace "Error getting key from provider", err = err.msg
    return failure(err)

  if await key in self.repoDs:
    trace "Block already in store", cid = blk.cid
    return success()

  if (self.totalUsed + blk.data.len.uint) > self.quotaMaxBytes:
    error "Cannot store block, quota used!", used = self.totalUsed
    return failure(
      newException(QuotaUsedError, "Cannot store block, quota used!"))

  var
    batch: seq[BatchEntry]

  # Build the expiration entry before any write: failing here after the
  # block was stored would leave it in the repo with neither a quota
  # update nor an expiration record.
  without blockExpEntry =? self.getBlockExpirationEntry(batch, blk.cid, ttl), err:
    trace "Unable to create block expiration metadata key", err = err.msg
    return failure(err)

  trace "Storing block with key", key

  let
    used = self.quotaUsedBytes + blk.data.len.uint

  if err =? (await self.repoDs.put(key, blk.data)).errorOption:
    trace "Error storing block", err = err.msg
    return failure(err)

  trace "Updating quota", used
  batch.add((QuotaUsedKey, @(used.uint64.toBytesBE)))
  batch.add(blockExpEntry)

  if err =? (await self.metaDs.put(batch)).errorOption:
    trace "Error updating quota bytes", err = err.msg

    # Undo the block write so data and metadata stay consistent.
    if err =? (await self.repoDs.delete(key)).errorOption:
      trace "Error deleting block after failed quota update", err = err.msg
      return failure(err)

    return failure(err)

  self.quotaUsedBytes = used
  inc self.totalBlocks
  if isErr (await self.persistTotalBlocksCount()):
    trace "Unable to update block total metadata"
    return failure("Unable to update block total metadata")

  self.updateMetrics()
  return success()
proc updateQuotaBytesUsed(self: RepoStore, blk: Block): Future[?!void] {.async.} =
  ## Subtract the size of `blk` from the used-bytes counter. The new
  ## value is persisted under `QuotaUsedKey` first; the in-memory counter
  ## and the metrics are only updated once that write succeeded.
  let used = self.quotaUsedBytes - blk.data.len.uint
  let stored = await self.metaDs.put(QuotaUsedKey, @(used.uint64.toBytesBE))

  if err =? stored.errorOption:
    trace "Error updating quota key!", err = err.msg
    return failure(err)

  self.quotaUsedBytes = used
  self.updateMetrics()
  return success()
proc removeBlockExpirationEntry(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
  ## Delete the expiration metadata entry of `cid` from the metadata
  ## store and return the outcome of that delete.
  without expirationKey =? createBlockExpirationMetadataKey(cid), err:
    return failure(err)

  let outcome = await self.metaDs.delete(expirationKey)
  return outcome
method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} =
  ## Remove the block `cid` from the store.
  ##
  ## When the block can be loaded, its data is deleted, the used-bytes
  ## counter is reduced, its expiration entry is removed and the block
  ## total is decremented and persisted. When the block cannot be
  ## loaded, nothing is changed and success is returned.
  ##

  trace "Deleting block", cid

  if blk =? (await self.getBlock(cid)):
    if key =? makePrefixKey(self.postFixLen, cid) and
      err =? (await self.repoDs.delete(key)).errorOption:
      trace "Error deleting block!", err = err.msg
      return failure(err)

    let quotaUpdate = await self.updateQuotaBytesUsed(blk)
    if quotaUpdate.isErr:
      trace "Unable to update quote-bytes-used in metadata store"
      return failure("Unable to update quote-bytes-used in metadata store")

    let expirationRemoval = await self.removeBlockExpirationEntry(blk.cid)
    if expirationRemoval.isErr:
      trace "Unable to remove block expiration entry from metadata store"
      return failure("Unable to remove block expiration entry from metadata store")

    trace "Deleted block", cid, totalUsed = self.totalUsed

    dec self.totalBlocks
    let totalUpdate = await self.persistTotalBlocksCount()
    if totalUpdate.isErr:
      trace "Unable to update block total metadata"
      return failure("Unable to update block total metadata")

    self.updateMetrics()

  return success()
method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
  ## Report whether the repo datastore holds an entry for `cid`.
  ## Key derivation errors are returned as failures.
  ##

  without key =? makePrefixKey(self.postFixLen, cid), err:
    trace "Error getting key from provider", err = err.msg
    return failure(err)

  let present = await self.repoDs.has(key)
  return present
method listBlocks*(
    self: RepoStore,
    blockType = BlockType.Manifest
): Future[?!CidIter] {.async.} =
  ## Return an iterator over the cids stored in the repo datastore,
  ## restricted to the key namespace selected by `blockType`.
  ## This is an intensive operation
  ##

  var iter = CidIter()

  let key =
    case blockType:
    of BlockType.Manifest: CodexManifestKey
    of BlockType.Block: CodexBlocksKey
    of BlockType.Both: CodexRepoKey

  let queryRes = await self.repoDs.query(Query.init(key))
  without queryIter =? queryRes, err:
    trace "Error querying cids in repo", blockType, err = err.msg
    return failure(err)

  proc next(): Future[?Cid] {.async.} =
    await idleAsync()

    # Mirror the query's state before deciding whether to pull a record.
    iter.finished = queryIter.finished
    if queryIter.finished:
      return Cid.none

    if pair =? (await queryIter.next()) and cid =? pair.key:
      trace "Retrieved record from repo", cid
      return Cid.init(cid.value).option

    return Cid.none

  iter.next = next
  return success iter
proc createBlockExpirationQuery(maxNumber: int, offset: int): ?!Query =
  ## Build the metadata query for block expiration entries, starting at
  ## `offset` and limited to `maxNumber` results.
  without queryKey =? createBlockExpirationMetadataQueryKey(), err:
    return failure(err)

  return success(Query.init(queryKey, offset = offset, limit = maxNumber))
method getBlockExpirations*(
    self: RepoStore,
    maxNumber: int,
    offset: int
): Future[?!BlockExpirationIter] {.async, base.} =
  ## Get block expirations from the given RepoStore
  ##
  ## Runs a query over the expiration entries in the metadata store
  ## (`maxNumber` results from `offset`) and returns an iterator that
  ## yields one `BlockExpiration` per record.
  ##

  without query =? createBlockExpirationQuery(maxNumber, offset), err:
    trace "Unable to format block expirations query"
    return failure(err)

  without queryIter =? (await self.metaDs.query(query)), err:
    trace "Unable to execute block expirations query"
    return failure(err)

  var iter = BlockExpirationIter()

  proc next(): Future[?BlockExpiration] {.async.} =
    if queryIter.finished:
      # Underlying query is exhausted; release it.
      discard await queryIter.dispose()
    else:
      if pair =? (await queryIter.next()) and blockKey =? pair.key:
        let
          expirationTimestamp = pair.data
          cidResult = Cid.init(blockKey.value)

        if cidResult.isErr:
          raiseAssert("Unable to parse CID from blockKey.value: " & blockKey.value & $cidResult.error)

        return BlockExpiration(
          cid: cidResult.get,
          expiration: expirationTimestamp.toSecondsSince1970
        ).some

    # Reached when the query is exhausted or yielded no usable record.
    iter.finished = true
    return BlockExpiration.none

  iter.next = next
  return success iter
method close*(self: RepoStore): Future[void] {.async.} =
  ## Close the repo datastore. A failed close is treated as fatal via
  ## `expect`. Only `repoDs` is closed here.
  ##

  let closed = await self.repoDs.close()
  closed.expect("Should close datastore")
proc hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} =
  ## Check if the block exists in the blockstore.
  ## A key derivation error is returned as a failure carrying only the
  ## error message; otherwise the datastore's answer is returned.
  ##

  without key =? makePrefixKey(self.postFixLen, cid), err:
    trace "Error getting key from provider", err = err.msg
    return failure(err.msg)

  let present = await self.repoDs.has(key)
  return present
proc reserve*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
  ## Reserve bytes
  ##
  ## Adds `bytes` to the reserved quota and persists the new value under
  ## `QuotaReservedKey`. Fails with `QuotaNotEnoughError` when used +
  ## reserved + `bytes` would exceed `quotaMaxBytes`. If persisting
  ## fails, the in-memory counter is restored to its previous value.
  ##

  trace "Reserving bytes", reserved = self.quotaReservedBytes, bytes

  if (self.totalUsed + bytes) > self.quotaMaxBytes:
    trace "Not enough storage quota to reserver", reserve = self.totalUsed + bytes
    return failure(
      newException(QuotaNotEnoughError, "Not enough storage quota to reserver"))

  # Bump the counter before awaiting so the reservation is visible while
  # the write is in flight.
  self.quotaReservedBytes += bytes
  if err =? (await self.metaDs.put(
    QuotaReservedKey,
    @(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption:

    trace "Error reserving bytes", err = err.msg

    # Roll back: the reservation was never persisted.
    self.quotaReservedBytes -= bytes
    return failure(err)

  # Keep the reserved-bytes gauge in step, as `release` does.
  self.updateMetrics()
  return success()
proc release*(self: RepoStore, bytes: uint): Future[?!void] {.async.} =
  ## Release bytes
  ##
  ## Subtracts `bytes` from the reserved quota and persists the new value
  ## under `QuotaReservedKey`. Fails when more bytes are released than
  ## are currently reserved. If persisting fails, the in-memory counter
  ## is restored to its previous value.
  ##

  trace "Releasing bytes", reserved = self.quotaReservedBytes, bytes

  # Compare as unsigned values; converting to `int` first could overflow
  # for very large counters.
  if bytes > self.quotaReservedBytes:
    trace "Cannot release this many bytes",
      quotaReservedBytes = self.quotaReservedBytes, bytes

    return failure("Cannot release this many bytes")

  self.quotaReservedBytes -= bytes
  if err =? (await self.metaDs.put(
    QuotaReservedKey,
    @(toBytesBE(self.quotaReservedBytes.uint64)))).errorOption:

    trace "Error releasing bytes", err = err.msg

    # Roll back: the release was never persisted.
    self.quotaReservedBytes += bytes
    return failure(err)

  trace "Released bytes", bytes
  self.updateMetrics()
  return success()
proc start*(self: RepoStore): Future[void] {.async.} =
  ## Start the repo: load the persisted block total and quota counters
  ## from the metadata store into memory, then mark the store started.
  ##
  ## A missing key leaves the corresponding counter untouched. A failure
  ## to read the block total is only logged, while a failure to read
  ## either quota counter raises a `Defect`. Asserts when the used bytes
  ## exceed `quotaMaxBytes`.
  ##

  if self.started:
    trace "Repo already started"
    return

  trace "Starting repo"

  without totalRaw =? await self.metaDs.get(CodexTotalBlocksKey), err:
    if not (err of DatastoreKeyNotFound):
      error "Unable to read total number of blocks from metadata store", err = err.msg, key = $CodexTotalBlocksKey

  if totalRaw.len != 0:
    self.totalBlocks = uint(uint64.fromBytesBE(totalRaw))
  trace "Number of blocks in store at start", total = self.totalBlocks

  ## load current persist and cache bytes from meta ds
  without usedRaw =? await self.metaDs.get(QuotaUsedKey), err:
    if not (err of DatastoreKeyNotFound):
      error "Error getting cache bytes from datastore",
        err = err.msg, key = $QuotaUsedKey

      raise newException(Defect, err.msg)

  if usedRaw.len != 0:
    self.quotaUsedBytes = uint(uint64.fromBytesBE(usedRaw))

  notice "Current bytes used for cache quota", bytes = self.quotaUsedBytes

  without reservedRaw =? await self.metaDs.get(QuotaReservedKey), err:
    if not (err of DatastoreKeyNotFound):
      error "Error getting persist bytes from datastore",
        err = err.msg, key = $QuotaReservedKey

      raise newException(Defect, err.msg)

  if reservedRaw.len != 0:
    self.quotaReservedBytes = uint(uint64.fromBytesBE(reservedRaw))

  if self.quotaUsedBytes > self.quotaMaxBytes:
    raiseAssert "All storage quota used, increase storage quota!"

  notice "Current bytes used for persist quota", bytes = self.quotaReservedBytes

  self.updateMetrics()
  self.started = true
proc stop*(self: RepoStore): Future[void] {.async.} =
  ## Stop the repo: close the repo datastore, then the metadata
  ## datastore, and clear the `started` flag. Does nothing when the repo
  ## is not started.
  ##

  if not self.started:
    trace "Repo is not started"
    return

  trace "Stopping repo"

  let repoClosed = await self.repoDs.close()
  repoClosed.expect("Should close repo store!")

  let metaClosed = await self.metaDs.close()
  metaClosed.expect("Should close meta store!")

  self.started = false
func new*(
    T: type RepoStore,
    repoDs: Datastore,
    metaDs: Datastore,
    clock: Clock = SystemClock.new(),
    postFixLen = 2,
    quotaMaxBytes = DefaultQuotaBytes,
    blockTtl = DefaultBlockTtl,
    treeCacheCapacity = DefaultTreeCacheCapacity
): RepoStore =
  ## Build a RepoStore over the given block and metadata datastores.
  ## Counters start at zero and the store is not started; the tree cache
  ## is created with room for `treeCacheCapacity` entries.
  ##

  let cache = newLruCache[Cid, MerkleTree](treeCacheCapacity)

  result = RepoStore(
    repoDs: repoDs,
    metaDs: metaDs,
    clock: clock,
    postFixLen: postFixLen,
    quotaMaxBytes: quotaMaxBytes,
    blockTtl: blockTtl,
    treeCache: cache)